Revamp to use aioamqp instead, which seems to work with the latest python version...
authorjweigele <jweigele@local>
Sun, 14 Aug 2022 09:02:11 +0000 (02:02 -0700)
committerjweigele <jweigele@local>
Sun, 14 Aug 2022 09:02:11 +0000 (02:02 -0700)
Dockerfile
grahbot.py

index a5e53f6034fe2ec0f112b0fb6e7b34b0acb03724..f219eeecbee368d073dff15b79a7c327265ff547 100644 (file)
@@ -17,7 +17,7 @@ RUN apk add gcc
 RUN apk add make
 RUN pip3 install SSLContext
 RUN pip3 install pynacl
-RUN pip3 install discord.py asynqp
+RUN pip3 install discord.py aioamqp
 RUN pip3 install aioprometheus
 
 # Run the specified command within the container.
index 94bd7060b85fdc674251a54454ef0987d9069f0b..3faed6a578b89991926f2bb74113eea6708e6ecf 100755 (executable)
@@ -4,8 +4,6 @@ import discord
 import socket
 import random
 import ssl
-import sslcontext
-import asynqp
 import discord.voice_client
 import datetime
 import asyncio
@@ -14,6 +12,7 @@ import traceback
 import os
 import logging
 import ffmpegfile
+import aioamqp
 from aioprometheus import Counter, Gauge
 from aioprometheus.service import Service
 log = logging.getLogger(__name__)
@@ -103,34 +102,41 @@ class HornClient(object):
     def __init__(self, config):
         self.rabbit_config = config
 
-    def process_msg(self, msg):
-        log.debug('>> {}'.format(msg.body))
+    async def process_msg(self, channel, body, envelope, properties):
+        log.debug('>> {}'.format(body))
 
     async def rabbit_connect(self):
         log.debug('Creating rabbitmq socket to {}:{}'.format(self.rabbit_config['host'], int(self.rabbit_config['port'])))
-        # CREATE SOCKET
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
-        # WRAP SOCKET
-        sock = sslcontext.wrap_socket(sock)
-        sock.connect((self.rabbit_config['host'], int(self.rabbit_config['port'])))
-
-        connection = await asynqp.connect(virtual_host=self.rabbit_config['vhost'], username=self.rabbit_config['user'], password=self.rabbit_config['password'], sock=sock)
+        # aioamqp connect
+        try:
+            log.debug('Doing the await now')
+            transport, protocol = await aioamqp.connect(
+                host=self.rabbit_config['host'],
+                port=int(self.rabbit_config['port']),
+                login=self.rabbit_config['user'],
+                virtualhost=self.rabbit_config['vhost'],
+                password=self.rabbit_config['password'],
+                ssl=ssl._create_unverified_context(),
+                verify_ssl=False
+                )
+        except aioamqp.AmqpClosedConnection:
+            log.error("closed connections")
         log.info('Connected to rabbitmq')
-        channel = await connection.open_channel()
+        self.channel = await protocol.channel()
         log.debug('Declaring exchange')
-        exchange = await channel.declare_exchange(self.rabbit_config['exchange'], 'topic', passive=True)
+        exchange = await self.channel.exchange_declare(self.rabbit_config['exchange'], 'topic', passive=True)
 
-        self.channel = await connection.open_channel()
         log.debug('Declaring queue')
-        self.queue = await self.channel.declare_queue('horn_listener')
-        await self.queue.bind(exchange, routing_key=self.rabbit_config['exchange'])
+        queue_name = 'horn_listener'
+        self.queue = await self.channel.queue_declare(queue_name, durable=True)
+        await self.channel.queue_bind(queue_name, self.rabbit_config['exchange'], routing_key=self.rabbit_config['exchange'])
         log.debug('Bound')
-        await self.queue.consume(self.process_msg, no_ack=True)
+        await self.channel.basic_consume(self.process_msg, queue_name=queue_name, no_ack=True)
         log.debug('Consumed a thing')
 
-        while (True):
-            await asyncio.sleep(1)
+        #while (True):
+#            await asyncio.sleep(1)
             #msg = asynqp.Message("omg here is the time {}".format(datetime.datetime.now()))
             #exchange.publish(msg, routing_key=self.rabbit_config['exchange'])
 
@@ -174,19 +180,19 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm):
     AUDIO_FILES = ['mp3', '.ogg']
     CMD_MARKER = '!'
     CMD_PREFIX = 'command_'
-    def process_msg(self, msg):
-        log.debug('Received a message!!! {}'.format(msg))
-        if msg._properties['content_type'] == 'application/json':
+    async def process_msg(self, channel, body, envelope, properties):
+        log.debug('Received a message!!! {}'.format(body))
+        if properties.content_type == 'application/json':
             # decode the object from json
-            obj = json.loads(msg.body.decode(msg._properties['content_encoding']))
+            obj = json.loads(body)
         else:
-            obj = msg.body
+            obj = body
 
         if 'output_type' in obj and obj['output_type'] == 'discord':
-            log.debug('Received a horn for us! {}'.format(msg.body))
+            log.debug('Received a horn for us! {}'.format(body))
             asyncio.ensure_future(self.horn(obj, properties=None))
         else:
-            log.debug('Received a horn not for us: {}'.format(msg.body))
+            log.debug('Received a horn not for us: {}'.format(body))
 
     def __init__(self, config, intents=None):
         self.config = config
@@ -194,10 +200,11 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm):
         self.used = []
         self.state = GrahState(grahbot=self)
 
-        self.loop = asyncio.get_event_loop()
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self.loop)
         #self.horn_client = HornClient('config-piege.json')
         HornClient.__init__(self, config)
-        self.rabbit = self.loop.create_task(self.rabbit_connect())
+        self.rabbit = self.loop.run_until_complete(self.rabbit_connect())
 
         HornProm.__init__(self)
         self.horn_prom = self.loop.create_task(self.prom_connect())
@@ -261,7 +268,9 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm):
     # for further manipulation if we want to do funky stuff
     def loop_forever(self):
         try:
+            log.debug('Starting the main discord loop')
             self.loop.run_until_complete(self.start(self.config['token']))
+            log.debug('Discord loop complete')
         except KeyboardInterrupt:
             self.loop.run_until_complete(self.logout())
             pending = asyncio.all_tasks(loop=self.loop)