104 lines
3.5 KiB
JavaScript
104 lines
3.5 KiB
JavaScript
import TelegramBot from 'node-telegram-bot-api'
|
||
import { PrismaClient } from '@prisma/client'
|
||
import dotenv from 'dotenv'
|
||
import { handleIncomingMessage } from './router/index.js'
|
||
import { initRabbit, getChannel } from './utils/rmq.js';
|
||
import { getSessionState,lockSessionToBot,unlockSession } from './router/session.js'
|
||
|
||
dotenv.config()
|
||
|
||
const prisma = new PrismaClient()
|
||
const bots = {}
|
||
|
||
|
||
async function main() {
|
||
const channel = await initRabbit(process.env.RABBIT_URL || 'amqp://telegram_main:PASSWORD@rabbit.vidi.one:5672/telegram_main');
|
||
|
||
const externalBots = await prisma.externalBot.findMany({
|
||
include: { bots: { include: { commands: { include: { groupIds: true } }, groups: true } } }
|
||
})
|
||
|
||
// сначала ассерт очередей и потребитель OutMessage…
|
||
for (const ext of externalBots) {
|
||
for( const bot of ext.bots){
|
||
const inQ = `InMessage${bot.name}`;
|
||
const outQ = `OutMessage${bot.name}`;
|
||
const sessionQ = `SessionQueue${bot.name}`;
|
||
await channel.assertQueue(inQ, { durable: true });
|
||
await channel.assertQueue(outQ, { durable: true });
|
||
await channel.assertQueue(sessionQ, { durable: true });
|
||
|
||
|
||
channel.consume(outQ, msg => {
|
||
const { method,form } = JSON.parse(msg.content.toString());
|
||
|
||
// Низкоуровневый вызов HTTP-запроса к Telegram API
|
||
bots[ext.token]._request(method,form);
|
||
|
||
channel.ack(msg);
|
||
});
|
||
|
||
channel.consume(sessionQ, async reqMsg => {
|
||
try {
|
||
const { action, userId } = JSON.parse(reqMsg.content.toString());
|
||
// внешний бот, привязанный к этой очереди
|
||
const externalBotId = ext.id;
|
||
|
||
switch (action) {
|
||
case 'get': {
|
||
// узнаём, захватил ли кто-то сессию
|
||
const target = await getSessionState(userId, externalBotId);
|
||
const resp = JSON.stringify({ externalBotId: target });
|
||
channel.sendToQueue(
|
||
reqMsg.properties.replyTo,
|
||
Buffer.from(resp),
|
||
{ correlationId: reqMsg.properties.correlationId }
|
||
);
|
||
break;
|
||
}
|
||
case 'lock': {
|
||
// этот бот берёт сессию
|
||
await lockSessionToBot(userId, externalBotId, bot.id);
|
||
break;
|
||
}
|
||
case 'unlock': {
|
||
// этот бот снимает свою блокировку
|
||
await unlockSession(userId, externalBotId);
|
||
break;
|
||
}
|
||
default:
|
||
console.warn(`Unknown session action: ${action}`);
|
||
}
|
||
|
||
channel.ack(reqMsg);
|
||
} catch (err) {
|
||
console.error('Error processing sessionQ message', err);
|
||
// не возвращаем в очередь
|
||
channel.nack(reqMsg, false, false);
|
||
}
|
||
});
|
||
}
|
||
}
|
||
|
||
// теперь создаём и запускаем Telegram‑ботов
|
||
for (const ext of externalBots) {
|
||
if (!ext.token || bots[ext.token]) continue;
|
||
const bot = new TelegramBot(ext.token, { polling: true, debug: true });
|
||
bots[ext.token] = bot;
|
||
bot.processUpdate = async (update) => {
|
||
const ctx = { update, ext, bot, prisma, allBots: externalBots };
|
||
console.log('handleIncomingMessage');
|
||
await handleIncomingMessage(ctx);
|
||
}
|
||
|
||
// bot..on('message', async msg => {
|
||
//
|
||
// await handleIncomingMessage(ctx);
|
||
// });
|
||
console.log(`Started polling for ExternalBot "${ext.name}"`);
|
||
}
|
||
}
|
||
|
||
|
||
main().catch(console.error)
|