物联网通讯协议mqtt环境搭建与python代码实现


物联网通讯协议mqtt环境搭建与python代码实现

一、mqtt协议与TCP协议区别

MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的消息传输协议,用于在客户端和服务器之间传输消息。MQTT 通常用于物联网(IoT)设备之间的通信,因为它具有较低的带宽和电量要求,适用于资源有限的设备。

TCP/IP(Transmission Control Protocol/Internet Protocol)是一个基于网络通信的协议族,它提供了一种端到端的数据传输机制。TCP/IP 是计算机网络中最广泛使用的协议,它支持数据的可靠传输和错误检测,同时也具有流控制和拥塞控制等功能,可以保证数据在网络中正确、高效地传输。

MQTT 和 TCP/IP 的最大区别是应用场景不同。MQTT 是专门用于传输消息的协议,它适用于 IoT 等需要传输小量数据的场景。而 TCP/IP 则是一种通用的协议,适用于传输各种类型的数据。

此外,MQTT 和 TCP/IP 在传输方式、连接方式、消息格式等方面也有所不同。MQTT 是基于发布/订阅模式的协议,连接时需要建立一个 MQTT 客户端和一个 MQTT 服务器之间的连接,消息格式通常是 JSON 格式。TCP/IP 则是一种面向连接的协议,连接时需要建立一个客户端和服务器之间的连接,消息格式没有限制,可以是任何格式的数据。

二、mqtt协议的组成

mqtt协议由三个主要部分组成,分别为:服务器/Broker、发布者/Publisher和订阅者/Subscriber。它们之间使用话题/topic进行通信。

三、mqtt服务器的搭建

EMQ的开源服务端emqxEMQX: 大规模分布式物联网 MQTT 消息服务器,目前是使用最广泛的mqtt服务端,可以部署在本地及云端。下面以部署在本地为例。
将要Window版本的EMQXReleases · emqx/emqx (github.com)下载到本地,解压缩之后,进入bin目录,打开cmd命令行,执行指令emqx start即可运行mqtt服务端,之后cmd窗口就可以关闭,服务已经在后台运行。如果想关闭mqtt服务器,可以同样方式打开命令行运行emqx stop

运行mqtt服务器之后,可以在本地打开网页,在地址栏输入ip:18083即可进入后台管理,默认用户名为admin,密码为public

emqx服务器默认占用的TCP端口包括:

1883	MQTT 协议端口
8883	MQTT/SSL 端口
8083    MQTT/WebSocket 端口
8080    HTTP API 端口
18083   Dashboard(**商业智能仪表盘**) 管理控制台端口

四、mqtt本地客户端安装

本地客户端主要是为了进行调试使用。

emq的windows本地客户端下载地址为MQTT X:跨平台 MQTT 5.0 桌面客户端工具

安装完成后,打开mqtt X软件,进行连接,这里需要注意的是,端口号必须设置为1883,因为1883是emqx服务器默认开启的协议端口。

如果无法正常通信的话,可能是防火墙问题,可以单独将相应的端口开放。

五、python编程实现

  1. 发布者/Publisher的代码实现:
# encoding:utf-8
"""
__time__    : 2023/2/21 13:45
__author__  : LIU SIYU
__email__   :18811379768@163.com
__address__ :Beijing Institute of Technology
"""
import paho.mqtt.client as mqtt
from PySide6.QtCore import Qt, QTimer
from PySide6.QtWidgets import QApplication, QHBoxLayout, QLabel, QMainWindow, QPushButton, QVBoxLayout, QWidget
from PySide6.QtGui import QFont


class ClientWindow(QMainWindow):
    def __init__(self, parent=None):
        super().__init__(parent)
        self.setWindowTitle("MQTT Client")
        self.setGeometry(600, 100, 400, 300)

        # 连接参数
        self.broker_address = "127.0.0.1"
        self.topic = "bci/topic"
        self.message = "Hello, world!"
        self.message_count = 0

        self.interval = 100  # ms

        # 创建 MQTT 客户端
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_publish = self.on_publish

        # 创建界面
        main_widget = QWidget(self)
        self.setCentralWidget(main_widget)
        layout = QVBoxLayout(main_widget)
        self.status_label = QLabel("Disconnected")
        font = QFont("Arial", 16)  # 创建字体对象,指定字体和字号
        self.status_label.setFont(font)  # 设置标签的字体
        layout.addWidget(self.status_label)
        button_layout = QHBoxLayout()
        self.connect_button = QPushButton("Connect")
        self.connect_button.setFont(font)
        self.connect_button.clicked.connect(self.connect_to_broker)
        self.disconnect_button = QPushButton("Disconnect")
        self.disconnect_button.setFont(font)
        self.disconnect_button.clicked.connect(self.disconnect_from_broker)
        button_layout.addWidget(self.connect_button)
        button_layout.addWidget(self.disconnect_button)
        layout.addLayout(button_layout)

        # 创建定时器
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.publish_message)

    def connect_to_broker(self):
        self.client.connect(self.broker_address, 1883, 60)  # 必须是1883端口,否则连接不上
        self.client.reconnect()
        self.client.loop_start()    # 开启一个独立的循环通讯子线程。loop_start()和loop_stop()必须成对使用,否则会无法清除client._thread 子进程,以后再使用loop_start()就无效了
        # self.client.loop_forever()  # 阻塞式,直到客户端socket被关闭

        self.status_label.setText("Connected")
        self.timer.start(self.interval)

    def disconnect_from_broker(self):
        self.client.disconnect()
        self.client.loop_stop()
        self.status_label.setText("Disconnected")
        self.timer.stop()

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            self.status_label.setText("Connected")
            self.timer.start(self.interval)
        else:
            self.status_label.setText("Connection failed")

    def on_publish(self, client, userdata, mid):
        pass

    def publish_message(self):
        print("Publishing message: {}".format(self.message) + ": "+str(self.message_count) + "次")
        self.client.publish(self.topic, self.message+": "+str(self.message_count))
        self.message_count += 1

    def closeEvent(self, event):
        self.client.disconnect()
        super().closeEvent(event)


if __name__ == "__main__":
    app = QApplication()
    window = ClientWindow()
    window.show()
    app.exec()
  1. 订阅者/Subscriber的代码实现:
# encoding:utf-8
"""
__time__    : 2023/2/21 13:45
__author__  : LIU SIYU
__email__   :18811379768@163.com
__address__ :Beijing Institute of Technology
"""
import sys
import paho.mqtt.client as mqtt
from PySide6.QtCore import Qt, QTimer
from PySide6.QtWidgets import QApplication, QHBoxLayout, QLabel, QMainWindow, QVBoxLayout, QWidget
from PySide6.QtGui import QFont


class ServerWindow(QMainWindow):
    def __init__(self, parent=None):
        super().__init__(parent)
        self.setWindowTitle("MQTT Server")
        self.setGeometry(100, 100, 400, 300)

        # 连接参数
        self.broker_address = "127.0.0.1"
        self.topic = "bci/topic"

        # 创建 MQTT 客户端
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        # 创建界面
        main_widget = QWidget(self)
        self.setCentralWidget(main_widget)
        layout = QVBoxLayout(main_widget)

        font = QFont("Arial", 16)  # 创建字体对象,指定字体和字号
        self.message_id_label = QLabel("Message received:")
        self.message_id_label.setFont(font)  # 设置标签的字体
        layout.addWidget(self.message_id_label)
        self.message_label = QLabel("No message received yet")
        self.message_label.setFont(font)  # 设置标签的字体
        self.message_label.setGeometry(100, 100, 200, 50)
        layout.addWidget(self.message_label)

        self.client.connect(self.broker_address, 1883, 60)
        self.client.loop_start()   # 开启一个独立的循环通讯子线程。loop_start()和loop_stop()必须成对使用,否则会无法清除client._thread 子进程,以后再使用loop_start()就无效了
        # self.client.loop_forever()  # 阻塞式,直到客户端socket被关闭

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            self.client.subscribe(self.topic)

    def on_message(self, client, userdata, message):
        print("Message received: " + message.payload.decode())
        self.message_label.setText(message.payload.decode())


if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = ServerWindow()
    window.show()
    sys.exit(app.exec())

文章作者: BITBCI
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 BITBCI !
  目录