通过 mqtt 可以将设备连接在一起,能够实现将消息(可能来自服务器也可能来自其它设备)推送到设备,如果设备离线,服务器可以暂存消息,在设备上线时再推送,有一些特性很关键:
offline
允许设备暂时离线。
即使是使用固定宽带,有些用户也会因为各种原因无法保持稳定的长连接,可能是上级路由设备有限制,或者是带宽被其它应用抢占而导致长连接不稳定。将设备的在线状态与 TCP 长连接状态耦合在一起是不明智的。
bridge
设备连接在不同的 broker 上,通过 bridge 实现互通。
支持几万台设备在线,估计一台 broker 就够了,但是一旦达到数十万、百万甚至上亿,肯定需要搭建 broker 集群,参见 The C10K problem。
简单起见, node.js 服务器端使用 mosca, 客户端使用 MQTT.js ,由于 mosca 不支持 bridge,本文不涉及 bridge 特性。
客户端与服务器通信
客户端通过服务器给自已发个消息
server.js
var mosca = require('mosca'); var settings = { port: 1883 }; var server = new mosca.Server(settings); server.on('ready', function () { console.log('mosca server running'); }).on('clientConnected', function (client) { console.log('client(' + client.id + ') connected'); }).on('published', function (packet, client) { console.log('client(' + (client ? client.id : 'internal') + ') published topic(' + packet.topic + '): ' + packet.payload); }).on('subscribed', function (topic, client) { console.log('client(' + client.id + ') subscribed topic(' + topic + ')'); }).on('unsubscribed', function (topic, client) { console.log('client(' + client.id + ') unsubscribed topic(' + topic + ')'); }).on('clientDisconnecting', function (client) { console.log('client(' + client.id + ') disconnecting'); }).on('clientDisconnected', function (client) { console.log('client(' + client.id + ') disconnected'); });
client.js
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://127.0.0.1:1883'); client.on('connect', function () { client.subscribe('presence'); client.publish('presence', 'a message from myself'); }).on('message', function (topic, message) { console.log(topic + ': ' + message.toString()); client.end(); });
运行
server.js
$ node server.js mosca server running client(mqttjs_a423c0af) connected client(internal) published topic($SYS/41TXEHPDe/new/clients): mqttjs_a423c0af client(mqttjs_a423c0af) subscribed topic(presence) client(internal) published topic($SYS/41TXEHPDe/new/subscribes): {"clientId":"mqttjs_a423c0af","topic":"presence"} client(mqttjs_a423c0af) published topic(presence): a message from myself client(mqttjs_a423c0af) unsubscribed topic(presence) client(mqttjs_a423c0af) disconnected client(internal) published topic($SYS/41TXEHPDe/new/unsubscribes): {"clientId":"mqttjs_a423c0af","topic":"presence"} client(internal) published topic($SYS/41TXEHPDe/disconnect/clients): mqttjs_a423c0af
运行
client.js
$ node client.js presence: a message from myself $
客户端与客户端通信
客户端发送消息给另一个客户端
下面的例子演示了客户端通过约定的
topic
互相通信。client_sub.js
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://127.0.0.1:1883'); client.on('connect', function () { client.publish('sub', 'message from pub'); }).on('message', function (topic, message) { console.log(topic + ': ' + message.toString()); client.end(); });
client_pub.js
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://127.0.0.1:1883'); client.on('connect', function () { client.publish('sub', 'message from pub'); }).on('message', function (topic, message) { console.log(topic + ': ' + message.toString()); client.end(); });
运行
server.js
$ node server.js mosca server running client(mqttjs_ebdc9fd4) connected client(internal) published topic($SYS/4Jk9PBwDe/new/clients): mqttjs_ebdc9fd4 client(mqttjs_ebdc9fd4) subscribed topic(sub) client(internal) published topic($SYS/4Jk9PBwDe/new/subscribes): {"clientId":"mqttjs_ebdc9fd4","topic":"sub"} client(mqttjs_ff000868) connected client(internal) published topic($SYS/4Jk9PBwDe/new/clients): mqttjs_ff000868 client(mqttjs_ff000868) published topic(sub): message from pub client(mqttjs_ebdc9fd4) unsubscribed topic(sub) client(mqttjs_ebdc9fd4) disconnected client(internal) published topic($SYS/4Jk9PBwDe/new/unsubscribes): {"clientId":"mqttjs_ebdc9fd4","topic":"sub"} client(internal) published topic($SYS/4Jk9PBwDe/disconnect/clients): mqttjs_ebdc9fd4
运行
client_sub.js
$ node client_sub.js sub: message from pub $
运行
client_pub.js
$ node client_pub.js
客户端与客户端离线通信
离线通信需要同时满足以下条件
- 服务器配置持久存储
订阅方启用会话状态
连接服务器时使用同样的 clientId 并指定
clean
为false
发布方发布持久消息
发布消息时指定
qos
大于0
以及retain
为true
下面的例子演示了客户端接收离线消息
client_sub.js
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://127.0.0.1:1883', {clientId: 'sub', clean: false}); client.on('connect', function () { client.subscribe('sub'); }).on('message', function (topic, message) { console.log(topic + ': ' + message.toString()); client.end(); });
client_pub.js
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://127.0.0.1:1883', {clientId: 'pub'}); client.on('connect', function () { client.publish('sub', 'message from pub', {qos: 1, retain: true}); }).on('message', function (topic, message) { console.log(topic + ': ' + message.toString()); client.end(); });
运行 srever.js
$ node mqtt_server.js mosca server running client(sub) connected client(internal) published topic($SYS/V19OSVfix/new/clients): sub client(sub) subscribed topic(sub) client(internal) published topic($SYS/V19OSVfix/new/subscribes): {"clientId":"sub","topic":"sub"} client(sub) disconnected client(internal) published topic($SYS/V19OSVfix/disconnect/clients): sub client(pub) connected client(internal) published topic($SYS/V19OSVfix/new/clients): pub client(pub) published topic(sub): message from pub client(sub) connected client(internal) published topic($SYS/V19OSVfix/new/clients): sub client(sub) subscribed topic(sub) client(internal) published topic($SYS/V19OSVfix/new/subscribes): {"clientId":"sub","topic":"sub"} client(sub) disconnected client(internal) published topic($SYS/V19OSVfix/disconnect/clients): sub
运行 client_sub.js
订阅消息后退出
$ node client_sub.js sub: message from pub $
运行 client_pub.js
发布消息
$ node mqtt_client_pub.js
运行 client_sub.js
接收离线消息后退出
$ node client_sub.js sub: message from pub $