Source code for mqttools.common

import binascii
import enum
import logging
import struct
import traceback
from io import BytesIO

import bitstruct

# Connection flags.
CLEAN_START    = 0x02
WILL_FLAG      = 0x04
WILL_QOS_1     = 0x08
WILL_QOS_2     = 0x10
WILL_RETAIN    = 0x20
PASSWORD_FLAG  = 0x40
USER_NAME_FLAG = 0x80


def hexlify(data):
    if data is None:
        return None
    else:
        return binascii.hexlify(data).decode('ascii').upper()


# Control packet types.
class ControlPacketType(enum.IntEnum):
    CONNECT     = 1
    CONNACK     = 2
    PUBLISH     = 3
    PUBACK      = 4
    PUBREC      = 5
    PUBREL      = 6
    PUBCOMP     = 7
    SUBSCRIBE   = 8
    SUBACK      = 9
    UNSUBSCRIBE = 10
    UNSUBACK    = 11
    PINGREQ     = 12
    PINGRESP    = 13
    DISCONNECT  = 14
    AUTH        = 15


class ConnectReasonCode(enum.IntEnum):
    SUCCESS                              = 0
    V3_1_1_UNACCEPTABLE_PROTOCOL_VERSION = 1
    V3_1_1_IDENTIFIER_REJECTED           = 2
    V3_1_1_SERVER_UNAVAILABLE            = 3
    V3_1_1_BAD_USER_NAME_OR_PASSWORD     = 4
    V3_1_1_NOT_AUTHORIZED                = 5
    UNSPECIFIED_ERROR                    = 128
    MALFORMED_PACKET                     = 129
    PROTOCOL_ERROR                       = 130
    IMPLEMENTATION_SPECIFIC_ERROR        = 131
    UNSUPPORTED_PROTOCOL_VERSION         = 132
    CLIENT_IDENTIFIER_NOT_VALID          = 133
    BAD_USER_NAME_OR_PASSWORD            = 134
    NOT_AUTHORIZED                       = 135
    SERVER_UNAVAILABLE                   = 136
    SERVER_BUSY                          = 137
    BANNED                               = 138
    BAD_AUTHENTICATION_METHOD            = 140
    TOPIC_NAME_INVALID                   = 144
    PACKET_TOO_LARGE                     = 149
    QUOTA_EXCEEDED                       = 151
    PAYLOAD_FORMAT_INVALID               = 153
    RETAIN_NOT_SUPPORTED                 = 154
    QOS_NOT_SUPPORTED                    = 155
    USE_ANOTHER_SERVER                   = 156
    SERVER_MOVED                         = 157
    CONNECTION_RATE_EXCEEDED             = 159


class DisconnectReasonCode(enum.IntEnum):
    NORMAL_DISCONNECTION                   = 0
    DISCONNECT_WITH_WILL_MESSAGE           = 4
    UNSPECIFIED_ERROR                      = 128
    MALFORMED_PACKET                       = 129
    PROTOCOL_ERROR                         = 130
    IMPLEMENTATION_SPECIFIC_ERROR          = 131
    NOT_AUTHORIZED                         = 135
    SERVER_BUSY                            = 137
    SERVER_SHUTTING_DOWN                   = 139
    KEEP_ALIVE_TIMEOUT                     = 141
    SESSION_TAKEN_OVER                     = 142
    TOPIC_FILTER_INVALID                   = 143
    TOPIC_NAME_INVALID                     = 144
    RECEIVE_MAXIMUM_EXCEEDED               = 147
    TOPIC_ALIAS_INVALID                    = 148
    PACKET_TOO_LARGE                       = 149
    MESSAGE_RATE_TOO_HIGH                  = 150
    QUOTA_EXCEEDED                         = 151
    ADMINISTRATIVE_ACTION                  = 152
    PAYLOAD_FORMAT_INVALID                 = 153
    RETAIN_NOT_SUPPORTED                   = 154
    QOS_NOT_SUPPORTED                      = 155
    USE_ANOTHER_SERVER                     = 156
    SERVER_MOVED                           = 157
    SHARED_SUBSCRIPTIONS_NOT_SUPPORTED     = 158
    CONNECTION_RATE_EXCEEDED               = 159
    MAXIMUM_CONNECT_TIME                   = 160
    SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 161
    WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED   = 162


class SubackReasonCode(enum.IntEnum):
    GRANTED_QOS_0                          = 0
    GRANTED_QOS_1                          = 1
    GRANTED_QOS_2                          = 2
    UNSPECIFIED_ERROR                      = 128
    IMPLEMENTATION_SPECIFIC_ERROR          = 131
    NOT_AUTHORIZED                         = 135
    TOPIC_FILTER_INVALID                   = 143
    PACKET_IDENTIFIER_IN_USE               = 145
    QUOTA_EXCEEDED                         = 151
    SHARED_SUBSCRIPTIONS_NOT_SUPPORTED     = 158
    SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 161
    WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED   = 162


class UnsubackReasonCode(enum.IntEnum):
    SUCCESS                       = 0
    NO_SUBSCRIPTION_EXISTED       = 17
    UNSPECIFIED_ERROR             = 128
    IMPLEMENTATION_SPECIFIC_ERROR = 131
    NOT_AUTHORIZED                = 135
    TOPIC_FILTER_INVALID          = 143
    PACKET_IDENTIFIER_IN_USE      = 145


class PropertyIds(enum.IntEnum):
    PAYLOAD_FORMAT_INDICATOR          = 1
    MESSAGE_EXPIRY_INTERVAL           = 2
    CONTENT_TYPE                      = 3
    RESPONSE_TOPIC                    = 8
    CORRELATION_DATA                  = 9
    SUBSCRIPTION_IDENTIFIER           = 11
    SESSION_EXPIRY_INTERVAL           = 17
    ASSIGNED_CLIENT_IDENTIFIER        = 18
    SERVER_KEEP_ALIVE                 = 19
    AUTHENTICATION_METHOD             = 21
    AUTHENTICATION_DATA               = 22
    REQUEST_PROBLEM_INFORMATION       = 23
    WILL_DELAY_INTERVAL               = 24
    REQUEST_RESPONSE_INFORMATION      = 25
    RESPONSE_INFORMATION              = 26
    SERVER_REFERENCE                  = 28
    REASON_STRING                     = 31
    RECEIVE_MAXIMUM                   = 33
    TOPIC_ALIAS_MAXIMUM               = 34
    TOPIC_ALIAS                       = 35
    MAXIMUM_QOS                       = 36
    RETAIN_AVAILABLE                  = 37
    USER_PROPERTY                     = 38
    MAXIMUM_PACKET_SIZE               = 39
    WILDCARD_SUBSCRIPTION_AVAILABLE   = 40
    SUBSCRIPTION_IDENTIFIER_AVAILABLE = 41
    SHARED_SUBSCRIPTION_AVAILABLE     = 42


# MQTT 5.0
PROTOCOL_VERSION = 5

CF_FIXED_HEADER = bitstruct.compile('u4u4')

MAXIMUM_PACKET_SIZE = 268435455  # (128 ^ 4 - 1)

LOGGER = logging.getLogger(__name__)


class Error(Exception):
    pass


class MalformedPacketError(Error):
    pass


[docs]class TimeoutError(Error): pass
class PayloadReader(BytesIO): def __init__(self, data): super().__init__(data) self._length = len(data) def read(self, size): data = super().read(size) if len(data) != size: raise MalformedPacketError('Payload too short.') return data def read_all(self): return super().read() def is_data_available(self): return self.tell() < self._length def pack_string(data): data = data.encode('utf-8') packed = struct.pack('>H', len(data)) packed += data return packed def unpack_string(payload): size = unpack_u16(payload) try: return payload.read(size).decode('utf-8') except UnicodeDecodeError: raise MalformedPacketError('String not UTF-8 encoded.') def pack_u32(value): return struct.pack('>I', value) def unpack_u32(payload): return struct.unpack('>I', payload.read(4))[0] def pack_u16(value): return struct.pack('>H', value) def unpack_u16(payload): return struct.unpack('>H', payload.read(2))[0] def pack_u8(value): return struct.pack('B', value) def unpack_u8(payload): return payload.read(1)[0] def pack_property(property_id, value): return struct.pack('B', property_id) + { PropertyIds.PAYLOAD_FORMAT_INDICATOR: pack_u8, PropertyIds.MESSAGE_EXPIRY_INTERVAL: pack_u32, PropertyIds.CONTENT_TYPE: pack_string, PropertyIds.RESPONSE_TOPIC: pack_string, PropertyIds.CORRELATION_DATA: pack_binary, PropertyIds.SUBSCRIPTION_IDENTIFIER: pack_variable_integer, PropertyIds.SESSION_EXPIRY_INTERVAL: pack_u32, PropertyIds.ASSIGNED_CLIENT_IDENTIFIER: pack_string, PropertyIds.SERVER_KEEP_ALIVE: pack_u16, PropertyIds.AUTHENTICATION_METHOD: pack_string, PropertyIds.AUTHENTICATION_DATA: pack_binary, PropertyIds.REQUEST_PROBLEM_INFORMATION: pack_u8, PropertyIds.WILL_DELAY_INTERVAL: pack_u32, PropertyIds.REQUEST_RESPONSE_INFORMATION: pack_u8, PropertyIds.RESPONSE_INFORMATION: pack_string, PropertyIds.SERVER_REFERENCE: pack_string, PropertyIds.REASON_STRING: pack_string, PropertyIds.RECEIVE_MAXIMUM: pack_u16, PropertyIds.TOPIC_ALIAS_MAXIMUM: pack_u16, PropertyIds.TOPIC_ALIAS: pack_u16, PropertyIds.MAXIMUM_QOS: pack_u8, PropertyIds.RETAIN_AVAILABLE: pack_u8, PropertyIds.USER_PROPERTY: pack_string, PropertyIds.MAXIMUM_PACKET_SIZE: pack_u32, PropertyIds.WILDCARD_SUBSCRIPTION_AVAILABLE: pack_u8, PropertyIds.SUBSCRIPTION_IDENTIFIER_AVAILABLE: pack_u8, PropertyIds.SHARED_SUBSCRIPTION_AVAILABLE: pack_u8 }[property_id](value) def pack_properties(packet_name, properties): packed = b'' for property_id, value in properties.items(): packed += pack_property(property_id, value) return pack_variable_integer(len(packed)) + packed def unpack_property(property_id, payload): return { PropertyIds.PAYLOAD_FORMAT_INDICATOR: unpack_u8, PropertyIds.MESSAGE_EXPIRY_INTERVAL: unpack_u32, PropertyIds.CONTENT_TYPE: unpack_string, PropertyIds.RESPONSE_TOPIC: unpack_string, PropertyIds.CORRELATION_DATA: unpack_binary, PropertyIds.SUBSCRIPTION_IDENTIFIER: unpack_variable_integer, PropertyIds.SESSION_EXPIRY_INTERVAL: unpack_u32, PropertyIds.ASSIGNED_CLIENT_IDENTIFIER: unpack_string, PropertyIds.SERVER_KEEP_ALIVE: unpack_u16, PropertyIds.AUTHENTICATION_METHOD: unpack_string, PropertyIds.AUTHENTICATION_DATA: unpack_binary, PropertyIds.REQUEST_PROBLEM_INFORMATION: unpack_u8, PropertyIds.WILL_DELAY_INTERVAL: unpack_u32, PropertyIds.REQUEST_RESPONSE_INFORMATION: unpack_u8, PropertyIds.RESPONSE_INFORMATION: unpack_string, PropertyIds.SERVER_REFERENCE: unpack_string, PropertyIds.REASON_STRING: unpack_string, PropertyIds.RECEIVE_MAXIMUM: unpack_u16, PropertyIds.TOPIC_ALIAS_MAXIMUM: unpack_u16, PropertyIds.TOPIC_ALIAS: unpack_u16, PropertyIds.MAXIMUM_QOS: unpack_u8, PropertyIds.RETAIN_AVAILABLE: unpack_u8, PropertyIds.USER_PROPERTY: unpack_string, PropertyIds.MAXIMUM_PACKET_SIZE: unpack_u32, PropertyIds.WILDCARD_SUBSCRIPTION_AVAILABLE: unpack_u8, PropertyIds.SUBSCRIPTION_IDENTIFIER_AVAILABLE: unpack_u8, PropertyIds.SHARED_SUBSCRIPTION_AVAILABLE: unpack_u8 }[property_id](payload) def unpack_properties(packet_name, allowed_property_ids, payload): """Return a dictionary of unpacked properties. """ end_pos = unpack_variable_integer(payload) end_pos += payload.tell() properties = {} while payload.tell() < end_pos: property_id = payload.read(1)[0] if property_id not in allowed_property_ids: raise MalformedPacketError( f'Invalid property identifier {property_id}.') property_id = PropertyIds(property_id) properties[property_id] = unpack_property(property_id, payload) return properties def pack_binary(data): packed = struct.pack('>H', len(data)) packed += data return packed def unpack_binary(payload): size = unpack_u16(payload) return payload.read(size) def pack_variable_integer(value): if value == 0: packed = b'\x00' else: packed = b'' while value > 0: encoded_byte = (value & 0x7f) value >>= 7 if value > 0: encoded_byte |= 0x80 packed += struct.pack('B', encoded_byte) return packed def unpack_variable_integer(payload): value = 0 multiplier = 1 byte = 0x80 while (byte & 0x80) == 0x80: byte = unpack_u8(payload) value += ((byte & 0x7f) * multiplier) multiplier <<= 7 return value def pack_fixed_header(message_type, flags, size): packed = CF_FIXED_HEADER.pack(message_type, flags) packed += pack_variable_integer(size) return packed def unpack_fixed_header(payload): packet_type, flags = CF_FIXED_HEADER.unpack(payload) try: packet_type = ControlPacketType(packet_type) except ValueError: raise MalformedPacketError(f'Invalid packet type {packet_type}.') return packet_type, flags def pack_connect(client_id, clean_start, will_topic, will_message, will_retain, will_qos, username, password, keep_alive_s, properties): flags = 0 if clean_start: flags |= CLEAN_START payload_length = len(client_id) + 2 if will_topic: flags |= WILL_FLAG if will_retain: flags |= WILL_RETAIN if will_qos == 1: flags |= WILL_QOS_1 elif will_qos == 2: flags |= WILL_QOS_2 payload_length += 1 packed_will_topic = pack_string(will_topic) payload_length += len(packed_will_topic) payload_length += len(will_message) + 2 if password is not None: flags |= PASSWORD_FLAG payload_length += len(password) + 2 if username is not None: flags |= USER_NAME_FLAG packed_username = pack_string(username) payload_length += len(packed_username) properties = pack_properties('CONNECT', properties) packed = pack_fixed_header(ControlPacketType.CONNECT, 0, 10 + payload_length + len(properties)) packed += pack_string('MQTT') packed += struct.pack('>BBH', PROTOCOL_VERSION, flags, keep_alive_s) packed += properties packed += pack_string(client_id) if flags & WILL_FLAG: packed += pack_variable_integer(0) packed += packed_will_topic packed += pack_binary(will_message) if flags & USER_NAME_FLAG: packed += packed_username if flags & PASSWORD_FLAG: packed += pack_binary(password) return packed def unpack_connect(payload): if unpack_string(payload) != 'MQTT': raise MalformedPacketError('Invalid MQTT magic string.') if unpack_u8(payload) != PROTOCOL_VERSION: raise MalformedPacketError('Wrong protocol version.') flags = unpack_u8(payload) clean_start = bool(flags & CLEAN_START) keep_alive_s = unpack_u16(payload) properties = unpack_properties( 'CONNECT', [ PropertyIds.SESSION_EXPIRY_INTERVAL, PropertyIds.AUTHENTICATION_METHOD, PropertyIds.AUTHENTICATION_DATA, PropertyIds.REQUEST_PROBLEM_INFORMATION, PropertyIds.REQUEST_RESPONSE_INFORMATION, PropertyIds.RECEIVE_MAXIMUM, PropertyIds.TOPIC_ALIAS_MAXIMUM, PropertyIds.USER_PROPERTY, PropertyIds.MAXIMUM_PACKET_SIZE ], payload) client_id = unpack_string(payload) if flags & WILL_FLAG: will_properties = unpack_properties( 'CONNECT-WILL', [ PropertyIds.WILL_DELAY_INTERVAL, PropertyIds.PAYLOAD_FORMAT_INDICATOR, PropertyIds.MESSAGE_EXPIRY_INTERVAL, PropertyIds.CONTENT_TYPE, PropertyIds.RESPONSE_TOPIC, PropertyIds.USER_PROPERTY ], payload) will_topic = unpack_string(payload) will_message = unpack_binary(payload) will_retain = bool(flags & WILL_RETAIN) else: will_topic = None will_message = None will_retain = None if flags & USER_NAME_FLAG: username = unpack_string(payload) else: username = None if flags & PASSWORD_FLAG: password = unpack_binary(payload) else: password = None return (client_id, clean_start, will_topic, will_message, will_retain, keep_alive_s, properties, username, password) def pack_connack(session_present, reason, properties): properties = pack_properties('CONNACK', properties) packed = pack_fixed_header(ControlPacketType.CONNACK, 0, 2 + len(properties)) packed += pack_u8(int(session_present)) packed += pack_u8(reason) packed += properties return packed def unpack_connack(payload): flags = unpack_u8(payload) session_present = bool(flags & 1) try: reason = ConnectReasonCode(unpack_u8(payload)) except ValueError: raise MalformedPacketError(f'Invalid CONNACK reason {reason}.') properties = unpack_properties( 'CONNACK', [ PropertyIds.SESSION_EXPIRY_INTERVAL, PropertyIds.ASSIGNED_CLIENT_IDENTIFIER, PropertyIds.SERVER_KEEP_ALIVE, PropertyIds.AUTHENTICATION_METHOD, PropertyIds.AUTHENTICATION_DATA, PropertyIds.RESPONSE_INFORMATION, PropertyIds.SERVER_REFERENCE, PropertyIds.REASON_STRING, PropertyIds.RECEIVE_MAXIMUM, PropertyIds.TOPIC_ALIAS_MAXIMUM, PropertyIds.MAXIMUM_QOS, PropertyIds.RETAIN_AVAILABLE, PropertyIds.USER_PROPERTY, PropertyIds.MAXIMUM_PACKET_SIZE, PropertyIds.WILDCARD_SUBSCRIPTION_AVAILABLE, PropertyIds.SUBSCRIPTION_IDENTIFIER_AVAILABLE, PropertyIds.SHARED_SUBSCRIPTION_AVAILABLE ], payload) return session_present, reason, properties def pack_disconnect(reason): packed = pack_fixed_header(ControlPacketType.DISCONNECT, 0, 2) packed += struct.pack('B', reason) packed += pack_variable_integer(0) return packed def unpack_disconnect(payload): if payload.is_data_available(): reason = payload.read(1)[0] else: reason = 0 try: reason = DisconnectReasonCode(reason) except ValueError: raise MalformedPacketError(f'Invalid DISCONNECT reason {reason}.') if payload.is_data_available(): properties = unpack_properties( 'DISCONNECT', [ PropertyIds.SESSION_EXPIRY_INTERVAL, PropertyIds.SERVER_REFERENCE, PropertyIds.REASON_STRING, PropertyIds.USER_PROPERTY ], payload) else: properties = {} return reason, properties def pack_subscribe(topic, retain_handling, packet_identifier): packed_topic = pack_string(topic) packed = pack_fixed_header(ControlPacketType.SUBSCRIBE, 2, len(packed_topic) + 4) packed += struct.pack('>HB', packet_identifier, 0) packed += packed_topic packed += struct.pack('B', retain_handling << 4) return packed def unpack_subscribe(payload): packet_identifier = unpack_u16(payload) properties = unpack_properties('SUBSCRIBE', [ PropertyIds.SUBSCRIPTION_IDENTIFIER, PropertyIds.USER_PROPERTY ], payload) subscriptions = [] while payload.is_data_available(): topic = unpack_string(payload) options = unpack_u8(payload) subscriptions.append((topic, options)) return packet_identifier, properties, subscriptions def pack_suback(packet_identifier, reasons): properties = pack_properties('SUBACK', {}) packed = pack_fixed_header(ControlPacketType.SUBACK, 0, 2 + len(properties) + len(reasons)) packed += pack_u16(packet_identifier) packed += properties packed += reasons return packed def unpack_suback(payload): packet_identifier = unpack_u16(payload) properties = unpack_properties('SUBACK', [ PropertyIds.REASON_STRING, PropertyIds.USER_PROPERTY ], payload) reasons = [] while payload.is_data_available(): try: reason = SubackReasonCode(unpack_u8(payload)) except ValueError: raise MalformedPacketError(f'Invalid SUBACK reason {reason}.') reasons.append(reason) return packet_identifier, properties, reasons def pack_unsubscribe(topic, packet_identifier): packed_topic = pack_string(topic) packed = pack_fixed_header(ControlPacketType.UNSUBSCRIBE, 2, len(packed_topic) + 3) packed += struct.pack('>HB', packet_identifier, 0) packed += packed_topic return packed def unpack_unsubscribe(payload): packet_identifier = unpack_u16(payload) unpack_u8(payload) topics = [] while payload.is_data_available(): topics.append(unpack_string(payload)) return packet_identifier, topics def pack_unsuback(packet_identifier, reasons): packed = pack_fixed_header(ControlPacketType.UNSUBACK, 0, 3 + len(reasons)) packed += pack_u16(packet_identifier) packed += pack_properties('UNSUBACK', {}) packed += reasons return packed def unpack_unsuback(payload): packet_identifier = unpack_u16(payload) properties = unpack_properties('UNSUBACK', [ PropertyIds.REASON_STRING, PropertyIds.USER_PROPERTY ], payload) reasons = [] while payload.is_data_available(): try: reason = UnsubackReasonCode(unpack_u8(payload)) except ValueError: raise MalformedPacketError(f'Invalid UNSUBACK reason {reason}.') reasons.append(reason) return packet_identifier, properties, reasons def pack_publish(topic, message, retain, properties): flags = 0 if retain: flags |= 1 properties = pack_properties('PUBLISH', properties) packed_topic = pack_string(topic) size = len(packed_topic) + len(message) + len(properties) packed = pack_fixed_header(ControlPacketType.PUBLISH, flags, size) packed += packed_topic packed += properties packed += message return packed def unpack_publish(payload, qos): topic = unpack_string(payload) if qos > 0: raise MalformedPacketError('Only QoS 0 is supported.') properties = unpack_properties( 'PUBLISH', [ PropertyIds.PAYLOAD_FORMAT_INDICATOR, PropertyIds.MESSAGE_EXPIRY_INTERVAL, PropertyIds.CONTENT_TYPE, PropertyIds.RESPONSE_TOPIC, PropertyIds.CORRELATION_DATA, PropertyIds.SUBSCRIPTION_IDENTIFIER, PropertyIds.TOPIC_ALIAS, PropertyIds.USER_PROPERTY ], payload) message = payload.read_all() return topic, message, properties def pack_pingreq(): return pack_fixed_header(ControlPacketType.PINGREQ, 0, 0) def pack_pingresp(): return pack_fixed_header(ControlPacketType.PINGRESP, 0, 0) def format_properties(properties): if not properties: return [] lines = [' Properties:'] for identifier, value in properties.items(): lines.append(f' {identifier.name}({identifier.value}): {value}') return lines def format_connect(payload): (client_id, clean_start, will_topic, will_message, will_retain, keep_alive_s, properties, username, password) = unpack_connect(payload) return [ f' ClientId: {client_id}', f' CleanStart: {clean_start}', f' WillTopic: {will_topic}', f' WillMessage: {hexlify(will_message)}', f' WillRetain: {will_retain}', f' KeepAlive: {keep_alive_s}', f' UserName: {username}', f' Password: {password}' ] + format_properties(properties) def format_connack(payload): session_present, reason, properties = unpack_connack(payload) return [ f' SessionPresent: {session_present}', f' Reason: {reason.name}({reason.value})' ] + format_properties(properties) def format_publish(flags, payload): dup = bool((flags >> 3) & 0x1) qos = ((flags >> 1) & 0x3) retain = bool(flags & 0x1) topic, message, properties = unpack_publish(payload, qos) return [ f' DupFlag: {dup}', f' QoSLevel: {qos}', f' Retain: {retain}', f' Topic: {topic}', f' Message: {hexlify(message)}', ] + format_properties(properties) def format_subscribe(payload): packet_identifier, properties, subscriptions = unpack_subscribe(payload) lines = [ f' PacketIdentifier: {packet_identifier}', ' Subscriptions:' ] for topic, flags in subscriptions: lines += [ f' Topic: {topic}', f' MaximumQoS: {flags & 0x3}', f' NoLocal: {bool((flags >> 2) & 0x1)}', f' RetainAsPublished: {bool((flags >> 3) & 0x1)}', f' RetainHandling: {(flags >> 4) & 0x3}' ] return lines def format_suback(payload): packet_identifier, properties, reasons = unpack_suback(payload) return [ f' PacketIdentifier: {packet_identifier}', ] + format_properties(properties) + [ ' Reasons:' ] + [ f' {reason.name}({reason.value})' for reason in reasons ] def format_unsubscribe(payload): packet_identifier, topics = unpack_unsubscribe(payload) return [ f' PacketIdentifier: {packet_identifier}', ' Topics:' ] + [f' {topic}' for topic in topics] def format_unsuback(payload): packet_identifier, properties, reasons = unpack_unsuback(payload) return [ f' PacketIdentifier: {packet_identifier}', ] + format_properties(properties) + [ ' Reasons:' ] + [ f' {reason.name}({reason.value})' for reason in reasons ] def format_disconnect(payload): reason, properties = unpack_disconnect(payload) return [ f' Reason: {reason.name}({reason.value})', ] + format_properties(properties) def format_packet(prefix, packet): lines = [] try: packet_type, flags = unpack_fixed_header(packet) payload = PayloadReader(packet[1:]) size = unpack_variable_integer(payload) packet_kind = packet_type.name lines.append( f'{prefix} {packet_kind}({packet_type.value}) packet of {len(packet)} ' f'byte(s)') if packet_kind == 'CONNECT': lines += format_connect(payload) elif packet_kind == 'CONNACK': lines += format_connack(payload) elif packet_kind == 'PUBLISH': lines += format_publish(flags, payload) elif packet_kind == 'SUBSCRIBE': lines += format_subscribe(payload) elif packet_kind == 'SUBACK': lines += format_suback(payload) elif packet_kind == 'UNSUBSCRIBE': lines += format_unsubscribe(payload) elif packet_kind == 'UNSUBACK': lines += format_unsuback(payload) elif packet_kind == 'DISCONNECT': lines += format_disconnect(payload) except Exception as e: lines.append(f' *** Malformed packet ({e}) ***') lines.append('') lines += [f' {line}' for line in traceback.format_exc().splitlines()] lines.append('') return lines def format_connect_compact(payload): (client_id, _, will_topic, will_message, _, keep_alive_s, _, _name, _) = unpack_connect(payload) parts = [f'ClientId={client_id}'] if will_topic is not None: parts.append(f'WillTopic={will_topic}') if will_message is not None: parts.append(f'WillMessage={hexlify(will_message)}') parts.append(f'KeepAlive={keep_alive_s}') return parts def format_connack_compact(payload): _, reason, _ = unpack_connack(payload) return [f'Reason={reason.name}({reason.value})'] def format_publish_compact(flags, payload): qos = ((flags >> 1) & 0x3) topic, message, _ = unpack_publish(payload, qos) return [ f'Topic={topic}', f'Message={hexlify(message)}', ] def format_subscribe_compact(payload): _, _, subscriptions = unpack_subscribe(payload) parts = [] for topic, _ in subscriptions: parts.append(f'Topic={topic}') return parts def format_suback_compact(payload): _, _, reasons = unpack_suback(payload) parts = [] for reason in reasons: parts.append(f'Reason={reason.name}({reason.value})') return parts def format_unsubscribe_compact(payload): _, topics = unpack_unsubscribe(payload) parts = [] for topic in topics: parts.append(f'Topic={topic}') return parts def format_unsuback_compact(payload): _, _, reasons = unpack_unsuback(payload) parts = [] for reason in reasons: parts.append(f'Reason={reason.name}({reason.value})') return parts def format_disconnect_compact(payload): reason, _ = unpack_disconnect(payload) return [f'Reason={reason.name}({reason.value})'] def format_packet_compact(prefix, packet): try: packet_type, flags = unpack_fixed_header(packet) payload = PayloadReader(packet[1:]) size = unpack_variable_integer(payload) packet_kind = packet_type.name except Exception as e: return f'{prefix} *** Malformed packet ({e}) ***' try: if packet_kind == 'CONNECT': extra = format_connect_compact(payload) elif packet_kind == 'CONNACK': extra = format_connack_compact(payload) elif packet_kind == 'PUBLISH': extra = format_publish_compact(flags, payload) elif packet_kind == 'SUBSCRIBE': extra = format_subscribe_compact(payload) elif packet_kind == 'SUBACK': extra = format_suback_compact(payload) elif packet_kind == 'UNSUBSCRIBE': extra = format_unsubscribe_compact(payload) elif packet_kind == 'UNSUBACK': extra = format_unsuback_compact(payload) elif packet_kind == 'DISCONNECT': extra = format_disconnect_compact(payload) else: extra = [] extra = ', '.join(extra) if extra: extra = ': ' + extra except Exception as e: extra = f': *** Malformed packet ({e}) ***' return f'{prefix} {packet_kind}({packet_type.value}){extra}'