支持原创,转载请附上原文链接
0 简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布,目前最新版本为v3.1.1。MQTT最大的优点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽占用的即时通讯协议,MQTT在物联网、小型设备、移动应用等方面有广泛的应用。
这个系列的文章不会对MQTT协议栈进行深入的分析,后续有机会另行分析。对于MQTT协议,这篇文章分析的不错,https://blog.csdn.net/anxianfeng55555/article/details/80908795
paho mqtt c 是一个开源的MQTT协议栈库,github地址为
https://github.com/eclipse/paho.mqtt.c
代码拉下来之后,编译后会生成动态库
git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
make
lrwxrwxrwx 1 wangqian wangqian 19 9月 20 17:42 build/output/libpaho-mqtt3a.so -> libpaho-mqtt3a.so.1*
lrwxrwxrwx 1 wangqian wangqian 20 9月 20 17:42 build/output/libpaho-mqtt3as.so -> libpaho-mqtt3as.so.1*
lrwxrwxrwx 1 wangqian wangqian 19 9月 20 17:41 build/output/libpaho-mqtt3c.so -> libpaho-mqtt3c.so.1*
lrwxrwxrwx 1 wangqian wangqian 20 9月 20 17:41 build/output/libpaho-mqtt3cs.so -> libpaho-mqtt3cs.so.1*
如上,paho mqtt c库支持4中模式的库
- paho-mqtt3a – asynchronous (MQTTAsync)
- paho-mqtt3as – asynchronous with SSL (MQTTAsync)
- paho-mqtt3c – “classic” / synchronous (MQTTClient)
- paho-mqtt3cs – “classic” / synchronous with SSL (MQTTClient)
下面主要分析paho-mqtt3a这个库的代码。这是一个异步的库,发送和接收,是通过将对应的指令加入到工作队列中,且有一个发送和接收的线程来处理这两个队列的机制。
1 源码分析
第一从用户的使用角度来分析,用户使用MQTT的第一步,就是需要connect
1.1 MQTTAsync_connect
从源代码下手,MQTTAsync_connect的功能概括为如下几点:
- 检查连接参数的合法性
- 启动发送和接收线程
- 设置连接相关的各种参数
- 添加连接指令到工作队列中
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
{
MQTTAsyncs* m = handle;
int rc = MQTTASYNC_SUCCESS;
MQTTAsync_queuedCommand* conn;
FUNC_ENTRY;
/**
* 参数判断代码 和 连接回调变量设置代码 ,省略
**/
......
tostop = 0;
/**
* 启动接收和发送线程
**/
if (sendThread_state != STARTING && sendThread_state != RUNNING)
{
MQTTAsync_lock_mutex(mqttasync_mutex);
sendThread_state = STARTING;
Thread_start(MQTTAsync_sendThread, NULL);
MQTTAsync_unlock_mutex(mqttasync_mutex);
}
if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
{
MQTTAsync_lock_mutex(mqttasync_mutex);
receiveThread_state = STARTING;
Thread_start(MQTTAsync_receiveThread, handle);
MQTTAsync_unlock_mutex(mqttasync_mutex);
}
/**
* 设置心跳保活时间,这个参数,会关系到MQTT弱网断连,后面会专门分析MQTT的心跳保活机制,省略
**/
m->c->keepAliveInterval = options->keepAliveInterval;
setRetryLoopInterval(options->keepAliveInterval);
......
/**
* MQTT遗愿相关参数的设置,省略
**/
......
#if defined(OPENSSL)
/**
* OPENSSL 加密相关的参数设置,省略
**/
......
#else
if (options->struct_version != 0 && options->ssl)
{
rc = MQTTASYNC_SSL_NOT_SUPPORTED;
goto exit;
}
#endif
if (m->c->username)
free((void*)m->c->username);
if (options->username)
m->c->username = MQTTStrdup(options->username);
if (m->c->password)
free((void*)m->c->password);
if (options->password)
{
m->c->password = MQTTStrdup(options->password);
m->c->passwordlen = (int)strlen(options->password);
}
else if (options->struct_version >= 5 && options->binarypwd.data)
{
m->c->passwordlen = options->binarypwd.len;
if ((m->c->password = malloc(m->c->passwordlen)) == NULL)
{
rc = PAHO_MEMORY_ERROR;
goto exit;
}
memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
}
......
/**
* 到这里位置,MQTT 连接准备工作完毕,下面将向MQTT工作线程队列添加连接指令 CONNECT
*/
/* Add connect request to operation queue */
if ((conn = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
{
rc = PAHO_MEMORY_ERROR;
goto exit;
}
memset(conn, , sizeof(MQTTAsync_queuedCommand));
conn->client = m;
if (options)
{
conn->command.onSuccess = options->onSuccess;
conn->command.onFailure = options->onFailure;
conn->command.onSuccess5 = options->onSuccess5;
conn->command.onFailure5 = options->onFailure5;
conn->command.context = options->context;
}
conn->command.type = CONNECT;
conn->command.details.conn.currentURI = 0;
rc = MQTTAsync_addCommand(conn, sizeof(conn));
exit:
FUNC_EXIT_RC(rc);
return rc;
}
1.2 发送线程 MQTTAsync_sendThread
发送线程,功能概括为如下几点:
- 修改发送线程工作状态
- 如果工作队列中有指令,处理工作队列中的指令
- 等待send_cond信号量,如果没有1秒超时退出
- 检查超时
发送线程工作状态,是一个全局变量,paho库支持多个client,但是发送线程却只有一个线程,代码如下:
static thread_return_type WINAPI MQTTAsync_sendThread(void* n)
{
FUNC_ENTRY;
pthread_setname_np(pthread_self(), "MQTTSend");
MQTTAsync_lock_mutex(mqttasync_mutex);
sendThread_state = RUNNING;
sendThread_id = Thread_getid();
MQTTAsync_unlock_mutex(mqttasync_mutex);
while (!tostop)
{
int rc;
while (commands->count > 0)
{ // 如果发送队列中有数据,处理队列中的指令
if (MQTTAsync_processCommand() == 0)
break; /* no commands were processed, so go into a wait */
}
// 等待信号量,如果没有,超时1秒退出
if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT)
Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc);
// 检查超时相关的数据
MQTTAsync_checkTimeouts();
}
sendThread_state = STOPPING;
MQTTAsync_lock_mutex(mqttasync_mutex);
sendThread_state = STOPPED;
sendThread_id = 0;
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT;
return 0;
}
发送线程的主要执行是在MQTTAsync_processCommand中,主要如下几点:
- 从队列中寻找可执行的指令
- 从工作列表中去掉寻找到的指令的link
- 执行指令
- 指令执行结果的处理
1.3 接收线程 MQTTAsync_receiveThread
接收线程,主要处理接收到消息和重试消息的发送,概括如下几点:
- 调用MQTTAsync_cycle 获取接收到的指令
- 判断接收指令是否有正常数据,如果没有,执行对应的处理策略
- 处理接收指令:
3.1 如果收到业务消息,调用回调接口
3.2 处理ACK相关的消息,包括CONNACK、SUBACK、UNSUBACK、DISCONNECT
static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
{
long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
pthread_setname_np(pthread_self(), "MQTTReceive");
FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
receiveThread_state = RUNNING;
receiveThread_id = Thread_getid();
while (!tostop)
{
int rc = SOCKET_ERROR;
int sock = -1;
MQTTAsyncs* m = NULL;
MQTTPacket* pack = NULL;
MQTTAsync_unlock_mutex(mqttasync_mutex);
pack = MQTTAsync_cycle(&sock, timeout, &rc);
MQTTAsync_lock_mutex(mqttasync_mutex);
if (tostop)
break;
timeout = 1000L;
if (sock == 0)
continue;
/* find client corresponding to socket */
if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
{
Log(TRACE_MINIMUM, -1, "Could not find client corresponding to socket %d", sock);
/* Socket_close(sock); - removing socket in this case is not necessary (Bug 442400) */
continue;
}
m = (MQTTAsyncs*)(handles->current->content);
if (m == NULL)
{
Log(LOG_ERROR, -1, "Client structure was NULL for socket %d - removing socket", sock);
Socket_close(sock);
continue;
}
if (rc == SOCKET_ERROR)
{
Log(TRACE_MINIMUM, -1, "Error from MQTTAsync_cycle() - removing socket %d", sock);
if (m->c->connected == 1)
MQTTAsync_disconnect_internal(m, 0);
else if (m->c->connect_state != NOT_IN_PROGRESS)
nextOrClose(m, rc, "socket error");
else /* calling disconnect_internal won t have any effect if we re already disconnected */
MQTTAsync_closeOnly(m->c, MQTTREASONCODE_SUCCESS, NULL);
}
else
{
if (m->c->messageQueue->count > 0)
{
qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
int topicLen = qe->topicLen;
if (strlen(qe->topicName) == topicLen)
topicLen = 0;
if (m->ma)
rc = MQTTAsync_deliverMessage(m, qe->topicName, topicLen, qe->msg);
else
rc = 1;
if (rc)
{
#if !defined(NO_PERSISTENCE)
if (m->c->persistence)
MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
#endif
ListRemove(m->c->messageQueue, qe); /* qe is freed here */
}
else
Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
m->c->clientID);
}
if (pack)
{
if (pack->header.bits.type == CONNACK)
{
......
}
else if (pack->header.bits.type == SUBACK)
{
......
}
else if (pack->header.bits.type == UNSUBACK)
{
......
}
else if (pack->header.bits.type == DISCONNECT)
{
......
}
}
}
}
receiveThread_state = STOPPED;
receiveThread_id = 0;
MQTTAsync_unlock_mutex(mqttasync_mutex);
#if !defined(_WIN32) && !defined(_WIN64)
if (sendThread_state != STOPPED)
Thread_signal_cond(send_cond);
#else
if (sendThread_state != STOPPED)
Thread_post_sem(send_sem);
#endif
FUNC_EXIT;
#if defined(_WIN32) || defined(_WIN64)
ExitThread(0);
#endif
return 0;
}
接收线程中,接收和处理数据,主要在MQTTAsync_cycle中,功能概括几点:
- 从socket读取数据Socket_getReadySocket
- 读取socket读取到数据,通过MQTTPacket_Factory获取pack数据
- 处理pack中消息,type类型为:PUBLISH、PUBACK、PUBCOMP、PUBREC、PUBREL、PINGRESP
















暂无评论内容