TgRouting/clientExample/RmqTelegramBot.js

132 lines
4.0 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// RmqTelegramBot.js
import TelegramBot from 'node-telegram-bot-api'
import { v4 as uuidv4 } from 'uuid'
import { getChannel } from './utils/rmq.js'
export class RmqTelegramBot {
constructor(botName, token, options = {}) {
this._bot = new TelegramBot(token, { ...options, polling: false, webHook: false, debug: true })
this._name = botName
this._channel = getChannel()
this._inQ = `InMessage${botName}`
this._outQ = `OutMessage${botName}`
this._sessionQ = `SessionQueue${botName}` // общая системная очередь для сессий
// убедимся, что очередь существует
this._channel.assertQueue(this._sessionQ, { durable: true })
// переопределяем HTTPзапрос, чтобы шлём всё в OutMessage…
this._bot._request = (type, form) => {
const payload = { method: type, form }
this._channel.sendToQueue(
this._outQ,
Buffer.from(JSON.stringify(payload)),
{ persistent: true }
)
}
// InMessage → processUpdate
this._channel.consume(this._inQ, msg => {
try {
const { update } = JSON.parse(msg.content.toString())
console.log('update',update);
this._bot.processUpdate(update)
this._channel.ack(msg)
} catch (err) {
console.error('Failed to process update from RMQ', err)
this._channel.nack(msg, false, false)
}
})
// Proxy для автоматической делегации любых методов в this._bot
return new Proxy(this, {
get(target, prop) {
if (prop in target) {
const v = target[prop]
return typeof v === 'function' ? v.bind(target) : v
}
if (prop in target._bot) {
const v = target._bot[prop]
return typeof v === 'function' ? v.bind(target._bot) : v
}
throw new Error(`No such method or property: ${String(prop)}`)
}
})
}
// ——————— Сессионные методы ———————
/**
* RPCзапрос текущего занятия сессии
* @param {string} userId
* @returns {Promise<object>} состояние из sessionсервиса
*/
async requestSessionState(userId) {
const correlationId = uuidv4()
// временная очередь-ответчик
const { queue: replyQ } = await this._channel.assertQueue('', { exclusive: true })
return new Promise((resolve, reject) => {
this._channel.consume(replyQ, msg => {
if (msg.properties.correlationId === correlationId) {
const state = JSON.parse(msg.content.toString())
resolve(state)
}
}, { noAck: true })
// запрос в общий SessionQueue
const payload = { action: 'get', userId }
this._channel.sendToQueue(
this._sessionQ,
Buffer.from(JSON.stringify(payload)),
{ correlationId, replyTo: replyQ }
)
})
}
/**
* Захватить внимание этого бота за пользователем
* @param {string} userId
*/
lockSession(userId) {
const payload = {
action: 'lock',
userId,
externalBotId: this._name
}
this._channel.sendToQueue(
this._sessionQ,
Buffer.from(JSON.stringify(payload)),
{ persistent: true }
)
}
/**
* Отпустить внимание (снять блокировку)
* @param {string} userId
*/
unlockSession(userId) {
const payload = { action: 'unlock', userId }
this._channel.sendToQueue(
this._sessionQ,
Buffer.from(JSON.stringify(payload)),
{ persistent: true }
)
}
// ——————— Событийные обёртки ———————
on(event, listener) {
this._bot.on(event, listener)
return this
}
once(event, listener) {
this._bot.once(event, listener)
return this
}
removeListener(event, listener) {
this._bot.removeListener(event, listener)
return this
}
}