
官方文档
https://www.eggjs.org/zh-CN/core/cluster-and-ipc
因为eggjs在生产服务器启动的时候会尽可能的多利用服务器资源,所以多核情况下就会出现有些代码被重复执行,比如存储日志等。
agent就是专门解决这个问题,因为他只会启动一个线程且相对更稳定。
在根目录创建agent.js
// agent.js
const mqtt = require('mqtt');
module.exports = agent => {
// 在这里写你的初始化逻辑
//线程准备好后再进行mqtt连接
agent.messenger.on('egg-ready', () => {
const mqttClinet = mqtt.connect(agent.config.mqtt, agent.config.mqttAuth);
mqttClinet.on('connect', () => {
mqttClinet.subscribe('#')
});
mqttClinet.on('message', (topic, message) => {
//mqtt收到信息后转发给其他线程
agent.messenger.sendRandom('mqtt', {
topic: topic,
message: message.toString()
});
})
})
//发送publish
agent.messenger.on('publish', (data) => {
if (mqttClinet) {
mqttClinet.subscribe(data.sn + 'state')
mqttClinet.publish(data.sn + 'ctr', data.cmd)
}
});
//订阅
agent.messenger.on('subscribe', (topic) => {
if (mqttClinet) {
mqttClinet.subscribe(topic)
}
});
//取消订阅
agent.messenger.on('unsubscribe', (topic) => {
if (mqttClinet) {
mqttClinet.unsubscribe(topic)
}
});
};
在根目录创建app.js
//app.js
class AppBootHook {
constructor(app) {
this.app = app;
}
configWillLoad() {
}
async didLoad() {
}
async willReady() {
}
async didReady() {
// 应用已经启动完毕
const ctx = await this.app.createAnonymousContext();
//监听来自agent的消息
this.app.messenger.on('mqtt', (data) => {
//转发到具体的服务
ctx.service.mqtt.dealMessage(data)
});
}
async serverDidReady() {
}
}
module.exports = AppBootHook;
mqtt服务示例
//接收agent发过来的mqtt信息
async dealMessage(info) {
}
//订阅主题
async subscribe(sn) {
this.app.messenger.sendToAgent('subscribe', sn)
}
//发布消息
async publish(sn, cmd) {
this.app.messenger.sendToAgent('publish', {
sn,
cmd
})
}
这样就解决了信息重复处理的问题。
遇到其他类似的长连接也可以这样处理。