使用mqtt协议实现iOS端心跳探活

背景

这是AI稳定性建设项目的一个子项,目的是实现对iOS端设备的生命健康检查,技术方案是使用mqtt协议建立并维持iOS端设备与服务器的心跳,服务端对心跳数据建立监测机制。
我参与的主要是iOS端代码的开发,因此该文档主要为在iOS端使用mqtt协议发送心跳的调研和实践。

技术选型

移动端监测与服务器监测最大的不同,应该在于移动端设备所处的网络环境是复杂多变的,而我们的心跳探活目的是确认端设备本身是否正常。

因此在选择端设备与服务端的通信协议时,要求对网络依赖小且在比如弱网环境下依然稳定工作,这样才能避免网络环境不稳定造成的干扰,避免一些误判。

Mqtt (Message Queue Telemetry Transport)协议是一种应用层协议,适用于资源受限例如低带宽、高延迟网络环境下的端到端通信。这种适用性主要是来自于它在包大小的精简以及对push消息场景的一些优化。

因此最终选用的mqtt协议作为心跳探活的通信协议,mqtt的对比介绍还可参见:Choosing Your Messaging Protocol: AMQP, MQTT, or STOMP

方案设计

在mqtt协议中,一个主要的应用场景就是消息订阅及推送即pub/sub。基于pub/sub机制,我们就可以实现端到端的心跳传输:即一个端负责订阅某个topic的消息,另外一个端定时推送消息。

那么在我们这种移动端-服务端的场景中,应该是谁来订阅,谁来推送呢?我们的目的是在服务端建立对移动端的健康监测,而消息推送(pub)是单向的,如果是移动端进行消息sub,服务端进行pub的话,那么移动端必须在收到消息后,再通过某个途径给服务端回一个ack。

如果是移动端进行pub,服务端进行sub,那么仅需要移动端pub->mqtt-broker->服务端sub,即可完成一次完整的心跳探活。因此,我们最终选择使用移动端pub,服务端sub这种方案。

代码实现

mqtt协议有一个官方的实现库mosquitto,使用C语言实现,可参见:GitHub:eclipse/mosquitto,这个库提供了基于mqtt协议的通信建立和维持、消息订阅及推送等功能。已经够我们的场景使用了。
而在iOS端我们主要使用Objective-C(以下简称OC)实现,因此我基于mosquitto库封装出OC层的调用方法,实现在OC层使用mqtt协议进行消息推送。
mosquitto库的api文档可参见:mosquitto-api
主要的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// UAQMonitorMqttClient.h
// 主要的变量及属性
// 声明了MQTTClient类
@interface MQTTClient : NSObject {
struct mosquitto *mosq; // 声明了一个mosquitto对象,由于底层是C写的,因此这里用的struct
}
@property (nonatomic, strong) dispatch_queue_t queue; // 使用iOS GCD维持的一个线程队列,用于维持跟服务端的连接,保证断线能重连
@property (nonatomic, copy) void (^connectionCompletionHandler)(NSUInteger code); // 连接完成后的回调
// 定义了客户端及服务端的属性
@property (nonatomic, copy) NSString *clientID;
@property (nonatomic, copy) NSString *host;
@property (nonatomic, assign) unsigned short port;
@property (nonatomic, assign) unsigned short keepAlive;
@property (nonatomic, assign) unsigned int reconnectDelay;
@property (nonatomic, assign) unsigned int reconnectDelayMax;
@property (nonatomic, assign) BOOL reconnectExponentialBackoff;
@property (nonatomic, assign) BOOL cleanSession; // 告诉服务端的broker是否在客户端断线后,清除所有消息及订阅,默认设置成了false
@property (nonatomic, assign) BOOL connected; // 连接的状态
@property (nonatomic, assign) BOOL looped; // 是否开启了线程队列 (上面说的dispatch_queue)
// 定义使用mosquitto发送消息的质量,枚举值从小到大,对网络质量的要求逐渐提升,默认我们只有0级:即AtMostOnce
typedef enum MQTTQualityOfService : NSUInteger {
AtMostOnce,
AtLeastOnce,
ExactlyOnce
} MQTTQualityOfService;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// UAQMonitorMqttClient.m
// 初始化一个MQTTClient类
- (MQTTClient*) initWithMqttClient:(NSString *)clientId
serverHost:(NSString *) host
serverPort:(unsigned short) port
cleanSession:(BOOL )cleanSession {
if (!(self = [super init])) {
NSLog(@"[UAQMonitor] MqttClient init failed");
return nil;
}
// Init mqtt configuration.
self.clientID = clientId;
self.host = host;
self.port = port;
// Time interval for server(block) ping the client.
self.keepAlive = 60;
self.reconnectDelay = 2;
self.reconnectDelayMax = 30;
self.reconnectExponentialBackoff = NO;
self.cleanSession = cleanSession;
self.looped = NO;
const char* cstrClientId = [self.clientID cStringUsingEncoding:NSUTF8StringEncoding];
// 创建一个mosquitto对象
mosq = mosquitto_new(cstrClientId, self.cleanSession, (__bridge void *)(self));
if (mosq == NULL) {
NSLog(@"[UAQMonitor] mosquitto create failed");
return nil;
}
// Set callback function for connect | disconnect.
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
// Set the TLS.
mosquitto_tls_opts_set(mosq, 0, "tlsv1", NULL);
mosquitto_tls_psk_set(mosq, "a645dce1f8644bbc46342abde10494eb", "apm", NULL);
mosquitto_tls_insecure_set(mosq, true);
// Connect
[self connectServer];
// Loop
// 开启一个GCD线程队列
self.queue = dispatch_queue_create("mqtt_queue", NULL);
// 启动线程循环,目的是维持当前与服务端的连接,出现断线能自动重连
[self startLoop];
return self;
}
// 调用mosquitto lib的方法,向服务端broker发起连接
- (void) connectServer {
const char *cstrHost = [self.host cStringUsingEncoding:NSASCIIStringEncoding];
// 设置异常断线后的重试策略,按照当前默认值,会以2秒递增为等待时间不断尝试重连
mosquitto_reconnect_delay_set(mosq, self.reconnectDelay, self.reconnectDelayMax, self.reconnectExponentialBackoff);
mosquitto_connect(mosq, cstrHost, self.port, self.keepAlive);
}
// 开启线程循环,维持连接不断,或者断线后重连
- (void) startLoop {
if (!self.queue) {
NSLog(@"[UAQMonitor] mqtt start loop fail with no dispatch queue");
return;
}
dispatch_async(self.queue, ^{
DLog(@"[Heartbeat]start mosquitto loop");
self.looped = YES;
// 调用mosquitto lib方法,发起一个循环,阻塞该线程,作用就是不断的判断当前连接是否正常及异常情况下的断线重连
// 看mosquitto文档,实际上这个mosquitto_loop_forever应该是使用了IO多路复用的机制中的select()函数,第二个参数为-1,即设置在调用select的超时时间为1秒(-1表示使用默认值:1s),第三个参数无意义
int ret = mosquitto_loop_forever(mosq, -1, 1);
self.looped = NO;
DLog(@"[Heartbeat]end mosquitto loop with return value: %d", ret);
});
}
// 开始发心跳,即向服务端push消息
- (void)startHeartbeat:(NSString *)timeval payLoad:(NSString *)payLoad {
NSString* mqttTopic = [NSString stringWithFormat:@"gate/%@/heartbeat",self.clientID];
DLog(@"[Heartbeat]Start heart beat with the time interval: %f", [timeval doubleValue]);
// Send a heartbeat firstly.
// 启动的时候马上发一条,调用的方法为publishData
[self publishData:payLoad
toTopic:mqttTopic
withQos:AtMostOnce
retain:NO
completionHandler:^(int ret) {
DLog(@"[Heartbeat] send %@ to MQTT Topic: %@ with return vaule: %d", payLoad, mqttTopic, ret);
}];
// Start a timer to send heartbeat periodic.
// 调用UAQMonitor定时器来建立定时push消息的机制
[[UAQMonitorGCDTimerManager sharedInstance] scheduledDispatchTimerWithName:@"mqtt_timer"
timeInterval:[timeval doubleValue]
queue:nil
repeats:YES
actionOption:AbandonPreviousAction action:^{
[self publishData:payLoad
toTopic:mqttTopic
withQos:AtMostOnce
retain:NO
completionHandler:^(int ret) {
DLog(@"[Heartbeat] send %@ to MQTT Topic: %@ with return vaule: %d", payLoad, mqttTopic, ret);
if (!self.looped) {
DLog(@"[Heartbeat] mqtt loop ended, try to restart it");
[self startLoop];
}
}];
}];
}
// 基于mosquitto_publish封装的发送心跳的方法
// qos默认为最低等级AtMostOnce
// 最后一个参数为服务端是否要求保留该message,默认是NO
- (void)publishData:(NSString *)payload
toTopic:(NSString *)topic
withQos:(MQTTQualityOfService)qos
retain:(BOOL)retain
completionHandler:(void (^)(int mid))completionHandler {
const char* cstrTopic = [topic cStringUsingEncoding:NSUTF8StringEncoding];
int mid;
// 调用mosquitto lib方法来push消息
int ret = mosquitto_publish(mosq, &mid, cstrTopic, (int)payload.length, [payload dataUsingEncoding:NSUTF8StringEncoding].bytes, qos, retain);
if (completionHandler) {
completionHandler(ret);
}
}