132 lines
4.0 KiB
JavaScript
132 lines
4.0 KiB
JavaScript
// RmqTelegramBot.js
|
||
import TelegramBot from 'node-telegram-bot-api'
|
||
import { v4 as uuidv4 } from 'uuid'
|
||
import { getChannel } from './utils/rmq.js'
|
||
|
||
export class RmqTelegramBot {
|
||
constructor(botName, token, options = {}) {
|
||
this._bot = new TelegramBot(token, { ...options, polling: false, webHook: false, debug: true })
|
||
this._name = botName
|
||
this._channel = getChannel()
|
||
this._inQ = `InMessage${botName}`
|
||
this._outQ = `OutMessage${botName}`
|
||
this._sessionQ = `SessionQueue${botName}` // общая системная очередь для сессий
|
||
|
||
// убедимся, что очередь существует
|
||
this._channel.assertQueue(this._sessionQ, { durable: true })
|
||
|
||
// переопределяем HTTP‑запрос, чтобы шлём всё в OutMessage…
|
||
this._bot._request = (type, form) => {
|
||
const payload = { method: type, form }
|
||
this._channel.sendToQueue(
|
||
this._outQ,
|
||
Buffer.from(JSON.stringify(payload)),
|
||
{ persistent: true }
|
||
)
|
||
}
|
||
|
||
// InMessage → processUpdate
|
||
this._channel.consume(this._inQ, msg => {
|
||
try {
|
||
const { update } = JSON.parse(msg.content.toString())
|
||
console.log('update',update);
|
||
this._bot.processUpdate(update)
|
||
this._channel.ack(msg)
|
||
} catch (err) {
|
||
console.error('Failed to process update from RMQ', err)
|
||
this._channel.nack(msg, false, false)
|
||
}
|
||
})
|
||
|
||
// Proxy для автоматической делегации любых методов в this._bot
|
||
return new Proxy(this, {
|
||
get(target, prop) {
|
||
if (prop in target) {
|
||
const v = target[prop]
|
||
return typeof v === 'function' ? v.bind(target) : v
|
||
}
|
||
if (prop in target._bot) {
|
||
const v = target._bot[prop]
|
||
return typeof v === 'function' ? v.bind(target._bot) : v
|
||
}
|
||
throw new Error(`No such method or property: ${String(prop)}`)
|
||
}
|
||
})
|
||
}
|
||
|
||
// ——————— Сессионные методы ———————
|
||
|
||
/**
|
||
* RPC‑запрос текущего занятия сессии
|
||
* @param {string} userId
|
||
* @returns {Promise<object>} состояние из session‑сервиса
|
||
*/
|
||
async requestSessionState(userId) {
|
||
const correlationId = uuidv4()
|
||
// временная очередь-ответчик
|
||
const { queue: replyQ } = await this._channel.assertQueue('', { exclusive: true })
|
||
|
||
return new Promise((resolve, reject) => {
|
||
this._channel.consume(replyQ, msg => {
|
||
if (msg.properties.correlationId === correlationId) {
|
||
const state = JSON.parse(msg.content.toString())
|
||
resolve(state)
|
||
}
|
||
}, { noAck: true })
|
||
|
||
// запрос в общий SessionQueue
|
||
const payload = { action: 'get', userId }
|
||
this._channel.sendToQueue(
|
||
this._sessionQ,
|
||
Buffer.from(JSON.stringify(payload)),
|
||
{ correlationId, replyTo: replyQ }
|
||
)
|
||
})
|
||
}
|
||
|
||
/**
|
||
* Захватить внимание этого бота за пользователем
|
||
* @param {string} userId
|
||
*/
|
||
lockSession(userId) {
|
||
const payload = {
|
||
action: 'lock',
|
||
userId,
|
||
externalBotId: this._name
|
||
}
|
||
this._channel.sendToQueue(
|
||
this._sessionQ,
|
||
Buffer.from(JSON.stringify(payload)),
|
||
{ persistent: true }
|
||
)
|
||
}
|
||
|
||
/**
|
||
* Отпустить внимание (снять блокировку)
|
||
* @param {string} userId
|
||
*/
|
||
unlockSession(userId) {
|
||
const payload = { action: 'unlock', userId }
|
||
this._channel.sendToQueue(
|
||
this._sessionQ,
|
||
Buffer.from(JSON.stringify(payload)),
|
||
{ persistent: true }
|
||
)
|
||
}
|
||
|
||
// ——————— Событийные обёртки ———————
|
||
|
||
on(event, listener) {
|
||
this._bot.on(event, listener)
|
||
return this
|
||
}
|
||
once(event, listener) {
|
||
this._bot.once(event, listener)
|
||
return this
|
||
}
|
||
removeListener(event, listener) {
|
||
this._bot.removeListener(event, listener)
|
||
return this
|
||
}
|
||
}
|