Python で MQTT (Paho)

690 2020-11-08 11:23

pythonにおけるmqttのクライアントであるpaho-mqttの使い方、そしてクライアントとMQTTブローカー間の接続、publish/subscribeなどの機能をPythonで実装する方法を紹介します。

準備

今回はPython 3.6を使います。以下のコマンドでPythonのバージョンを確認できます。

➜  ~ python3 --version             
Python 3.6.7

paho (mqttクライアント) について

Paho Python Clientには、Python 2.7または3.x上でMQTT v3.1とv3.1.1をサポートするクライアントのクラスが用意されています。また、MQTTサーバーに単発でpublishするようなヘルパー関数も用意されています。

pahoのインストール

pip3 install paho-mqtt

pipは、Pythonパッケージの管理ツールです。pipを使って、Pythonパッケージの検索、ダウンロード、インストール、アンインストールを行えます。

PythonでMQTTを使う

MQTTブローカーに接続

EMQ Xが提供しているテスト用のMQTTブローカーを使用します(このブローカーは、EMQ X Cloudをベースに運用されています)。ブローカーのアクセス情報は以下の通りです。接続先は以下の通りです。

  • Host: broker.emqx.io
  • TCP Port: 1883
  • Websocket Port: 8083

pahoをimport

from paho.mqtt import client as mqtt_client

接続のパラメータ設定

ブローカーに接続するためのアドレス、ポート、トピックを設定します。今回は、Pythonの関数random.randintを使って、Client IDをランダムに生成します。

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
# username = 'emqx'
# password = 'public'

接続する関数を書く

on_connectはコールバック関数です。この関数は、クライアントが接続した後に呼び出されます。この関数にあるrc (return code) によって、クライアントが正常に接続したかどうかを判断することができます。このコールバック関数を使ってMQTTクライアントを作成し、broker.emqx.ioに接続するクライアントを作ります。

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # Set Connecting Client ID
    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

publish (送信)

上記のクライアントを引数に取るpublish関数を作ります。今回は、whileループを使って1秒ごとにトピック/python/mqttにメッセージを送信するようにします。

def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1

subscribe (受信)

コールバック関数on_messageを書きます。この関数は、クライアントがブローカーからメッセージを受信したときに呼び出されます。今回は、subscribeしたトピックの名前と受信したメッセージを出力します。

def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message

スクリプトを完成させる

メッセージをpublishするスクリプト

# python 3.6

import random
import time

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
# username = 'emqx'
# password = 'public'

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

メッセージをsubscribeするスクリプト

# python3.6

import random

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
# username = 'emqx'
# password = 'public'


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()

実行

publish

python3 pub.py

image.png

subscibe

python3 sub.py

image.png

おわりに

この記事は How to use MQTT in Python (Paho)の翻訳です。