From: jweigele Date: Thu, 15 Sep 2022 18:09:57 +0000 (-0700) Subject: Fix some of the async stuff, but ffmpegfile is a work in progress X-Git-Url: http://git.hexthepla.net/?a=commitdiff_plain;h=bdd48151d8778919eb4ba8fb81daa9283a894d9c;p=grahbot Fix some of the async stuff, but ffmpegfile is a work in progress --- diff --git a/Dockerfile b/Dockerfile index f219eee..c901502 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,24 +1,29 @@ # Use the official image as a parent image. -FROM alpine +#FROM alpine:3.13 +FROM debian:bullseye-slim # Set the working directory. WORKDIR /usr/src/app # Run the command inside your image filesystem. -RUN apk update -RUN apk add musl-dev -RUN apk add python3-dev -RUN apk add py3-numpy -RUN apk add ffmpeg -RUN apk add py3-wheel -RUN apk add py3-pip -RUN apk add py3-cffi -RUN apk add gcc -RUN apk add make +RUN apt-get update +#RUN apk musl-dev +RUN apt-get install -y python3-dev +RUN apt-get install -y python3-numpy +#RUN apt-get install -y ffmpeg +RUN apt-get install -y python3-wheel +RUN apt-get install -y python3-pip +RUN apt-get install -y python3-cffi +RUN apt-get install -y gcc +RUN apt-get install -y make +RUN apt-get install -y libopus0 +RUN pip3 install --upgrade pip +RUN pip3 install orjson RUN pip3 install SSLContext RUN pip3 install pynacl RUN pip3 install discord.py aioamqp RUN pip3 install aioprometheus +RUN pip3 install pedalboard # Run the specified command within the container. CMD [ "/usr/src/app/grahbot.py" ] diff --git a/ffmpegfile.py b/ffmpegfile.py index 6437503..54c0d01 100644 --- a/ffmpegfile.py +++ b/ffmpegfile.py @@ -10,6 +10,7 @@ import logging import threading import numpy import io +from pedalboard.io import AudioFile log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) @@ -19,6 +20,9 @@ log.addHandler(ch) MAX_LENGTH = int(2*2*0.020*48000) +SAMPLE_RATE = 48000 +# per chunk per channel +MAX_SAMPLES = int(SAMPLE_RATE * 0.02) class AudioChunk(object): # 16 bits channel, 2 channels, 20 ms, 48khz @@ -33,6 +37,7 @@ class AudioChunk(object): @property def sample_buffer(self): #log.debug(self.samples) + log.debug(len(self.samples)) #log.debug(list(self.samples)) return struct.pack('<'+'h'*self.samples.size, *list(self.samples)) @@ -131,30 +136,30 @@ class AudioBuffer(discord.AudioSource): self.sample_lock.release() async def process_file(self, filename): - rand_str = str(hex(random.randint(0, 10000))) + #rand_str = str(hex(random.randint(0, 10000))) #fifo_path = '/tmp/fifo{}'.format(rand_str) locked = False try: #os.mkfifo(fifo_path) - args = ['ffmpeg', '-i', '{}'.format(filename)] - args += ['-f', 's16le', '-acodec', 'pcm_s16le', '-ac', '2', '-ar', '48000', '-'] - p = await asyncio.create_subprocess_exec(*args, stdin=None, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) + #args = ['ffmpeg', '-i', '{}'.format(filename)] + #args += ['-f', 's16le', '-acodec', 'pcm_s16le', '-ac', '2', '-ar', '48000', '-'] + #p = await asyncio.create_subprocess_exec(*args, stdin=None, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) #return_fifo = open(fifo_path, 'rb') #return_bytes = await asyncio.to_thread(return_fifo.read) #return_bytes = return_fifo.read() - stdout, stderr = await p.communicate() + #stdout, stderr = await p.communicate() #log.debug(stdout) #log.debug(stderr) - return_bytes = stdout - await p.wait() + #return_bytes = stdout + #await p.wait() #return_bytes = p.communicate()[0] - byte_length = len(return_bytes) - sample_index = 0 #len(self.samples) + #byte_length = len(return_bytes) + #sample_index = 0 #len(self.samples) min_deck_size = self.DECK_SIZE byte_start = 0 - return_byte_length = len(return_bytes) + #return_byte_length = len(return_bytes) chunks_to_append = [] - byte_reader = io.BytesIO(return_bytes) + #byte_reader = io.BytesIO(return_bytes) # byte processing and chunk generation #while byte_length > 0: #log.debug(byte_length) @@ -166,16 +171,27 @@ class AudioBuffer(discord.AudioSource): # log.debug('byte start {} sample bytes {}'.format(byte_start, sample_bytes)) # read in the next chunk, store for now - next_bytes = byte_reader.read(MAX_LENGTH) - while next_bytes: - chunk = AudioChunk(init_bytes=next_bytes) - #log.debug(chunk, chunk.samples) - chunks_to_append.append(chunk) - next_bytes = byte_reader.read(MAX_LENGTH) - - # chomp - #byte_start += sample_bytes - #byte_length = return_byte_length - byte_start + #next_bytes = byte_reader.read(MAX_LENGTH) + with AudioFile(filename).resampled_to(target_sample_rate=SAMPLE_RATE) as f: + #audio = f.read(f.frames) + next_samples = f.read(MAX_SAMPLES) + channels = len(next_samples.shape) + #log.debug(next_samples) + while numpy.size(next_samples) > 0: + # switch to mono if needed (channel shouldn't change from initial) + if channels == 3: + next_samples = numpy.delete(next_samples, 1, 0) + # for the downcast to int16 + next_samples *= 32768.0 + next_bytes = next_samples.astype('int16') + chunk = AudioChunk(init_samples=next_bytes) + #log.debug(chunk, chunk.samples) + chunks_to_append.append(chunk) + next_samples = f.read(MAX_SAMPLES) + + # chomp + #byte_start += sample_bytes + #byte_length = return_byte_length - byte_start # chunk append/insertion (along with mixing) # this is the part that actually needs locked access @@ -194,7 +210,7 @@ class AudioBuffer(discord.AudioSource): #log.debug('mixing chunk {}'.format(chunk)) self.samples[sample_index] = self.samples[sample_index].mix_stream(chunk) sample_index +=1 - #log.debug('finished chunk append final sample_index {} sample_size {}'.format(sample_index, self.sample_size)) + log.debug('finished chunk append final sample_index {} sample_size {}'.format(sample_index, self.sample_size)) diff --git a/grahbot.py b/grahbot.py index 709bf88..4daf242 100755 --- a/grahbot.py +++ b/grahbot.py @@ -3,6 +3,7 @@ import discord import socket import random +import time import ssl import sys import discord.voice_client @@ -23,6 +24,7 @@ ch.setFormatter(logging.Formatter('|%(levelname)s|%(asctime)s|%(module)s|%(linen log.addHandler(ch) from collections import defaultdict, OrderedDict +#discord.opus.load_opus('opus') PERSISTENT_DIR = '/var/lib/grahbot' @@ -84,7 +86,8 @@ class GrahState(object): return None def get_user_guild(self, user): - log.debug('Get user guild enter for {}'.format(user.id)) + if user: + log.debug('Get user guild enter for {}'.format(user.id)) if str(user.id) in self.data['user']: log.debug('Found user, returning..') retval = discord.utils.get(self.grahbot.guilds, id=self.data['user'][str(user.id)]) @@ -121,7 +124,8 @@ class HornClient(object): ssl=ssl._create_unverified_context(), verify_ssl=False ) - except aioamqp.AmqpClosedConnection: + except aioamqp.AmqpClosedConnection as e: + traceback.print_exc() log.error("closed connections") log.info('Connected to rabbitmq') self.channel = await protocol.channel() @@ -169,8 +173,8 @@ class HornProm(object): await svr.start(port=42069) log.debug('Started prom server?') - while (True): - await asyncio.sleep(1) +# while (True): +# await asyncio.sleep(1) class GrahDiscordException(Exception): pass @@ -200,18 +204,21 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): self.player = None self.used = [] self.state = GrahState(grahbot=self) + self.tasks = set() - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) + self.loop = asyncio.get_event_loop() + #asyncio.set_event_loop(self.loop) #self.horn_client = HornClient('config-piege.json') HornClient.__init__(self, config) - self.rabbit = self.loop.run_until_complete(self.rabbit_connect()) + self.rabbit = self.loop.create_task(self.rabbit_connect()) + self.tasks.add(self.rabbit) HornProm.__init__(self) self.horn_prom = self.loop.create_task(self.prom_connect()) + self.tasks.add(self.horn_prom) discord.Client.__init__(self, loop=self.loop, intents=intents) - self.loop_forever() + #self.loop_forever() def is_playing(self): @@ -267,11 +274,15 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): # for further manipulation if we want to do funky stuff - def loop_forever(self): + async def loop_forever(self): try: log.debug('Starting the main discord loop') - self.loop.run_until_complete(self.start(self.config['token'])) + await self.start(self.config['token']) + + while True: + await asyncio.sleep(1) log.debug('Discord loop complete') + except KeyboardInterrupt: self.loop.run_until_complete(self.logout()) pending = asyncio.all_tasks(loop=self.loop) @@ -285,8 +296,8 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): gathered.exception() except: pass - except Exception: - pass + except Exception as e: + traceback.print_exc() finally: self.loop.close() @@ -437,8 +448,8 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): def contextual_guild(self, message): channel = message.channel if isinstance(channel, discord.DMChannel): - log.debug('Is a DM with {}'.format(channel.recipient)) - retval = self.member_guild_list(channel.recipient) + log.debug('Is a DM with {}'.format(message.author)) + retval = self.member_guild_list(message.author) log.debug('Member guilds: {}'.format(retval)) return retval else: @@ -448,7 +459,21 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): def channel_guild(self, message): if isinstance(message.channel, discord.DMChannel): - return self.state.get_user_guild(message.channel.recipient) + #c = self.get_channel(message.channel.id) + #c = await self.fetch_channel(message.channel.id) + #print(c) + #for attr in dir(message): + # try: + # print(attr) + # print(getattr(message, attr)) + # except Exception as e: + # pass + #print('author id') + #print(message.author.id) + #for attr in dir(self): + # print(attr) + # print(getattr(self, attr)) + return self.state.get_user_guild(message.author) else: return self.contextual_guild(message) @@ -460,9 +485,9 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): guild_name = message.content.replace('!selectguild ', '') for guild in guilds: if guild.name == guild_name: - log.debug('Setting member {} to guild {}'.format(message.channel.recipient, guild.name)) - self.member_guilds[message.channel.recipient] = [guild] - self.state.set_user_guild(message.channel.recipient, guild) + log.debug('Setting member {} to guild {}'.format(message.author, guild.name)) + self.member_guilds[message.author] = [guild] + self.state.set_user_guild(message.author, guild) return else: log.debug('Guild {} does not match {}'.format(guild.name, guild_name)) @@ -578,11 +603,25 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): else: # try to get the already established VC and resave self.state.set_voice(before.channel.guild, self.state.get_voice(before.channel.guild)) - - -if __name__ == '__main__': +async def main(): config = json.load(open('{}/config.json'.format(PERSISTENT_DIR), 'r')) intents = discord.Intents.default() intents.members = True intents.voice_states = True bot = GrahDiscordBot(config, intents=intents) + + async with bot: + task = bot.loop.create_task(bot.loop_forever()) + bot.tasks.add(task) + + pending = asyncio.all_tasks(loop=bot.loop) + for task in bot.tasks: + if task.get_coro() != main: + print(task) + print(task.get_coro().cr_code) + await task + + + +if __name__ == '__main__': + asyncio.run(main())