本文主要介绍一下Mosquitto中用到的比较重要的数据结构,为之后的流程处理讲解奠定基础。为了清晰,结构体展示时只保留重要成员。
一、mosquitto_db
struct mosquitto_db{
/*主题树的根结点*/
struct _mosquitto_subhier subs;
/*id:context映射的Hash表首节点,通过该成员去遍历Hash表*/
struct mosquitto *contexts_by_id;
/*sock:context映射的Hash表首节点,通过该成员去遍历Hash表*/
struct mosquitto *contexts_by_sock;
/*存储消息的链表*/
struct mosquitto_msg_store *msg_store;
/*服务器的配置*/
struct mqtt3_config *config;
/*由于断开连接等需要释放的context存放的链表*/
struct mosquitto *ll_for_free;
};
mosquitto_db可谓是mosquitto最核心的结构体,没有之一。它维护着服务器配置
、主题树
、客户端句柄
、接收的消息列表
。其中为了查找方便,建立了通过客户端id来查找客户端句柄的Hash表和通过客户端socket连接描述符查找客户端句柄的Hash表;将连接断开的客户端句柄先存放在ll_for_free
链表中,待到合适时统一释放。
二、mosquitto
mosquitto 是客户端连接句柄,包含着客户端相关信息。
struct mosquitto{
/*socket连接描述符*/
mosq_sock_t sock;
/*客户端使用的协议版本号*/
enum _mosquitto_protocol protocol;
/*客户端的ip地址*/
char *address;
/*客户端的id号,即客户端标识符,服务端根据这个id来唯一识别客户端*/
char *id;
/*客户端的用户名和密码,ACL鉴权使用*/
char *username;
char *password;
/*保活时间*/
uint16_t keepalive;
/*客户端的状态*/
enum mosquitto_client_state state;
/*遗嘱*/
struct mosquitto_message *will;
/*是否清空之前的会话信息*/
bool clean_session;
/*客户端的消息链表,包括客户端发送给服务端,服务端待发送给客户端的*/
struct mosquitto_client_msg *msgs;
/*待发送给客户端的报文链表,消息队列发送时需要先转化为报文,添加到这个链表后才能发送*/
struct _mosquitto_packet *out_packet;
/*该客户端订阅的主题链表*/
struct _mosquitto_subhier **subs;
};
每一个连接的客户端在内存中的映像就是该结构体。clean_session
之前的成员参考协议可以很好理解。下面主要着重说下最后的三个成员。
msgs
是一个消息链表,存放着服务端接收到该客户端的消息,和服务端要发送给客户端的消息,它不包含消息的具体内容,具体内容存放在mosquitto_db
的msg_store
链表中。它只维护着消息的状态;out_packet
是一个待发送的报文链表,所有msgs
中方向为mosq_md_out
的消息,最终会格式化成报文添加到out_packet
中,然后发送出去;subs
是这个客户端所订阅主题存放的数组。
三、mosquitto_msg_store
这个结构体用来存放所有发布的消息用的,整个消息存储结构是一个双向链表。所有服务端接收的主题消息都会在存储在这个链表中。
struct mosquitto_msg_store{
/*前驱结点*/
struct mosquitto_msg_store *next;
/*后继结点*/
struct mosquitto_msg_store *prev;
/*用来标识这个消息是哪一个客户端发布的*/
char *source_id;
/*用来标识这个消息应该发往哪些客户端*/
char **dest_ids;
/*引用次数,当引用次数为0,该消息将被移除并释放结点*/
int ref_count;
/*消息的主题*/
char *topic;
/*消息负载*/
void *payload;
/*消息负载长度*/
uint32_t payloadlen;
/*消息接收时的id*/
uint16_t source_mid;
/*没有使用到*/
uint16_t mid;
/*消息发布时的qos*/
uint8_t qos;
/*是否是保留消息*/
bool retain;
};
内存结构示意图如下。
四、_mosquitto_subhier
_mosquitto_subhier
是主题树的基本结点结构。
主题树采用孩子兄弟表示法
,每一个结点是主题的按层次分的词,比如主题China/Zhejiang/Hangzhou
,那么将会产生三个结点China
、Zhejiang
、Hangzhou
,每一个结点上挂有订阅该主题的客户端句柄链表。
主题树在系统启动时就已经创建出来,其中根节点是mosquitto_db
的成员subs
,在初始化时创建了两个结点,分别是""
和$SYS
,分别为用户主题和系统主题。其中用户主题即为一般情况下客户端发布和订阅的主题,系统主题为系统信息。假设目前主题树中已经存在China/Zhejiang/Hangzhou
和China/Beijing
两个主题,那么整个系统树的结构将是下面这个样子的。
struct _mosquitto_subhier {
/*该结点的父节点*/
struct _mosquitto_subhier *parent;
/*该结点的孩子结点*/
struct _mosquitto_subhier *children;
/*该结点的兄弟结点*/
struct _mosquitto_subhier *next;
/*该结点的订阅者链表*/
struct _mosquitto_subleaf *subs;
/*该结点的主题分段*/
char *topic;
/*该主题结点的保留消息*/
struct mosquitto_msg_store *retained;
};
五、mosquitto_client_msg
mosquitto_client_msg
存储着客户端发来和准备发往客户端的消息。
struct mosquitto_client_msg{
/*下一个结点*/
struct mosquitto_client_msg *next;
/*消息的内容*/
struct mosquitto_msg_store *store;
/*接收到消息的时间戳*/
time_t timestamp;
/*消息的标识*/
uint16_t mid;
/*消息qos*/
uint8_t qos;
/*是否需要保留*/
bool retain;
/*消息的方向,收到或者发出*/
enum mosquitto_msg_direction direction;
/*消息的状态*/
enum mosquitto_msg_state state;
/*是否是重复消息*/
bool dup;
};
客户端的消息队列包含从客户端发来的消息和准备发往客户端的消息。消息队列是存在最大长度的,通过max_queued
进行设置,如果设置为0表示不限制队列长度,当消息达到消息队列最大值,该消息将会被丢弃。而对于队列中的某些消息而言,是需要保存交互状态的,比如qos1,消息状态会从mosq_ms_publish_qos1
转换为mosq_ms_wait_for_puback
。这些正在交互的消息数量,也是可以进行限制,通过max_inflight
进行限制。
当一个客户端发布消息给服务端,服务端将会把这个消息存放在全局消息链表中。然后遍历主题树查找到该主题结点,并通过该结点把订阅该主题的客户端都取出来,然后把这则消息插入客户端句柄中的消息队列,然后等待发送。整体的结构如下。