Mosquitto原理与实践

Mosquitto-test

一、运行方式

打开三个终端

1
2
3
4
5
6
7
8
#mosquitto broker:
./mosquitto -c mosquitto.conf -v

#mosquitto sub:
./mosquitto_sub -h localhost -t "hello" -i "sub1"

#mosquitto pub:
./mosquitto_pub -h localhost -t "hello" -m "hello,world" -i "pub"

二、常用参数说明

参数 描述
-h 服务器主机,默认localhost
-t 指定主题
-u 用户名
-P 密码
-i 客户端id,唯一
-m 发布的消息内容

三、Bug/Warning

  1. Warning: Mosquitto should not be run as root/administrator.

    最好不要以root的身份来运行Mosquitto代理,可以换一个用户,如Wuhlan,同时mosquitto.config里的user root也要修改为user Wuhlan

  2. Library /lib/libmosquittopp.so can not find symbol: _ZdlPvj

    编译成功,运行失败。这个报错大概的意思应该是libmosquittopp.so找不到入口,导致的原因是我在配置链接方式的时候,多链接了这个库。

四、MQTT的基本原理

MQTT 是一个轻量级协议,使用基于 TCP/IP 协议的发布/订阅消息转发模式,在物联网应用中大规模使用。以Broker代理服务器作为中转站,客户端之间通过消息订阅和推送的方式来进行通信。如下图所示:

MQTT原理

mosquitto结构体,它保存了客户端连接broker的相关参数。如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
struct mosquitto {
#ifndef WIN32
int sock; /*服务器程序与该客户端连接通信所用的socket描述符*/
# ifndef WITH_BROKER
int sockpairR, sockpairW;
# endif
#else
SOCKET sock;
# ifndef WITH_BROKER
SOCKET sockpairR, sockpairW;
# endif
#endif
enum _mosquitto_protocol protocol;
char *address;/*该客户端的IP地址*/
char *id;/*该客户端登陆mosquitto程序时所提供的ID值,该值与其他的客户端不能重复*/
char *username;/*登录用户名*/
char *password;/*密码*/
uint16_t keepalive;/*该客户端需在此时间内向mosquitto服务器程序发送一条ping/ping消息*/
uint16_t last_mid;
enum mosquitto_client_state state;
time_t last_msg_in;/*last_msg_in和last_msg_out用于记录上次收发消息的时间*/
time_t last_msg_out;
time_t ping_t;
struct _mosquitto_packet in_packet;
struct _mosquitto_packet *current_out_packet;
struct _mosquitto_packet *out_packet;
struct mosquitto_message *will;
...
bool want_write;
bool want_connect;
#if defined(WITH_THREADING) && !defined(WITH_BROKER)
pthread_mutex_t callback_mutex;
pthread_mutex_t log_callback_mutex;
pthread_mutex_t msgtime_mutex;
pthread_mutex_t out_packet_mutex;
pthread_mutex_t current_out_packet_mutex;
pthread_mutex_t state_mutex;
pthread_mutex_t in_message_mutex;
pthread_mutex_t out_message_mutex;
pthread_t thread_id;
#endif
bool clean_session;
#ifdef WITH_BROKER
bool is_dropping;
bool is_bridge;
struct _mqtt3_bridge *bridge;
struct mosquitto_client_msg *msgs;/*用于暂时存储发往该context的消息。*/
struct mosquitto_client_msg *last_msg;
int msg_count;
int msg_count12;
struct _mosquitto_acl_user *acl_list;
struct _mqtt3_listener *listener;
time_t disconnect_t;
struct _mosquitto_packet *out_packet_last;
struct _mosquitto_subhier **subs;
int sub_count;
int pollfd_index;
...
};

常用的相关函数如下:

函数名称 功能 备注
mosquitto_lib_init() 使用mosquitto库的函数前,要先进行初始化
mosquitto_lib_cleanup() 使用完mosquitto库的函数后,要进行相关的清除操作
mosquitto_new() 创建一个新的mosquitto客户端实例,使用完后一定要使用mosquitto_destroy()释放相关内存 id需要设置一下,否则可能出现意想不到的错误。设置为 true 可指示代理在断开连接时清除所有消息和订阅,设置为 false 以指示代理保留它们。
mosquitto_destroy() 用于释放与 mosquitto 客户端实例关联的内存
mosquitto_connect_callback_set() 设置连接回调。当代理发送 CONNECCT ACK 消息以响应连接时,将调用此消息 第二个参数为回调函数
mosquitto_disconnect_callback_set() 设置断开连接回调。当代理收到断开连接命令并断开客户端连接时,将调用此选项 第二个参数为回调函数
mosquitto_publish_callback_set() 设置发布回调。当使用 mosquitto_publish 启动的消息已成功发送到代理时,将调用此选项 第二个参数为回调函数
mosquitto_subscribe_callback_set() 设置订阅回调。当代理响应订阅请求时,将调用此函数 第二个参数为回调函数
mosquitto_message_callback_set() 设设置消息回调。当从代理接收到消息时,将调用此函数 第二个参数为回调函数
mosquitto_loop_forever() 此函数在无限阻塞循环中调用 loop()。对于只想在程序中运行 MQTT 客户机循环的情况,它非常有用。它处理服务器连接丢失时的重新连接。如果在回调中调用 mosquitto_disconnect(),它将返回
mosquitto_loop_start() 网络事件循环处理函数,客户端通过创建新的线程不断调用mosquitto_loop()函数来处理网络事件,该函数非阻塞
mosquitto_loop_stop() 这是线程化客户端接口的一部分。调用此命令一次以停止以前使用mosquitto_loop_start创建的网络线程。此调用将阻塞,直到网络线程完成。要结束网络线程,必须先前调用 mosquitto_disconnect 或已将 force 参数设置为 true

五、参考资料

[1] mosquitto库常用的相关函数解析

[2] 基于MQTT协议的Mosquitto的使用及libmosquitto客户端编程


Mosquitto原理与实践
https://wuhlan3.gitee.io/2023/02/10/MQTT原理与实践/
Author
Wuhlan3
Posted on
February 10, 2023
Licensed under