MQTT Tools¶
MQTT tools in Python 3.7 and later.
Both the client and the broker implement MQTT version 5.0 using asyncio.
Client features:
- Subscribe to and publish QoS level 0 topics.
- Broker session resume (or clean start support) for less initial communication.
- Topic aliases for smaller publish packets.
- monitor, subscribe and publish command line commands.
Broker features:
- Subscribe to and publish QoS level 0 topics.
- Session resume (or clean start support) for less initial communication. Session state storage in RAM.
- broker command line command.
Limitations:
There are lots of limitations in both the client and the broker. Here are a few of them:
- QoS level 1 and 2 messages are not supported. A session state storage is required to do so, both in the client and the broker.
- Authentication is not supported.
MQTT version 5.0 specification: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html
Project homepage: https://github.com/eerimoq/mqttools
Documentation: https://mqttools.readthedocs.io
Installation¶
pip install mqttools
Examples¶
There are plenty of examples in the examples folder.
Command line¶
Subscribe¶
Connect to given MQTT broker and subscribe to a topic. All received messages are printed to standard output.
$ mqttools subscribe /test/#
Connecting to 'localhost:1883'.
Connected.
Topic: /test
Message: 11
Topic: /test/mqttools/foo
Message: bar
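The output above shows the filter /test/# matching both /test and /test/mqttools/foo. A minimal sketch of the wildcard rules from the MQTT specification may help explain why. The function name topic_matches is hypothetical, the sketch ignores the special handling of topics starting with $, and it is not necessarily how mqttools matches topics internally.

```python
def topic_matches(topic_filter, topic):
    """Return True if topic matches topic_filter (MQTT wildcards '+' and '#')."""
    filter_levels = topic_filter.split('/')
    topic_levels = topic.split('/')

    for index, level in enumerate(filter_levels):
        if level == '#':
            # Multi-level wildcard: matches the parent level and any
            # number of child levels. It is last in a valid filter.
            return True

        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False

        # '+' matches exactly one level, whatever its content.
        if level != '+' and level != topic_levels[index]:
            return False

    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches('/test/#', '/test'))
print(topic_matches('/test/#', '/test/mqttools/foo'))
print(topic_matches('/test/#', '/other'))
```

The first two calls print True and the last prints False, which agrees with the topics seen in the subscribe output.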
Publish¶
Connect to given MQTT broker and publish a message to a topic.
$ mqttools publish /test/mqttools/foo bar
Connecting to 'localhost:1883'.
Published 1 message(s) in 0 seconds from 1 concurrent task(s).
Publish multiple messages as quickly as possible with --count to benchmark the client and the broker.
$ mqttools publish --count 100 /test/mqttools/foo
Connecting to 'localhost:1883'.
Published 100 message(s) in 0.39 seconds from 10 concurrent task(s).
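When comparing benchmark runs it can be handy to turn the summary line into a message rate. The sketch below assumes the summary always has the format shown above. The helper name messages_per_second is hypothetical and not part of mqttools.

```python
import re

# Pattern for the summary line printed by the publish command.
SUMMARY_RE = re.compile(
    r'Published (\d+) message\(s\) in ([\d.]+) seconds '
    r'from (\d+) concurrent task\(s\)\.')


def messages_per_second(summary):
    """Parse a publish summary line and return the message rate."""
    mo = SUMMARY_RE.fullmatch(summary)

    if mo is None:
        raise ValueError(f'Unexpected summary line: {summary!r}')

    count = int(mo.group(1))
    seconds = float(mo.group(2))

    if seconds == 0:
        # Reported as 0 seconds when the run was too short to measure.
        raise ValueError('Elapsed time is too short to compute a rate.')

    return count / seconds


rate = messages_per_second(
    'Published 100 message(s) in 0.39 seconds from 10 concurrent task(s).')
print(f'{rate:.0f} messages per second')
```

For the run above this gives roughly 256 messages per second.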
Monitor¶
Connect to given MQTT broker and monitor given topics in a text based user interface.
$ mqttools monitor /test/#
The menu at the bottom of the monitor shows the available commands.
- Quit: Quit the monitor. Ctrl-C can be used as well.
- Play/Pause: Toggle between playing and paused (or running and frozen).
- Format: Message formatting; auto, binary or text.
Scripting¶
Subscribe¶
An example connecting to an MQTT broker, subscribing to the topic /test/#, and printing all published messages.
import asyncio
import mqttools


async def subscriber():
    client = mqttools.Client('localhost', 1883)

    await client.start()
    await client.subscribe('/test/#')

    while True:
        message = await client.messages.get()

        # None is put on the queue when the broker connection is lost.
        if message is None:
            print('Broker connection lost!')
            break

        print(f'Topic: {message.topic}')
        print(f'Message: {message.message}')


asyncio.run(subscriber())
Publish¶
An example connecting to an MQTT broker and publishing the message bar to the topic /test/mqttools/foo.
import asyncio
import mqttools


async def publisher():
    async with mqttools.Client('localhost', 1883) as client:
        client.publish(mqttools.Message('/test/mqttools/foo', b'bar'))


asyncio.run(publisher())
Functions and classes¶
- class mqttools.Client(host, port, client_id=None, will_topic='', will_message=b'', will_retain=False, will_qos=0, keep_alive_s=60, response_timeout=5, topic_aliases=None, topic_alias_maximum=10, session_expiry_interval=0, subscriptions=None, connect_delays=None, username=None, password=None, **kwargs)
An MQTT version 5.0 client.
host and port are the host and port of the broker.
client_id is the client id string. If None, a random client id is generated of the form mqttools-<UUID[0..14]>.
will_topic, will_message and will_qos are used to ask the broker to send a will when the session ends.
keep_alive_s is the keep alive time in seconds.
response_timeout is the maximum time to wait for a response from the broker.
topic_aliases is a list of topics that should be published with aliases instead of the topic string.
topic_alias_maximum is the maximum number of topic aliases the client is willing to assign on request from the broker.
session_expiry_interval is the session expiry time in seconds. Give as 0 to remove the session when the connection ends. Give as 0xffffffff to never remove the session (given that the broker supports it).
subscriptions is a list of topics and (topic, retain handling) tuples to subscribe to after connecting in start().
connect_delays is a list of delays in seconds between the connection attempts in start(). Each delay is used once, except the last delay, which is used until successfully connected. If [], only one connection attempt is performed. If None, the default delays [1, 2, 4, 8] are used.
username and password are the credentials. The password must be bytes. By default no credentials are used.
kwargs are passed to asyncio.open_connection().
Create a client with default configuration:
>>> client = Client('broker.hivemq.com', 1883)
Create a client with all optional arguments given:
>>> client = Client('broker.hivemq.com',
...                 1883,
...                 client_id='my-client',
...                 will_topic='/my/last/will',
...                 will_message=b'my-last-message',
...                 will_qos=0,
...                 keep_alive_s=600,
...                 response_timeout=30,
...                 topic_aliases=['/my/topic'],
...                 topic_alias_maximum=100,
...                 session_expiry_interval=1800,
...                 subscriptions=['a/b', ('test/#', 2)],
...                 connect_delays=[1, 2],
...                 username='user',
...                 password=b'pass',
...                 ssl=True)
Use an async context manager for automatic start and stop:
>>> async with Client('broker.hivemq.com', 1883) as client:
...     client.publish(Message('foo', b'bar'))
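The connect_delays rule described for the constructor (each delay used once, the last one repeated) can be pictured with a small generator. This is only an illustration of the documented behaviour. The function delay_schedule is hypothetical and not part of mqttools.

```python
import itertools


def delay_schedule(connect_delays=None):
    """Yield the delay in seconds before each new connection attempt."""
    if connect_delays is None:
        # Documented default delays.
        connect_delays = [1, 2, 4, 8]

    if not connect_delays:
        # Empty list: a single attempt, so there is nothing to wait for.
        return

    # Each delay is used once...
    yield from connect_delays[:-1]

    # ...except the last one, which repeats until connected.
    yield from itertools.repeat(connect_delays[-1])


print(list(itertools.islice(delay_schedule(), 6)))
print(list(itertools.islice(delay_schedule([1, 2]), 4)))
print(list(delay_schedule([])))
```

The three calls print [1, 2, 4, 8, 8, 8], [1, 2, 2, 2] and [] respectively.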
- client_id
The client identifier string.
- messages
An asyncio.Queue of received messages from the broker. Each message is a Message.
>>> await client.messages.get()
Message('/my/topic', b'my-message')
A None message is put in the queue when the broker connection is lost.
>>> await client.messages.get()
None
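The None sentinel makes it straightforward to write a reader loop that ends when the connection is lost. The sketch below shows the pattern with a plain asyncio.Queue standing in for client.messages, and tuples standing in for Message objects, so that it runs without a broker. The names consume and main are illustrative only.

```python
import asyncio


async def consume(messages):
    """Collect messages from the queue until the None sentinel arrives."""
    received = []

    while True:
        message = await messages.get()

        if message is None:
            # Broker connection lost: stop reading from the queue.
            break

        received.append(message)

    return received


async def main():
    # Stand-in for client.messages; tuples replace Message objects.
    messages = asyncio.Queue()
    messages.put_nowait(('/my/topic', b'my-message'))
    messages.put_nowait(('/my/other', b'hello'))
    messages.put_nowait(None)

    return await consume(messages)


result = asyncio.run(main())
print(result)
```

With a real client the same loop would read from client.messages, and the code after the loop is a natural place to reconnect.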
- publish(message)
Publish the given Message with QoS 0.
>>> client.publish(Message('/my/topic', b'my-message'))
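Topics listed in the topic_aliases argument of Client are published with an alias instead of the full topic string. A rough sketch of the size difference follows. It is based on the PUBLISH packet layout in the MQTT version 5.0 specification, not on mqttools internals, and covers only the variable header (no fixed header or payload). The function publish_variable_header is hypothetical.

```python
import struct

# Identifier of the Topic Alias property in MQTT version 5.0.
TOPIC_ALIAS_PROPERTY_ID = 0x23


def publish_variable_header(topic, alias=None):
    """Encode a QoS 0 PUBLISH variable header: topic name and properties."""
    encoded_topic = topic.encode('utf-8')

    # Topic name: two byte length followed by the UTF-8 string.
    header = struct.pack('>H', len(encoded_topic)) + encoded_topic

    if alias is None:
        properties = b''
    else:
        # Topic Alias property: identifier byte and two byte integer.
        properties = struct.pack('>BH', TOPIC_ALIAS_PROPERTY_ID, alias)

    # The property length is a variable byte integer. One byte is
    # enough for the short property lists used here.
    return header + bytes([len(properties)]) + properties


# No alias at all.
full = publish_variable_header('/test/mqttools/foo')

# First publish with an alias: topic and alias are both sent.
first = publish_variable_header('/test/mqttools/foo', alias=1)

# Later publishes: empty topic, alias only.
later = publish_variable_header('', alias=1)

print(len(full), len(first), len(later))
```

This prints 21 24 6: the first aliased publish is slightly larger, and every later one saves the length of the topic string minus the three bytes of the alias property.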
- on_message(message)
Called for each received MQTT message and when the broker connection is lost. Puts the message on the messages queue by default.
- start(resume_session=False)
Open a TCP connection to the broker and perform the MQTT connect procedure. This method must be called before any publish() or subscribe() calls. Call stop() to close the connection.
If resume_session is True, the client tries to resume the last session in the broker. A SessionResumeError exception is raised if the resume fails, and a new session has been created instead.
The exceptions below are only raised if connect_delays is [].
Raises ConnectionRefusedError if the TCP connection attempt is refused by the broker.
Raises TimeoutError if the broker does not acknowledge the connect request.
Raises ConnectError if the broker does not accept the connect request.
Raises SubscribeError if the broker does not accept the subscribe request(s).
>>> await client.start()
Trying to resume a session:
>>> try:
...     await client.start(resume_session=True)
...     print('Session resumed.')
... except SessionResumeError:
...     print('Session not resumed. Subscribe to topics.')
- stop()
Try to cleanly disconnect from the broker and then close the TCP connection. Call start() after stop() to reconnect to the broker.
>>> await client.stop()
- subscribe(topic, retain_handling=0)
Subscribe to given topic with QoS 0.
retain_handling controls the retain handling. May be 0, 1 or 2. If 0, all retained messages matching given topic filter are received. If 1, same as 0, but only if the topic filter did not already exist. If 2, no retained messages are received.
Raises TimeoutError if the broker does not acknowledge the subscribe request.
Raises SubscribeError if the broker does not accept the subscribe request.
>>> await client.subscribe('/my/topic')
>>> await client.messages.get()
Message('/my/topic', b'my-message')
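The three retain_handling values amount to a small decision on the broker side. The sketch below restates the rule given for subscribe() as code. The function sends_retained is hypothetical and only mirrors the description, not any mqttools or broker implementation.

```python
def sends_retained(retain_handling, subscription_exists):
    """Return True if retained messages are delivered on subscribe."""
    if retain_handling == 0:
        # Always deliver retained messages matching the topic filter.
        return True

    if retain_handling == 1:
        # Deliver only if the topic filter was not already subscribed to.
        return not subscription_exists

    if retain_handling == 2:
        # Never deliver retained messages.
        return False

    raise ValueError(f'Invalid retain handling: {retain_handling}')


for retain_handling in (0, 1, 2):
    for subscription_exists in (False, True):
        print(retain_handling,
              subscription_exists,
              sends_retained(retain_handling, subscription_exists))
```

Only retain handling 1 depends on whether the subscription already existed, which can make it a reasonable choice for a client that resubscribes after resuming a session.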
- unsubscribe(topic)
Unsubscribe from given topic.
Raises TimeoutError if the broker does not acknowledge the unsubscribe request.
Raises UnsubscribeError if the broker does not accept the unsubscribe request.
>>> await client.unsubscribe('/my/topic')
- class mqttools.Message(topic, message, retain=False, response_topic=None)
A message.
Give retain as True to make the message retained.
Give response_topic as a topic string to publish a response topic.
- class mqttools.Broker(addresses)
A limited MQTT version 5.0 broker.
addresses is a list of (host, port) and (host, port, ssl) tuples. It may also be the host string or one of the tuples. The broker will listen for clients on all given addresses.
ssl is an SSL context passed to asyncio.start_server() as ssl.
Create a broker and serve clients:
>>> broker = Broker('localhost')
>>> await broker.serve_forever()
- class mqttools.BrokerThread(addresses)
The same as Broker, but running in a thread.
Create a broker and serve clients for 60 seconds:
>>> broker = BrokerThread('localhost')
>>> broker.start()
>>> time.sleep(60)
>>> broker.stop()
- start()
Start the broker in a thread. This function returns immediately.
-