paho mqtt c 源码分析-5 (Qos)

keywords

  1. MQTT Qos
  2. Qos 与 订阅者、发布者、服务器的关系
  3. 不同Qos pub success的理解
  4. paho mqtt c 源码分析

0 引言

QoS(Quality of Service),消息服务质量,通过配置不同等级的Qos,可以满足不同场景下消息的触达率、带宽消耗以及传输性能
MQTT 提供了3种Qos:

  • 0: At most once
  • 1: At least once
  • 2: Exactly once

Qos的进一步理解:

  • 0: 订阅者(Subscriber)最多收到一次,服务器代理(Broker)最多收到一次,发布者只发送一次
  • 1: 订阅者至少收到一次,服务器代理至少收到一次,发布者发送次数大于等于1次(发送至至收到服务器 PUBACK为止)
  • 2: 订阅者有且收到一次(个人理解: 订阅者端的协议栈SDK可能收到多次pubmsg,但是SDK发送给上层业务方,只会有一次),服务器代理有且收到一次完整交互数据包,发布者发送次数大于等于1次

1 Qos2的深度解析

对于Qos0、1还是比较好理解,Qos2是如何保障 有且收到一次,以及如何理解 有且收到一次
下面我们从sender(client)和receiver(broker)来理解Qos2是如何保障有且只收到一次

client                  broker
        ----pubmsg---->
        <---PUBREC-----
        ----PUBREL---->
        <---PUBCOMP----

client端消息发送成功的工作流程:pubmsg后-->收到PUBREC,发送PUBREL-->收到PUBCOMP,此时会从发送队列中删除该消息
broker端消息收到成功的工作流程是:收到pubmsg,发送PUBREC-->收到PUBREL,发送PUBCOMP,此时会回调SDK上层业务方通知收到Qos2消息

对于broker来说,收到PUBREL时,表明收到一条完整的Qos2消息,此时才会通知SDK业务方
当然,不难理解,Qos2的交互过程中,会存在由于网络问题导致失败,下面我们逐个过程分析:

  • pubmsg 消息丢失
    这个比较好理解,pubmsg丢失,broker肯定收不到消息,所以client会再次发送pubmsg
  • PUBREC丢失
    如果PUBREC丢失,client会再次发送消息,broker 协议栈SDK会再次收到pubmsg的,此时是否意味着就违反了Qos2的原则呢? 实则并不是,Qos2有且只收到一次,是针对SDK上层业务方,并不是针对SDK的,由于此时broker还没有收到PUBREL,交互流程并没有结束,所以并不会回调通知业务方收到过Qos2的消息
  • PUBREL丢失
    broker会认为PUBREC没有被收到,会重发PUBREC
  • PUBCOMP 丢失
    client会认为PUBREL没有被收到,重发PUBCOMP

从上面分析可以看到,Qos2从工作流程上保障了有且只收到一次

2 结合源码,理解不同Qos实现

主要结合paho mqtt 的c代码,分析Qos实现,第一结论如下:

  • Qos0的消息,只有在发送过程中被中断了,才会存储并尝试重发(如果发送过程中,连接断开,消息会直接丢弃)
  • Qos1和2的消息,发送后,会存储在outboundMsgs队列中,直到收到预期的数据回复包
    相关源码如下:

// 对于Qos0的消息,如果发送过程中被中断,会存储下来,并尝试重发
MQTTProtocol_startPublishCommon
if (qos == 0 && rc == TCPSOCKET_INTERRUPTED)
    MQTTProtocol_storeQoS0(pubclient, publish);

// 非Qos0的消息,pub后,会加入到outboundMsgs队列中,在MQTTProtocol_retries()中会处理outboundMsgs的重发
MQTTProtocol_startPublish()
if (qos > 0)
{
    *mm = MQTTProtocol_createMessage(publish, mm, qos, retained, 1);
    ListAppend(pubclient->outboundMsgs, *mm, (*mm)->len);
    /* we change these pointers to the saved message location just in case the packet could not be written
    entirely; the socket buffer will use these locations to finish writing the packet */
    p.payload = (*mm)->publish->payload;
    p.topic = (*mm)->publish->topic;
    p.properties = (*mm)->properties;
    p.MQTTVersion = (*mm)->MQTTVersion;
}

3 结合源码,理解不同Qos pub success 回调

对于何时调用pub成功回调,总结如下:

  • QOS1,消息发送成功
  • QOS1消息,收到PUBACK
  • QOS2消息,收到PUBCOMP

// Qos0 pub success 回调代码分析
rc = MQTTProtocol_startPublish(command->client->c, p, command->command.details.pub.qos, command->command.details.pub.retained, &msg);
if (command->command.details.pub.qos == 0)
{
  if (rc == TCPSOCKET_COMPLETE)
  {
    if (command->command.onSuccess)
    {
      ......
      (*(command->command.onSuccess))(command->command.context, &data);
    }
    else if (command->command.onSuccess5)
    {
      ......
      (*(command->command.onSuccess5))(command->command.context, &data);
    }
  }
  else
  {
    if (rc != SOCKET_ERROR)
      command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
    command->client->pending_write = &command->command;
  }
}

// Qos1/2 pub success 回调代码分析
else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP ||
    pack->header.bits.type == PUBREC)
{
  ......
  // type 是PUBACK(Qos1时) 或者 type 是PUBCOMP(Qos2)时
  if (m && (msgtype != PUBREC || ackrc >= MQTTREASONCODE_UNSPECIFIED_ERROR))
  {
    ListElement* current = NULL;

    if (m->dc)
    {
      Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
      (*(m->dc))(m->dcContext, msgid);
    }
    /* use the msgid to find the callback to be called */
    while (ListNextElement(m->responses, &current))
    {
      MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
      if (command->command.token == msgid)
      {
        if (!ListDetach(m->responses, command)) /* then remove the response from the list */
          Log(LOG_ERROR, -1, "Publish command not removed from command list");
        if (command->command.onSuccess)
        {
          ......
          (*(command->command.onSuccess))(command->command.context, &data);
        }
        else if (command->command.onSuccess5 && ackrc < MQTTREASONCODE_UNSPECIFIED_ERROR)
        {
          ......
          (*(command->command.onSuccess5))(command->command.context, &data);
        }
        else if (command->command.onFailure5 && ackrc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
        {
          ......
          (*(command->command.onFailure5))(command->command.context, &data);
        }
        MQTTAsync_freeCommand(command);
        break;
      }
    }
    if (mqttversion >= MQTTVERSION_5)
      MQTTProperties_free(&msgprops);
  }
}

99 参考

https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/


支持原创,转载请附上原文链接


© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
明日方舟Arknights的头像 - 鹿快
评论 抢沙发

请登录后发表评论

    暂无评论内容