Switch grahbot to alpine for newer python, make a new multi-mixer ffmpeg audio source
authorjweigele <jweigele@local>
Mon, 7 Feb 2022 01:06:05 +0000 (17:06 -0800)
committerjweigele <jweigele@local>
Mon, 7 Feb 2022 01:06:05 +0000 (17:06 -0800)
Dockerfile
ffmpegfile.py [new file with mode: 0644]
grahbot.py

index eda2cc51687d476e2d5998da21dd3ff48e569a11..7ab1f20d0881f362992a874e5dc614fae4c7c918 100644 (file)
@@ -1,13 +1,22 @@
 # Use the official image as a parent image.
-FROM ubuntu
+FROM alpine
 
 # Set the working directory.
 WORKDIR /usr/src/app
 
 # Run the command inside your image filesystem.
-RUN apt-get update
-RUN apt-get install -y --no-install-recommends python3-pip ffmpeg
-RUN pip3 install discord.py asynqp PyNaCl
+RUN apk update
+RUN apk add musl-dev
+RUN apk add python3-dev
+RUN apk add py3-numpy
+RUN apk add ffmpeg
+RUN apk add py3-wheel
+RUN apk add py3-pip
+RUN apk add py3-cffi
+RUN apk add gcc
+RUN apk add make
+RUN pip3 install pynacl
+RUN pip3 install discord.py asynqp
 RUN pip3 install aioprometheus
 
 # Run the specified command within the container.
@@ -15,3 +24,4 @@ CMD [ "/usr/src/app/grahbot.py" ]
 
 # Copy the rest of your app's source code from your host to your image filesystem.
 COPY grahbot.py .
+COPY ffmpegfile.py .
diff --git a/ffmpegfile.py b/ffmpegfile.py
new file mode 100644 (file)
index 0000000..6437503
--- /dev/null
@@ -0,0 +1,228 @@
+import subprocess
+import sys
+import os
+import traceback
+import random
+import discord
+import struct
+import asyncio
+import logging
+import threading
+import numpy
+import io
+
+log = logging.getLogger(__name__)
+log.setLevel(logging.DEBUG)
+ch = logging.StreamHandler()
+ch.setFormatter(logging.Formatter('|%(levelname)s|%(asctime)s|%(module)s|%(lineno)d %(message)s'))
+log.addHandler(ch)
+
+
+MAX_LENGTH = int(2*2*0.020*48000)
+
+class AudioChunk(object):
+    # 16 bits channel, 2 channels, 20 ms, 48khz
+    MAX_LENGTH = int(2*2*0.020*48000)
+    def __init__(self, init_bytes=None, init_samples=None):
+        if init_bytes is not None:
+            init_bytes = self.pad_out(init_bytes)
+            self.samples = numpy.array([x[0] for x in struct.iter_unpack('<h', init_bytes)])
+        elif init_samples is not None:
+            self.samples = init_samples
+
+    @property
+    def sample_buffer(self):
+        #log.debug(self.samples)
+        #log.debug(list(self.samples))
+        return struct.pack('<'+'h'*self.samples.size, *list(self.samples)) 
+
+    @property
+    def length(self):
+        return self.samples.size
+
+    def pad_out(self, stream):
+        return stream + (self.MAX_LENGTH-len(stream))//2*b'\x00\x00'
+
+    def read(self):
+        # pad out
+        return self.sample_buffer
+        #return self.pad_out(self.sample_buffer)
+
+    def mix_stream(self, new_stream):
+        mixed_ints = self.samples + new_stream.samples
+        retval = AudioChunk(init_samples=numpy.clip(mixed_ints, -0x7fff-1, 0x7fff-1))
+        #our_ints = [x[0] for x in struct.iter_unpack('<h', self.read())]
+        #new_ints = [x[0] for x in struct.iter_unpack('<h', new_stream.read())]
+        #sys.stderr.write('{}\n{}\n'.format(our_ints, new_ints))
+        # simple add
+        #mixed_ints = [x[0] + x[1] for x in zip(our_ints, new_ints)]
+        # cut off clipped
+        #cut_top = [min(x, 0x7fff-1) for x in mixed_ints]
+        #cut_bottom = [max(x, -0x7fff-1) for x in  cut_top]
+        # return val
+        #retval = AudioChunk(struct.pack('<'+'h'*len(cut_bottom), *cut_bottom))
+        return retval
+
+class AudioBuffer(discord.AudioSource):
+    DECK_SIZE = 5
+    def __init__(self):
+        self.samples = []
+        self.on_deck = []
+        self.count = 0
+        self.sample_lock = threading.Lock()
+        self.deck_lock = threading.Lock()
+
+    def is_opus(self):
+        return False
+
+    def cleanup(self):
+        self.samples = []
+
+    def read(self):
+        #log.debug('Deck size: {}'.format(self.deck_size))
+        # block unless we're the only ones reading the deck
+        self.deck_lock.acquire()
+        # opportunistically try to fill deck
+        self.fill_deck()
+        # this will blow up if we run out of buffer
+        sample = self.on_deck.pop(0)
+        # allow for others to modify deck now
+        self.deck_lock.release()
+        retval = sample.read()
+        return retval
+        
+    @property
+    def sample_size(self):
+        return len(self.samples)
+
+    @property
+    def deck_size(self):
+        return len(self.on_deck)
+
+    def get_next_sample(self):
+        if self.sample_size > 0:
+            retval = self.samples.pop(0)
+            
+        else:
+            # dead air
+            #log.debug('dead air fill')
+            retval = AudioChunk(b'')
+        return retval
+
+    def fill_deck(self):
+        # block if you got nothin
+        if self.deck_size == 0:
+            blocking = True
+            log.debug('Filling deck, blocking: {}'.format(blocking))
+        else:
+            blocking = False
+        # only try, do nothing if we don't get it
+        got_lock = self.sample_lock.acquire(blocking=blocking)
+        if got_lock:
+            if self.deck_size < self.DECK_SIZE:
+                # fill with number missing
+                for sample_index in range(self.DECK_SIZE - self.deck_size):
+                    self.on_deck.append(self.get_next_sample())
+            self.sample_lock.release()
+
+    def terminate_all(self):
+        self.sample_lock.acquire()
+        self.samples = []
+        self.sample_lock.release()
+
+    async def process_file(self, filename):
+        rand_str = str(hex(random.randint(0, 10000)))
+        #fifo_path = '/tmp/fifo{}'.format(rand_str)
+        locked = False
+        try:
+            #os.mkfifo(fifo_path)
+            args = ['ffmpeg', '-i', '{}'.format(filename)]
+            args += ['-f', 's16le', '-acodec', 'pcm_s16le', '-ac', '2', '-ar', '48000', '-']
+            p = await asyncio.create_subprocess_exec(*args, stdin=None, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
+            #return_fifo = open(fifo_path, 'rb')
+            #return_bytes = await asyncio.to_thread(return_fifo.read)
+            #return_bytes = return_fifo.read()
+            stdout, stderr = await p.communicate()
+            #log.debug(stdout)
+            #log.debug(stderr)
+            return_bytes = stdout
+            await p.wait()
+            #return_bytes = p.communicate()[0]
+            byte_length = len(return_bytes)
+            sample_index = 0 #len(self.samples)
+            min_deck_size = self.DECK_SIZE
+            byte_start = 0
+            return_byte_length = len(return_bytes)
+            chunks_to_append = []
+            byte_reader = io.BytesIO(return_bytes)
+            # byte processing and chunk generation
+            #while byte_length > 0:
+                #log.debug(byte_length)
+                # keeps us from tailing off the end of the bytestream
+#                if MAX_LENGTH > byte_length:
+#                    sample_bytes = byte_length
+#                else:
+#                    sample_bytes = MAX_LENGTH
+#                log.debug('byte start {} sample bytes {}'.format(byte_start, sample_bytes))
+                # read in the next chunk, store for now
+
+            next_bytes = byte_reader.read(MAX_LENGTH)
+            while next_bytes:
+                chunk = AudioChunk(init_bytes=next_bytes)
+                #log.debug(chunk, chunk.samples)
+                chunks_to_append.append(chunk)
+                next_bytes = byte_reader.read(MAX_LENGTH)
+
+                # chomp
+                #byte_start += sample_bytes
+                #byte_length = return_byte_length - byte_start
+
+            # chunk append/insertion (along with mixing)
+            # this is the part that actually needs locked access
+            locked = self.sample_lock.acquire()
+            sample_index = 0
+            log.debug('starting the chunk append now')
+            for chunk in chunks_to_append:
+                if self.deck_size < min_deck_size:
+                    min_deck_size = self.deck_size
+
+                if self.sample_size == sample_index:# or True:
+                    #log.debug('appending chunk {} {}'.format(chunk, chunk.samples))
+                    self.samples.append(chunk)
+                # replace
+                else:
+                    #log.debug('mixing chunk {}'.format(chunk))
+                    self.samples[sample_index] = self.samples[sample_index].mix_stream(chunk)
+                sample_index +=1
+            #log.debug('finished chunk append final sample_index {} sample_size {}'.format(sample_index, self.sample_size))
+            
+
+
+        except Exception as e:
+            #log.debug('sample size is {} sample_index is {}'.format(self.sample_size, sample_index))
+            traceback.print_exc()
+        finally:
+            if locked:
+                log.debug('min deck size: {}'.format(min_deck_size))
+                self.sample_lock.release()
+            #os.unlink(fifo_path)
+
+    def print_buffer(self):
+        for sample in self.samples:
+            sys.stdout.buffer.write(sample.read())
+
+if __name__ == '__main__':
+    ab = AudioBuffer()
+    ab.process_file(sys.argv[1])
+    for x in range(100):
+        #sys.stderr.write('step {}'.format(x))
+        sys.stdout.buffer.write(ab.read())
+    ab.process_file(sys.argv[2])
+    sys.stderr.write('{}'.format(ab.sample_size))
+    sample = ab.read()
+    while sample:
+        #print([hex(x) for x in sample])
+        sys.stdout.buffer.write(sample)
+        sample = ab.read()
+    
+
index f45ae837b83c02cd3ea70fdc8d47f4f4f865236b..66a1120ef2c7e6488235f3cbe1625fe7cd727413 100755 (executable)
@@ -12,7 +12,9 @@ import json
 import traceback
 import os
 import logging
-from aioprometheus import Counter, Service, Gauge
+import ffmpegfile
+from aioprometheus import Counter, Gauge
+from aioprometheus.service import Service
 log = logging.getLogger(__name__)
 log.setLevel(logging.DEBUG)
 ch = logging.StreamHandler()
@@ -156,7 +158,7 @@ class HornProm(object):
         svr.register(self.horn_counter)
         svr.register(self.horn_voice)
         
-        await svr.start(addr="0.0.0.0", port=42069)
+        await svr.start(addr="", port=42069)
         log.debug('Started prom server?')
         while (True):
             await asyncio.sleep(1)
@@ -201,10 +203,13 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm):
         discord.Client.__init__(self, loop=self.loop, intents=intents)
         self.loop_forever()
 
+
+    def is_playing(self):
+        return False
+
     def terminate_all(self, guild):
         if self.state.get_voice(guild):
-            if self.state.get_voice(guild).is_playing():
-                self.state.get_voice(guild).stop()
+            self.state.get_voice(guild)._player.source.terminate_all()
 
     def get_sample_name(self):
         selections = os.listdir(self.config['airhorn_directory'])
@@ -326,9 +331,9 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm):
     async def msg_play_filename(self, filename, guild_id, exclusive=False):
         guild = discord.utils.get(self.guilds, id=guild_id)
         if self.state.get_voice(guild):
-            self.terminate_all(guild=guild)
-            self.state.get_voice(guild).play(discord.FFmpegOpusAudio(filename))
-            #self.player.start()
+            if exclusive:
+                self.terminate_all(guild=guild)
+            await self.state.get_voice(guild)._player.source.process_file(filename)
         else:
             log.debug('Was asked to play {} but no voice channel'.format(filename))
 
@@ -344,6 +349,9 @@ class GrahDiscordBot(discord.Client, HornClient, HornProm):
                         await self.state.get_voice(guild).move_to(vc)
                 else:
                     self.state.set_voice(guild, await vc.connect())
+                    audio_buffer = ffmpegfile.AudioBuffer()
+                    self.state.get_voice(guild).play(audio_buffer)
+
                 log.info('Joined voice channel {} {} (id: {})'.format(guild, vc.name, vc.id))
                 log.debug('Voice now {}'.format(self.state.get_voice(guild)))