node.js下进行mqtt实践

通过 mqtt 可以将设备连接在一起,能够实现将消息(可能来自服务器也可能来自其它设备)推送到设备,如果设备离线,服务器可以暂存消息,在设备上线时再推送,有一些特性很关键:

  • offline

    允许设备暂时离线。

    即使是使用固定宽带,有些用户也会因为各种原因无法保持稳定的长连接,可能是上级路由设备有限制,或者是带宽被其它应用抢占而导致长连接不稳定。将设备的在线状态与 TCP 长连接状态耦合在一起是不明智的。

  • bridge

    设备连接在不同的 broker 上,通过 bridge 实现互通。

    支持几万台设备在线,估计一台 broker 就够了,但是一旦达到数十万、百万甚至上亿,肯定需要搭建 broker 集群,参见 The C10K problem

简单起见, node.js 服务器端使用 mosca, 客户端使用 MQTT.js ,由于 mosca 不支持 bridge,本文不涉及 bridge 特性。

客户端与服务器通信

  • 客户端通过服务器给自已发个消息

    server.js

    var mosca = require('mosca');
    
    var settings = {
        port: 1883
    };
    
    var server = new mosca.Server(settings);
    server.on('ready', function () {
        console.log('mosca server running');
    }).on('clientConnected', function (client) {
        console.log('client(' + client.id + ') connected');
    }).on('published', function (packet, client) {
        console.log('client(' + (client ? client.id : 'internal') + ') published topic(' + packet.topic + '): ' + packet.payload);
    }).on('subscribed', function (topic, client) {
        console.log('client(' + client.id + ') subscribed topic(' + topic + ')');
    }).on('unsubscribed', function (topic, client) {
        console.log('client(' + client.id + ') unsubscribed topic(' + topic + ')');
    }).on('clientDisconnecting', function (client) {
        console.log('client(' + client.id + ') disconnecting');
    }).on('clientDisconnected', function (client) {
        console.log('client(' + client.id + ') disconnected');
    });
    

    client.js

    var mqtt = require('mqtt');
    
    
    var client = mqtt.connect('mqtt://127.0.0.1:1883');
    client.on('connect', function () {
        client.subscribe('presence');
        client.publish('presence', 'a message from myself');
    }).on('message', function (topic, message) {
        console.log(topic + ': ' + message.toString());
        client.end();
    });
    

    运行 server.js

    $ node server.js
    mosca server running
    client(mqttjs_a423c0af) connected
    client(internal) published topic($SYS/41TXEHPDe/new/clients): mqttjs_a423c0af
    client(mqttjs_a423c0af) subscribed topic(presence)
    client(internal) published topic($SYS/41TXEHPDe/new/subscribes): {"clientId":"mqttjs_a423c0af","topic":"presence"}
    client(mqttjs_a423c0af) published topic(presence): a message from myself
    client(mqttjs_a423c0af) unsubscribed topic(presence)
    client(mqttjs_a423c0af) disconnected
    client(internal) published topic($SYS/41TXEHPDe/new/unsubscribes): {"clientId":"mqttjs_a423c0af","topic":"presence"}
    client(internal) published topic($SYS/41TXEHPDe/disconnect/clients): mqttjs_a423c0af
    

    运行 client.js

    $ node client.js
    presence: a message from myself
    $ 
    

客户端与客户端通信

  • 客户端发送消息给另一个客户端

    下面的例子演示了客户端通过约定的 topic 互相通信。

    client_sub.js

    var mqtt = require('mqtt');
    
    var client = mqtt.connect('mqtt://127.0.0.1:1883');
    client.on('connect', function () {
        client.publish('sub', 'message from pub');
    }).on('message', function (topic, message) {
        console.log(topic + ': ' + message.toString());
        client.end();
    });
    

    client_pub.js

    var mqtt = require('mqtt');
    
    var client = mqtt.connect('mqtt://127.0.0.1:1883');
    client.on('connect', function () {
        client.publish('sub', 'message from pub');
    }).on('message', function (topic, message) {
        console.log(topic + ': ' + message.toString());
        client.end();
    });
    

    运行 server.js

    $ node server.js
    mosca server running
    client(mqttjs_ebdc9fd4) connected
    client(internal) published topic($SYS/4Jk9PBwDe/new/clients): mqttjs_ebdc9fd4
    client(mqttjs_ebdc9fd4) subscribed topic(sub)
    client(internal) published topic($SYS/4Jk9PBwDe/new/subscribes): {"clientId":"mqttjs_ebdc9fd4","topic":"sub"}
    client(mqttjs_ff000868) connected
    client(internal) published topic($SYS/4Jk9PBwDe/new/clients): mqttjs_ff000868
    client(mqttjs_ff000868) published topic(sub): message from pub
    client(mqttjs_ebdc9fd4) unsubscribed topic(sub)
    client(mqttjs_ebdc9fd4) disconnected
    client(internal) published topic($SYS/4Jk9PBwDe/new/unsubscribes): {"clientId":"mqttjs_ebdc9fd4","topic":"sub"}
    client(internal) published topic($SYS/4Jk9PBwDe/disconnect/clients): mqttjs_ebdc9fd4
    

    运行 client_sub.js

    $ node client_sub.js
    sub: message from pub
    $ 
    

    运行 client_pub.js

    $ node client_pub.js
    

客户端与客户端离线通信

离线通信需要同时满足以下条件

  • 服务器配置持久存储
  • 订阅方启用会话状态

    连接服务器时使用同样的 clientId 并指定 cleanfalse

  • 发布方发布持久消息

    发布消息时指定 qos 大于 0 以及 retaintrue

下面的例子演示了客户端接收离线消息

client_sub.js

var mqtt = require('mqtt');

var client = mqtt.connect('mqtt://127.0.0.1:1883', {clientId: 'sub', clean: false});
client.on('connect', function () {
    client.subscribe('sub');
}).on('message', function (topic, message) {
    console.log(topic + ': ' + message.toString());
    client.end();
});

client_pub.js

var mqtt = require('mqtt');

var client = mqtt.connect('mqtt://127.0.0.1:1883', {clientId: 'pub'});
client.on('connect', function () {
    client.publish('sub', 'message from pub', {qos: 1, retain: true});
}).on('message', function (topic, message) {
    console.log(topic + ': ' + message.toString());
    client.end();
});

运行 srever.js

$ node mqtt_server.js
mosca server running
client(sub) connected
client(internal) published topic($SYS/V19OSVfix/new/clients): sub
client(sub) subscribed topic(sub)
client(internal) published topic($SYS/V19OSVfix/new/subscribes): {"clientId":"sub","topic":"sub"}
client(sub) disconnected
client(internal) published topic($SYS/V19OSVfix/disconnect/clients): sub
client(pub) connected
client(internal) published topic($SYS/V19OSVfix/new/clients): pub
client(pub) published topic(sub): message from pub
client(sub) connected
client(internal) published topic($SYS/V19OSVfix/new/clients): sub
client(sub) subscribed topic(sub)
client(internal) published topic($SYS/V19OSVfix/new/subscribes): {"clientId":"sub","topic":"sub"}
client(sub) disconnected
client(internal) published topic($SYS/V19OSVfix/disconnect/clients): sub

运行 client_sub.js 订阅消息后退出

$ node client_sub.js 
sub: message from pub
$ 

运行 client_pub.js 发布消息

$ node mqtt_client_pub.js 

运行 client_sub.js 接收离线消息后退出

$ node client_sub.js 
sub: message from pub
$