From 966b0c576de69511092ebd32706912bc4e8d0f5f Mon Sep 17 00:00:00 2001 From: jweigele Date: Sun, 6 Feb 2022 17:06:05 -0800 Subject: [PATCH] Switch grahbot to alpine for newer python, make a new multi-mixer ffmpeg audio source --- Dockerfile | 18 +++- ffmpegfile.py | 228 ++++++++++++++++++++++++++++++++++++++++++++++++++ grahbot.py | 22 +++-- 3 files changed, 257 insertions(+), 11 deletions(-) create mode 100644 ffmpegfile.py diff --git a/Dockerfile b/Dockerfile index eda2cc5..7ab1f20 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,13 +1,22 @@ # Use the official image as a parent image. -FROM ubuntu +FROM alpine # Set the working directory. WORKDIR /usr/src/app # Run the command inside your image filesystem. -RUN apt-get update -RUN apt-get install -y --no-install-recommends python3-pip ffmpeg -RUN pip3 install discord.py asynqp PyNaCl +RUN apk update +RUN apk add musl-dev +RUN apk add python3-dev +RUN apk add py3-numpy +RUN apk add ffmpeg +RUN apk add py3-wheel +RUN apk add py3-pip +RUN apk add py3-cffi +RUN apk add gcc +RUN apk add make +RUN pip3 install pynacl +RUN pip3 install discord.py asynqp RUN pip3 install aioprometheus # Run the specified command within the container. @@ -15,3 +24,4 @@ CMD [ "/usr/src/app/grahbot.py" ] # Copy the rest of your app's source code from your host to your image filesystem. COPY grahbot.py . +COPY ffmpegfile.py . diff --git a/ffmpegfile.py b/ffmpegfile.py new file mode 100644 index 0000000..6437503 --- /dev/null +++ b/ffmpegfile.py @@ -0,0 +1,228 @@ +import subprocess +import sys +import os +import traceback +import random +import discord +import struct +import asyncio +import logging +import threading +import numpy +import io + +log = logging.getLogger(__name__) +log.setLevel(logging.DEBUG) +ch = logging.StreamHandler() +ch.setFormatter(logging.Formatter('|%(levelname)s|%(asctime)s|%(module)s|%(lineno)d %(message)s')) +log.addHandler(ch) + + +MAX_LENGTH = int(2*2*0.020*48000) + +class AudioChunk(object): + # 16 bits channel, 2 channels, 20 ms, 48khz + MAX_LENGTH = int(2*2*0.020*48000) + def __init__(self, init_bytes=None, init_samples=None): + if init_bytes is not None: + init_bytes = self.pad_out(init_bytes) + self.samples = numpy.array([x[0] for x in struct.iter_unpack(' 0: + retval = self.samples.pop(0) + + else: + # dead air + #log.debug('dead air fill') + retval = AudioChunk(b'') + return retval + + def fill_deck(self): + # block if you got nothin + if self.deck_size == 0: + blocking = True + log.debug('Filling deck, blocking: {}'.format(blocking)) + else: + blocking = False + # only try, do nothing if we don't get it + got_lock = self.sample_lock.acquire(blocking=blocking) + if got_lock: + if self.deck_size < self.DECK_SIZE: + # fill with number missing + for sample_index in range(self.DECK_SIZE - self.deck_size): + self.on_deck.append(self.get_next_sample()) + self.sample_lock.release() + + def terminate_all(self): + self.sample_lock.acquire() + self.samples = [] + self.sample_lock.release() + + async def process_file(self, filename): + rand_str = str(hex(random.randint(0, 10000))) + #fifo_path = '/tmp/fifo{}'.format(rand_str) + locked = False + try: + #os.mkfifo(fifo_path) + args = ['ffmpeg', '-i', '{}'.format(filename)] + args += ['-f', 's16le', '-acodec', 'pcm_s16le', '-ac', '2', '-ar', '48000', '-'] + p = await asyncio.create_subprocess_exec(*args, stdin=None, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) + #return_fifo = open(fifo_path, 'rb') + #return_bytes = await asyncio.to_thread(return_fifo.read) + #return_bytes = return_fifo.read() + stdout, stderr = await p.communicate() + #log.debug(stdout) + #log.debug(stderr) + return_bytes = stdout + await p.wait() + #return_bytes = p.communicate()[0] + byte_length = len(return_bytes) + sample_index = 0 #len(self.samples) + min_deck_size = self.DECK_SIZE + byte_start = 0 + return_byte_length = len(return_bytes) + chunks_to_append = [] + byte_reader = io.BytesIO(return_bytes) + # byte processing and chunk generation + #while byte_length > 0: + #log.debug(byte_length) + # keeps us from tailing off the end of the bytestream +# if MAX_LENGTH > byte_length: +# sample_bytes = byte_length +# else: +# sample_bytes = MAX_LENGTH +# log.debug('byte start {} sample bytes {}'.format(byte_start, sample_bytes)) + # read in the next chunk, store for now + + next_bytes = byte_reader.read(MAX_LENGTH) + while next_bytes: + chunk = AudioChunk(init_bytes=next_bytes) + #log.debug(chunk, chunk.samples) + chunks_to_append.append(chunk) + next_bytes = byte_reader.read(MAX_LENGTH) + + # chomp + #byte_start += sample_bytes + #byte_length = return_byte_length - byte_start + + # chunk append/insertion (along with mixing) + # this is the part that actually needs locked access + locked = self.sample_lock.acquire() + sample_index = 0 + log.debug('starting the chunk append now') + for chunk in chunks_to_append: + if self.deck_size < min_deck_size: + min_deck_size = self.deck_size + + if self.sample_size == sample_index:# or True: + #log.debug('appending chunk {} {}'.format(chunk, chunk.samples)) + self.samples.append(chunk) + # replace + else: + #log.debug('mixing chunk {}'.format(chunk)) + self.samples[sample_index] = self.samples[sample_index].mix_stream(chunk) + sample_index +=1 + #log.debug('finished chunk append final sample_index {} sample_size {}'.format(sample_index, self.sample_size)) + + + + except Exception as e: + #log.debug('sample size is {} sample_index is {}'.format(self.sample_size, sample_index)) + traceback.print_exc() + finally: + if locked: + log.debug('min deck size: {}'.format(min_deck_size)) + self.sample_lock.release() + #os.unlink(fifo_path) + + def print_buffer(self): + for sample in self.samples: + sys.stdout.buffer.write(sample.read()) + +if __name__ == '__main__': + ab = AudioBuffer() + ab.process_file(sys.argv[1]) + for x in range(100): + #sys.stderr.write('step {}'.format(x)) + sys.stdout.buffer.write(ab.read()) + ab.process_file(sys.argv[2]) + sys.stderr.write('{}'.format(ab.sample_size)) + sample = ab.read() + while sample: + #print([hex(x) for x in sample]) + sys.stdout.buffer.write(sample) + sample = ab.read() + + diff --git a/grahbot.py b/grahbot.py index f45ae83..66a1120 100755 --- a/grahbot.py +++ b/grahbot.py @@ -12,7 +12,9 @@ import json import traceback import os import logging -from aioprometheus import Counter, Service, Gauge +import ffmpegfile +from aioprometheus import Counter, Gauge +from aioprometheus.service import Service log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) ch = logging.StreamHandler() @@ -156,7 +158,7 @@ class HornProm(object): svr.register(self.horn_counter) svr.register(self.horn_voice) - await svr.start(addr="0.0.0.0", port=42069) + await svr.start(addr="", port=42069) log.debug('Started prom server?') while (True): await asyncio.sleep(1) @@ -201,10 +203,13 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): discord.Client.__init__(self, loop=self.loop, intents=intents) self.loop_forever() + + def is_playing(self): + return False + def terminate_all(self, guild): if self.state.get_voice(guild): - if self.state.get_voice(guild).is_playing(): - self.state.get_voice(guild).stop() + self.state.get_voice(guild)._player.source.terminate_all() def get_sample_name(self): selections = os.listdir(self.config['airhorn_directory']) @@ -326,9 +331,9 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): async def msg_play_filename(self, filename, guild_id, exclusive=False): guild = discord.utils.get(self.guilds, id=guild_id) if self.state.get_voice(guild): - self.terminate_all(guild=guild) - self.state.get_voice(guild).play(discord.FFmpegOpusAudio(filename)) - #self.player.start() + if exclusive: + self.terminate_all(guild=guild) + await self.state.get_voice(guild)._player.source.process_file(filename) else: log.debug('Was asked to play {} but no voice channel'.format(filename)) @@ -344,6 +349,9 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm): await self.state.get_voice(guild).move_to(vc) else: self.state.set_voice(guild, await vc.connect()) + audio_buffer = ffmpegfile.AudioBuffer() + self.state.get_voice(guild).play(audio_buffer) + log.info('Joined voice channel {} {} (id: {})'.format(guild, vc.name, vc.id)) log.debug('Voice now {}'.format(self.state.get_voice(guild))) -- 2.30.2