TgRouting/deamon/external-bot-daemon.js

104 lines
3.5 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import TelegramBot from 'node-telegram-bot-api'
import { PrismaClient } from '@prisma/client'
import dotenv from 'dotenv'
import { handleIncomingMessage } from './router/index.js'
import { initRabbit, getChannel } from './utils/rmq.js';
import { getSessionState,lockSessionToBot,unlockSession } from './router/session.js'
dotenv.config()
const prisma = new PrismaClient()
const bots = {}
async function main() {
const channel = await initRabbit(process.env.RABBIT_URL || 'amqp://telegram_main:PASSWORD@rabbit.vidi.one:5672/telegram_main');
const externalBots = await prisma.externalBot.findMany({
include: { bots: { include: { commands: { include: { groupIds: true } }, groups: true } } }
})
// сначала ассерт очередей и потребитель OutMessage…
for (const ext of externalBots) {
for( const bot of ext.bots){
const inQ = `InMessage${bot.name}`;
const outQ = `OutMessage${bot.name}`;
const sessionQ = `SessionQueue${bot.name}`;
await channel.assertQueue(inQ, { durable: true });
await channel.assertQueue(outQ, { durable: true });
await channel.assertQueue(sessionQ, { durable: true });
channel.consume(outQ, msg => {
const { method,form } = JSON.parse(msg.content.toString());
// Низкоуровневый вызов HTTP-запроса к Telegram API
bots[ext.token]._request(method,form);
channel.ack(msg);
});
channel.consume(sessionQ, async reqMsg => {
try {
const { action, userId } = JSON.parse(reqMsg.content.toString());
// внешний бот, привязанный к этой очереди
const externalBotId = ext.id;
switch (action) {
case 'get': {
// узнаём, захватил ли кто-то сессию
const target = await getSessionState(userId, externalBotId);
const resp = JSON.stringify({ externalBotId: target });
channel.sendToQueue(
reqMsg.properties.replyTo,
Buffer.from(resp),
{ correlationId: reqMsg.properties.correlationId }
);
break;
}
case 'lock': {
// этот бот берёт сессию
await lockSessionToBot(userId, externalBotId, bot.id);
break;
}
case 'unlock': {
// этот бот снимает свою блокировку
await unlockSession(userId, externalBotId);
break;
}
default:
console.warn(`Unknown session action: ${action}`);
}
channel.ack(reqMsg);
} catch (err) {
console.error('Error processing sessionQ message', err);
// не возвращаем в очередь
channel.nack(reqMsg, false, false);
}
});
}
}
// теперь создаём и запускаем Telegramботов
for (const ext of externalBots) {
if (!ext.token || bots[ext.token]) continue;
const bot = new TelegramBot(ext.token, { polling: true, debug: true });
bots[ext.token] = bot;
bot.processUpdate = async (update) => {
const ctx = { update, ext, bot, prisma, allBots: externalBots };
console.log('handleIncomingMessage');
await handleIncomingMessage(ctx);
}
// bot..on('message', async msg => {
//
// await handleIncomingMessage(ctx);
// });
console.log(`Started polling for ExternalBot "${ext.name}"`);
}
}
main().catch(console.error)