From db52771c0b2e0d66bf5491fcdfb87949fa16b787 Mon Sep 17 00:00:00 2001 From: jweigele Date: Sun, 14 Aug 2022 02:02:11 -0700 Subject: [PATCH] Revamp to use aioamqp instead, which seems to work with the latest python version (3.10) --- Dockerfile | 2 +- grahbot.py | 65 +++++++++++++++++++++++++++++++----------------------- 2 files changed, 38 insertions(+), 29 deletions(-) diff --git a/Dockerfile b/Dockerfile index a5e53f6..f219eee 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,7 +17,7 @@ RUN apk add gcc RUN apk add make RUN pip3 install SSLContext RUN pip3 install pynacl -RUN pip3 install discord.py asynqp +RUN pip3 install discord.py aioamqp RUN pip3 install aioprometheus # Run the specified command within the container. diff --git a/grahbot.py b/grahbot.py index 94bd706..3faed6a 100755 --- a/grahbot.py +++ b/grahbot.py @@ -4,8 +4,6 @@ import discord import socket import random import ssl -import sslcontext -import asynqp import discord.voice_client import datetime import asyncio @@ -14,6 +12,7 @@ import traceback import os import logging import ffmpegfile +import aioamqp from aioprometheus import Counter, Gauge from aioprometheus.service import Service log = logging.getLogger(__name__) @@ -103,34 +102,41 @@ class HornClient(object): def __init__(self, config): self.rabbit_config = config - def process_msg(self, msg): - log.debug('>> {}'.format(msg.body)) + async def process_msg(self, channel, body, envelope, properties): + log.debug('>> {}'.format(body)) async def rabbit_connect(self): log.debug('Creating rabbitmq socket to {}:{}'.format(self.rabbit_config['host'], int(self.rabbit_config['port']))) - # CREATE SOCKET - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # WRAP SOCKET - sock = sslcontext.wrap_socket(sock) - sock.connect((self.rabbit_config['host'], int(self.rabbit_config['port']))) - - connection = await asynqp.connect(virtual_host=self.rabbit_config['vhost'], username=self.rabbit_config['user'], password=self.rabbit_config['password'], sock=sock) + # aioamqp connect + try: + log.debug('Doing the await now') + transport, protocol = await aioamqp.connect( + host=self.rabbit_config['host'], + port=int(self.rabbit_config['port']), + login=self.rabbit_config['user'], + virtualhost=self.rabbit_config['vhost'], + password=self.rabbit_config['password'], + ssl=ssl._create_unverified_context(), + verify_ssl=False + ) + except aioamqp.AmqpClosedConnection: + log.error("closed connections") log.info('Connected to rabbitmq') - channel = await connection.open_channel() + self.channel = await protocol.channel() log.debug('Declaring exchange') - exchange = await channel.declare_exchange(self.rabbit_config['exchange'], 'topic', passive=True) + exchange = await self.channel.exchange_declare(self.rabbit_config['exchange'], 'topic', passive=True) - self.channel = await connection.open_channel() log.debug('Declaring queue') - self.queue = await self.channel.declare_queue('horn_listener') - await self.queue.bind(exchange, routing_key=self.rabbit_config['exchange']) + queue_name = 'horn_listener' + self.queue = await self.channel.queue_declare(queue_name, durable=True) + await self.channel.queue_bind(queue_name, self.rabbit_config['exchange'], routing_key=self.rabbit_config['exchange']) log.debug('Bound') - await self.queue.consume(self.process_msg, no_ack=True) + await self.channel.basic_consume(self.process_msg, queue_name=queue_name, no_ack=True) log.debug('Consumed a thing') - while (True): - await asyncio.sleep(1) + #while (True): +# await asyncio.sleep(1) #msg = asynqp.Message("omg here is the time {}".format(datetime.datetime.now())) #exchange.publish(msg, routing_key=self.rabbit_config['exchange']) @@ -174,19 +180,19 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): AUDIO_FILES = ['mp3', '.ogg'] CMD_MARKER = '!' CMD_PREFIX = 'command_' - def process_msg(self, msg): - log.debug('Received a message!!! {}'.format(msg)) - if msg._properties['content_type'] == 'application/json': + async def process_msg(self, channel, body, envelope, properties): + log.debug('Received a message!!! {}'.format(body)) + if properties.content_type == 'application/json': # decode the object from json - obj = json.loads(msg.body.decode(msg._properties['content_encoding'])) + obj = json.loads(body) else: - obj = msg.body + obj = body if 'output_type' in obj and obj['output_type'] == 'discord': - log.debug('Received a horn for us! {}'.format(msg.body)) + log.debug('Received a horn for us! {}'.format(body)) asyncio.ensure_future(self.horn(obj, properties=None)) else: - log.debug('Received a horn not for us: {}'.format(msg.body)) + log.debug('Received a horn not for us: {}'.format(body)) def __init__(self, config, intents=None): self.config = config @@ -194,10 +200,11 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): self.used = [] self.state = GrahState(grahbot=self) - self.loop = asyncio.get_event_loop() + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) #self.horn_client = HornClient('config-piege.json') HornClient.__init__(self, config) - self.rabbit = self.loop.create_task(self.rabbit_connect()) + self.rabbit = self.loop.run_until_complete(self.rabbit_connect()) HornProm.__init__(self) self.horn_prom = self.loop.create_task(self.prom_connect()) @@ -261,7 +268,9 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): # for further manipulation if we want to do funky stuff def loop_forever(self): try: + log.debug('Starting the main discord loop') self.loop.run_until_complete(self.start(self.config['token'])) + log.debug('Discord loop complete') except KeyboardInterrupt: self.loop.run_until_complete(self.logout()) pending = asyncio.all_tasks(loop=self.loop) -- 2.30.2