paho mqtt c 源码分析-1


支持原创,转载请附上原文链接


0 简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布,目前最新版本为v3.1.1。MQTT最大的优点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽占用的即时通讯协议,MQTT在物联网、小型设备、移动应用等方面有广泛的应用。
这个系列的文章不会对MQTT协议栈进行深入的分析,后续有机会另行分析。对于MQTT协议,这篇文章分析的不错,https://blog.csdn.net/anxianfeng55555/article/details/80908795

paho mqtt c 是一个开源的MQTT协议栈库,github地址为

https://github.com/eclipse/paho.mqtt.c

代码拉下来之后,编译后会生成动态库

git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
make

lrwxrwxrwx 1 wangqian wangqian 19 9月 20 17:42 build/output/libpaho-mqtt3a.so -> libpaho-mqtt3a.so.1*
lrwxrwxrwx 1 wangqian wangqian 20 9月 20 17:42 build/output/libpaho-mqtt3as.so -> libpaho-mqtt3as.so.1*
lrwxrwxrwx 1 wangqian wangqian 19 9月 20 17:41 build/output/libpaho-mqtt3c.so -> libpaho-mqtt3c.so.1*
lrwxrwxrwx 1 wangqian wangqian 20 9月 20 17:41 build/output/libpaho-mqtt3cs.so -> libpaho-mqtt3cs.so.1*

如上,paho mqtt c库支持4中模式的库

  • paho-mqtt3a – asynchronous (MQTTAsync)
  • paho-mqtt3as – asynchronous with SSL (MQTTAsync)
  • paho-mqtt3c – “classic” / synchronous (MQTTClient)
  • paho-mqtt3cs – “classic” / synchronous with SSL (MQTTClient)

下面主要分析paho-mqtt3a这个库的代码。这是一个异步的库,发送和接收,是通过将对应的指令加入到工作队列中,且有一个发送和接收的线程来处理这两个队列的机制。

1 源码分析

第一从用户的使用角度来分析,用户使用MQTT的第一步,就是需要connect

1.1 MQTTAsync_connect

从源代码下手,MQTTAsync_connect的功能概括为如下几点:

  1. 检查连接参数的合法性
  2. 启动发送和接收线程
  3. 设置连接相关的各种参数
  4. 添加连接指令到工作队列中

int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
{
    MQTTAsyncs* m = handle;
    int rc = MQTTASYNC_SUCCESS;
    MQTTAsync_queuedCommand* conn;

    FUNC_ENTRY;
    /**
      * 参数判断代码 和 连接回调变量设置代码 ,省略
      **/
        ......
    tostop = 0;
   
    /**
      * 启动接收和发送线程
      **/
    if (sendThread_state != STARTING && sendThread_state != RUNNING)
    {
        MQTTAsync_lock_mutex(mqttasync_mutex);
        sendThread_state = STARTING;
        Thread_start(MQTTAsync_sendThread, NULL);
        MQTTAsync_unlock_mutex(mqttasync_mutex);
    }
    if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
    {
        MQTTAsync_lock_mutex(mqttasync_mutex);
        receiveThread_state = STARTING;
        Thread_start(MQTTAsync_receiveThread, handle);
        MQTTAsync_unlock_mutex(mqttasync_mutex);
    }
    
    /**
      * 设置心跳保活时间,这个参数,会关系到MQTT弱网断连,后面会专门分析MQTT的心跳保活机制,省略
      **/
    m->c->keepAliveInterval = options->keepAliveInterval;
    setRetryLoopInterval(options->keepAliveInterval);
    ......

    /**
     * MQTT遗愿相关参数的设置,省略
     **/
    ......

#if defined(OPENSSL)
    /**
     * OPENSSL 加密相关的参数设置,省略
     **/
      ......   
#else
    if (options->struct_version != 0 && options->ssl)
    {
        rc = MQTTASYNC_SSL_NOT_SUPPORTED;
        goto exit;
    }
#endif

    if (m->c->username)
        free((void*)m->c->username);
    if (options->username)
        m->c->username = MQTTStrdup(options->username);
    if (m->c->password)
        free((void*)m->c->password);
    if (options->password)
    {
        m->c->password = MQTTStrdup(options->password);
        m->c->passwordlen = (int)strlen(options->password);
    }
    else if (options->struct_version >= 5 && options->binarypwd.data)
    {
        m->c->passwordlen = options->binarypwd.len;
        if ((m->c->password = malloc(m->c->passwordlen)) == NULL)
        {
            rc = PAHO_MEMORY_ERROR;
            goto exit;
        }
        memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
    }
    ......
    /**
     * 到这里位置,MQTT 连接准备工作完毕,下面将向MQTT工作线程队列添加连接指令 CONNECT
     */
    /* Add connect request to operation queue */
    if ((conn = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
    {
        rc = PAHO_MEMORY_ERROR;
        goto exit;
    }
    memset(conn,   , sizeof(MQTTAsync_queuedCommand));
    conn->client = m;
    if (options)
    {
        conn->command.onSuccess = options->onSuccess;
        conn->command.onFailure = options->onFailure;
        conn->command.onSuccess5 = options->onSuccess5;
        conn->command.onFailure5 = options->onFailure5;
        conn->command.context = options->context;
    }
    conn->command.type = CONNECT;
    conn->command.details.conn.currentURI = 0;
    rc = MQTTAsync_addCommand(conn, sizeof(conn));

exit:
    FUNC_EXIT_RC(rc);
    return rc;
}

1.2 发送线程 MQTTAsync_sendThread

发送线程,功能概括为如下几点:

  1. 修改发送线程工作状态
  2. 如果工作队列中有指令,处理工作队列中的指令
  3. 等待send_cond信号量,如果没有1秒超时退出
  4. 检查超时
    发送线程工作状态,是一个全局变量,paho库支持多个client,但是发送线程却只有一个线程,代码如下:

static thread_return_type WINAPI MQTTAsync_sendThread(void* n)
{
    FUNC_ENTRY;
    pthread_setname_np(pthread_self(), "MQTTSend");
    MQTTAsync_lock_mutex(mqttasync_mutex);
    sendThread_state = RUNNING;
    sendThread_id = Thread_getid();
    MQTTAsync_unlock_mutex(mqttasync_mutex);
    while (!tostop)
    {
        int rc;

        while (commands->count > 0)
        {   // 如果发送队列中有数据,处理队列中的指令
            if (MQTTAsync_processCommand() == 0)
                break;  /* no commands were processed, so go into a wait */
        }
               // 等待信号量,如果没有,超时1秒退出
        if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT)
            Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc);
        
               // 检查超时相关的数据
        MQTTAsync_checkTimeouts();
    }
    sendThread_state = STOPPING;
    MQTTAsync_lock_mutex(mqttasync_mutex);
    sendThread_state = STOPPED;
    sendThread_id = 0;
    MQTTAsync_unlock_mutex(mqttasync_mutex);
    FUNC_EXIT;
    return 0;
}

发送线程的主要执行是在MQTTAsync_processCommand中,主要如下几点:

  • 从队列中寻找可执行的指令
  • 从工作列表中去掉寻找到的指令的link
  • 执行指令
  • 指令执行结果的处理

1.3 接收线程 MQTTAsync_receiveThread

接收线程,主要处理接收到消息和重试消息的发送,概括如下几点:

  1. 调用MQTTAsync_cycle 获取接收到的指令
  2. 判断接收指令是否有正常数据,如果没有,执行对应的处理策略
  3. 处理接收指令:
    3.1 如果收到业务消息,调用回调接口
    3.2 处理ACK相关的消息,包括CONNACK、SUBACK、UNSUBACK、DISCONNECT

static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
{
    long timeout = 10L; /* first time in we have a small timeout.  Gets things started more quickly */
    pthread_setname_np(pthread_self(), "MQTTReceive");

    FUNC_ENTRY;
    MQTTAsync_lock_mutex(mqttasync_mutex);
    receiveThread_state = RUNNING;
    receiveThread_id = Thread_getid();
    while (!tostop)
    {
        int rc = SOCKET_ERROR;
        int sock = -1;
        MQTTAsyncs* m = NULL;
        MQTTPacket* pack = NULL;

        MQTTAsync_unlock_mutex(mqttasync_mutex);
        pack = MQTTAsync_cycle(&sock, timeout, &rc);
        MQTTAsync_lock_mutex(mqttasync_mutex);
        if (tostop)
            break;
        timeout = 1000L;

        if (sock == 0)
            continue;
        /* find client corresponding to socket */
        if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
        {
            Log(TRACE_MINIMUM, -1, "Could not find client corresponding to socket %d", sock);
            /* Socket_close(sock); - removing socket in this case is not necessary (Bug 442400) */
            continue;
        }
        m = (MQTTAsyncs*)(handles->current->content);
        if (m == NULL)
        {
            Log(LOG_ERROR, -1, "Client structure was NULL for socket %d - removing socket", sock);
            Socket_close(sock);
            continue;
        }
        if (rc == SOCKET_ERROR)
        {
            Log(TRACE_MINIMUM, -1, "Error from MQTTAsync_cycle() - removing socket %d", sock);
            if (m->c->connected == 1)
                MQTTAsync_disconnect_internal(m, 0);
            else if (m->c->connect_state != NOT_IN_PROGRESS)
                nextOrClose(m, rc, "socket error");
            else /* calling disconnect_internal won t have any effect if we re already disconnected */
                MQTTAsync_closeOnly(m->c, MQTTREASONCODE_SUCCESS, NULL);
        }
        else
        {
            if (m->c->messageQueue->count > 0)
            {
                qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
                int topicLen = qe->topicLen;

                if (strlen(qe->topicName) == topicLen)
                    topicLen = 0;

                if (m->ma)
                    rc = MQTTAsync_deliverMessage(m, qe->topicName, topicLen, qe->msg);
                else
                    rc = 1;

                if (rc)
                {
#if !defined(NO_PERSISTENCE)
                    if (m->c->persistence)
                        MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
#endif
                    ListRemove(m->c->messageQueue, qe); /* qe is freed here */
                }
                else
                    Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
                        m->c->clientID);
            }
            if (pack)
            {
                if (pack->header.bits.type == CONNACK)
                {
                    ......
                }
                else if (pack->header.bits.type == SUBACK)
                {
                    ......
                }
                else if (pack->header.bits.type == UNSUBACK)
                {
                    ......
                }
                else if (pack->header.bits.type == DISCONNECT)
                {
                    ......
                }
            }
        }
    }
    receiveThread_state = STOPPED;
    receiveThread_id = 0;
    MQTTAsync_unlock_mutex(mqttasync_mutex);
#if !defined(_WIN32) && !defined(_WIN64)
    if (sendThread_state != STOPPED)
        Thread_signal_cond(send_cond);
#else
    if (sendThread_state != STOPPED)
        Thread_post_sem(send_sem);
#endif
    FUNC_EXIT;
#if defined(_WIN32) || defined(_WIN64)
    ExitThread(0);
#endif
    return 0;
}

接收线程中,接收和处理数据,主要在MQTTAsync_cycle中,功能概括几点:

  1. 从socket读取数据Socket_getReadySocket
  2. 读取socket读取到数据,通过MQTTPacket_Factory获取pack数据
  3. 处理pack中消息,type类型为:PUBLISH、PUBACK、PUBCOMP、PUBREC、PUBREL、PINGRESP
© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
鸣的头像 - 鹿快
评论 抢沙发

请登录后发表评论

    暂无评论内容