keywords
- MQTT Qos
- Qos 与 订阅者、发布者、服务器的关系
- 不同Qos pub success的理解
- paho mqtt c 源码分析
0 引言
QoS(Quality of Service),消息服务质量,通过配置不同等级的Qos,可以满足不同场景下消息的触达率、带宽消耗以及传输性能
MQTT 提供了3种Qos:
- 0: At most once
- 1: At least once
- 2: Exactly once
Qos的进一步理解:
- 0: 订阅者(Subscriber)最多收到一次,服务器代理(Broker)最多收到一次,发布者只发送一次
- 1: 订阅者至少收到一次,服务器代理至少收到一次,发布者发送次数大于等于1次(发送至至收到服务器 PUBACK为止)
- 2: 订阅者有且收到一次(个人理解: 订阅者端的协议栈SDK可能收到多次pubmsg,但是SDK发送给上层业务方,只会有一次),服务器代理有且收到一次完整交互数据包,发布者发送次数大于等于1次
1 Qos2的深度解析
对于Qos0、1还是比较好理解,Qos2是如何保障 有且收到一次,以及如何理解 有且收到一次 ?
下面我们从sender(client)和receiver(broker)来理解Qos2是如何保障有且只收到一次的
client broker
----pubmsg---->
<---PUBREC-----
----PUBREL---->
<---PUBCOMP----
client端消息发送成功的工作流程:pubmsg后-->收到PUBREC,发送PUBREL-->收到PUBCOMP,此时会从发送队列中删除该消息
broker端消息收到成功的工作流程是:收到pubmsg,发送PUBREC-->收到PUBREL,发送PUBCOMP,此时会回调SDK上层业务方通知收到Qos2消息
对于broker来说,收到PUBREL时,表明收到一条完整的Qos2消息,此时才会通知SDK业务方
当然,不难理解,Qos2的交互过程中,会存在由于网络问题导致失败,下面我们逐个过程分析:
- pubmsg 消息丢失
这个比较好理解,pubmsg丢失,broker肯定收不到消息,所以client会再次发送pubmsg -
PUBREC丢失
如果PUBREC丢失,client会再次发送消息,broker 协议栈SDK会再次收到pubmsg的,此时是否意味着就违反了Qos2的原则呢? 实则并不是,Qos2有且只收到一次,是针对SDK上层业务方,并不是针对SDK的,由于此时broker还没有收到PUBREL,交互流程并没有结束,所以并不会回调通知业务方收到过Qos2的消息 - PUBREL丢失
broker会认为PUBREC没有被收到,会重发PUBREC - PUBCOMP 丢失
client会认为PUBREL没有被收到,重发PUBCOMP
从上面分析可以看到,Qos2从工作流程上保障了有且只收到一次
2 结合源码,理解不同Qos实现
主要结合paho mqtt 的c代码,分析Qos实现,第一结论如下:
- Qos0的消息,只有在发送过程中被中断了,才会存储并尝试重发(如果发送过程中,连接断开,消息会直接丢弃)
- Qos1和2的消息,发送后,会存储在outboundMsgs队列中,直到收到预期的数据回复包
相关源码如下:
// 对于Qos0的消息,如果发送过程中被中断,会存储下来,并尝试重发
MQTTProtocol_startPublishCommon
if (qos == 0 && rc == TCPSOCKET_INTERRUPTED)
MQTTProtocol_storeQoS0(pubclient, publish);
// 非Qos0的消息,pub后,会加入到outboundMsgs队列中,在MQTTProtocol_retries()中会处理outboundMsgs的重发
MQTTProtocol_startPublish()
if (qos > 0)
{
*mm = MQTTProtocol_createMessage(publish, mm, qos, retained, 1);
ListAppend(pubclient->outboundMsgs, *mm, (*mm)->len);
/* we change these pointers to the saved message location just in case the packet could not be written
entirely; the socket buffer will use these locations to finish writing the packet */
p.payload = (*mm)->publish->payload;
p.topic = (*mm)->publish->topic;
p.properties = (*mm)->properties;
p.MQTTVersion = (*mm)->MQTTVersion;
}
3 结合源码,理解不同Qos pub success 回调
对于何时调用pub成功回调,总结如下:
- QOS1,消息发送成功
- QOS1消息,收到PUBACK
- QOS2消息,收到PUBCOMP
// Qos0 pub success 回调代码分析
rc = MQTTProtocol_startPublish(command->client->c, p, command->command.details.pub.qos, command->command.details.pub.retained, &msg);
if (command->command.details.pub.qos == 0)
{
if (rc == TCPSOCKET_COMPLETE)
{
if (command->command.onSuccess)
{
......
(*(command->command.onSuccess))(command->command.context, &data);
}
else if (command->command.onSuccess5)
{
......
(*(command->command.onSuccess5))(command->command.context, &data);
}
}
else
{
if (rc != SOCKET_ERROR)
command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
command->client->pending_write = &command->command;
}
}
// Qos1/2 pub success 回调代码分析
else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP ||
pack->header.bits.type == PUBREC)
{
......
// type 是PUBACK(Qos1时) 或者 type 是PUBCOMP(Qos2)时
if (m && (msgtype != PUBREC || ackrc >= MQTTREASONCODE_UNSPECIFIED_ERROR))
{
ListElement* current = NULL;
if (m->dc)
{
Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
(*(m->dc))(m->dcContext, msgid);
}
/* use the msgid to find the callback to be called */
while (ListNextElement(m->responses, ¤t))
{
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
if (command->command.token == msgid)
{
if (!ListDetach(m->responses, command)) /* then remove the response from the list */
Log(LOG_ERROR, -1, "Publish command not removed from command list");
if (command->command.onSuccess)
{
......
(*(command->command.onSuccess))(command->command.context, &data);
}
else if (command->command.onSuccess5 && ackrc < MQTTREASONCODE_UNSPECIFIED_ERROR)
{
......
(*(command->command.onSuccess5))(command->command.context, &data);
}
else if (command->command.onFailure5 && ackrc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
{
......
(*(command->command.onFailure5))(command->command.context, &data);
}
MQTTAsync_freeCommand(command);
break;
}
}
if (mqttversion >= MQTTVERSION_5)
MQTTProperties_free(&msgprops);
}
}
99 参考
https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/
支持原创,转载请附上原文链接
© 版权声明
文章版权归作者所有,未经允许请勿转载。如内容涉嫌侵权,请在本页底部进入<联系我们>进行举报投诉!
THE END






![[C++探索之旅] 第一部分第十一课:小练习,猜单词 - 鹿快](https://img.lukuai.com/blogimg/20251015/da217e2245754101b3d2ef80869e9de2.jpg)










暂无评论内容