--- /dev/null
+import subprocess
+import sys
+import os
+import traceback
+import random
+import discord
+import struct
+import asyncio
+import logging
+import threading
+import numpy
+import io
+
+log = logging.getLogger(__name__)
+log.setLevel(logging.DEBUG)
+ch = logging.StreamHandler()
+ch.setFormatter(logging.Formatter('|%(levelname)s|%(asctime)s|%(module)s|%(lineno)d %(message)s'))
+log.addHandler(ch)
+
+
+MAX_LENGTH = int(2*2*0.020*48000)
+
+class AudioChunk(object):
+ # 16 bits channel, 2 channels, 20 ms, 48khz
+ MAX_LENGTH = int(2*2*0.020*48000)
+ def __init__(self, init_bytes=None, init_samples=None):
+ if init_bytes is not None:
+ init_bytes = self.pad_out(init_bytes)
+ self.samples = numpy.array([x[0] for x in struct.iter_unpack('<h', init_bytes)])
+ elif init_samples is not None:
+ self.samples = init_samples
+
+ @property
+ def sample_buffer(self):
+ #log.debug(self.samples)
+ #log.debug(list(self.samples))
+ return struct.pack('<'+'h'*self.samples.size, *list(self.samples))
+
+ @property
+ def length(self):
+ return self.samples.size
+
+ def pad_out(self, stream):
+ return stream + (self.MAX_LENGTH-len(stream))//2*b'\x00\x00'
+
+ def read(self):
+ # pad out
+ return self.sample_buffer
+ #return self.pad_out(self.sample_buffer)
+
+ def mix_stream(self, new_stream):
+ mixed_ints = self.samples + new_stream.samples
+ retval = AudioChunk(init_samples=numpy.clip(mixed_ints, -0x7fff-1, 0x7fff-1))
+ #our_ints = [x[0] for x in struct.iter_unpack('<h', self.read())]
+ #new_ints = [x[0] for x in struct.iter_unpack('<h', new_stream.read())]
+ #sys.stderr.write('{}\n{}\n'.format(our_ints, new_ints))
+ # simple add
+ #mixed_ints = [x[0] + x[1] for x in zip(our_ints, new_ints)]
+ # cut off clipped
+ #cut_top = [min(x, 0x7fff-1) for x in mixed_ints]
+ #cut_bottom = [max(x, -0x7fff-1) for x in cut_top]
+ # return val
+ #retval = AudioChunk(struct.pack('<'+'h'*len(cut_bottom), *cut_bottom))
+ return retval
+
+class AudioBuffer(discord.AudioSource):
+ DECK_SIZE = 5
+ def __init__(self):
+ self.samples = []
+ self.on_deck = []
+ self.count = 0
+ self.sample_lock = threading.Lock()
+ self.deck_lock = threading.Lock()
+
+ def is_opus(self):
+ return False
+
+ def cleanup(self):
+ self.samples = []
+
+ def read(self):
+ #log.debug('Deck size: {}'.format(self.deck_size))
+ # block unless we're the only ones reading the deck
+ self.deck_lock.acquire()
+ # opportunistically try to fill deck
+ self.fill_deck()
+ # this will blow up if we run out of buffer
+ sample = self.on_deck.pop(0)
+ # allow for others to modify deck now
+ self.deck_lock.release()
+ retval = sample.read()
+ return retval
+
+ @property
+ def sample_size(self):
+ return len(self.samples)
+
+ @property
+ def deck_size(self):
+ return len(self.on_deck)
+
+ def get_next_sample(self):
+ if self.sample_size > 0:
+ retval = self.samples.pop(0)
+
+ else:
+ # dead air
+ #log.debug('dead air fill')
+ retval = AudioChunk(b'')
+ return retval
+
+ def fill_deck(self):
+ # block if you got nothin
+ if self.deck_size == 0:
+ blocking = True
+ log.debug('Filling deck, blocking: {}'.format(blocking))
+ else:
+ blocking = False
+ # only try, do nothing if we don't get it
+ got_lock = self.sample_lock.acquire(blocking=blocking)
+ if got_lock:
+ if self.deck_size < self.DECK_SIZE:
+ # fill with number missing
+ for sample_index in range(self.DECK_SIZE - self.deck_size):
+ self.on_deck.append(self.get_next_sample())
+ self.sample_lock.release()
+
+ def terminate_all(self):
+ self.sample_lock.acquire()
+ self.samples = []
+ self.sample_lock.release()
+
+ async def process_file(self, filename):
+ rand_str = str(hex(random.randint(0, 10000)))
+ #fifo_path = '/tmp/fifo{}'.format(rand_str)
+ locked = False
+ try:
+ #os.mkfifo(fifo_path)
+ args = ['ffmpeg', '-i', '{}'.format(filename)]
+ args += ['-f', 's16le', '-acodec', 'pcm_s16le', '-ac', '2', '-ar', '48000', '-']
+ p = await asyncio.create_subprocess_exec(*args, stdin=None, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
+ #return_fifo = open(fifo_path, 'rb')
+ #return_bytes = await asyncio.to_thread(return_fifo.read)
+ #return_bytes = return_fifo.read()
+ stdout, stderr = await p.communicate()
+ #log.debug(stdout)
+ #log.debug(stderr)
+ return_bytes = stdout
+ await p.wait()
+ #return_bytes = p.communicate()[0]
+ byte_length = len(return_bytes)
+ sample_index = 0 #len(self.samples)
+ min_deck_size = self.DECK_SIZE
+ byte_start = 0
+ return_byte_length = len(return_bytes)
+ chunks_to_append = []
+ byte_reader = io.BytesIO(return_bytes)
+ # byte processing and chunk generation
+ #while byte_length > 0:
+ #log.debug(byte_length)
+ # keeps us from tailing off the end of the bytestream
+# if MAX_LENGTH > byte_length:
+# sample_bytes = byte_length
+# else:
+# sample_bytes = MAX_LENGTH
+# log.debug('byte start {} sample bytes {}'.format(byte_start, sample_bytes))
+ # read in the next chunk, store for now
+
+ next_bytes = byte_reader.read(MAX_LENGTH)
+ while next_bytes:
+ chunk = AudioChunk(init_bytes=next_bytes)
+ #log.debug(chunk, chunk.samples)
+ chunks_to_append.append(chunk)
+ next_bytes = byte_reader.read(MAX_LENGTH)
+
+ # chomp
+ #byte_start += sample_bytes
+ #byte_length = return_byte_length - byte_start
+
+ # chunk append/insertion (along with mixing)
+ # this is the part that actually needs locked access
+ locked = self.sample_lock.acquire()
+ sample_index = 0
+ log.debug('starting the chunk append now')
+ for chunk in chunks_to_append:
+ if self.deck_size < min_deck_size:
+ min_deck_size = self.deck_size
+
+ if self.sample_size == sample_index:# or True:
+ #log.debug('appending chunk {} {}'.format(chunk, chunk.samples))
+ self.samples.append(chunk)
+ # replace
+ else:
+ #log.debug('mixing chunk {}'.format(chunk))
+ self.samples[sample_index] = self.samples[sample_index].mix_stream(chunk)
+ sample_index +=1
+ #log.debug('finished chunk append final sample_index {} sample_size {}'.format(sample_index, self.sample_size))
+
+
+
+ except Exception as e:
+ #log.debug('sample size is {} sample_index is {}'.format(self.sample_size, sample_index))
+ traceback.print_exc()
+ finally:
+ if locked:
+ log.debug('min deck size: {}'.format(min_deck_size))
+ self.sample_lock.release()
+ #os.unlink(fifo_path)
+
+ def print_buffer(self):
+ for sample in self.samples:
+ sys.stdout.buffer.write(sample.read())
+
+if __name__ == '__main__':
+ ab = AudioBuffer()
+ ab.process_file(sys.argv[1])
+ for x in range(100):
+ #sys.stderr.write('step {}'.format(x))
+ sys.stdout.buffer.write(ab.read())
+ ab.process_file(sys.argv[2])
+ sys.stderr.write('{}'.format(ab.sample_size))
+ sample = ab.read()
+ while sample:
+ #print([hex(x) for x in sample])
+ sys.stdout.buffer.write(sample)
+ sample = ab.read()
+
+
import traceback
import os
import logging
-from aioprometheus import Counter, Service, Gauge
+import ffmpegfile
+from aioprometheus import Counter, Gauge
+from aioprometheus.service import Service
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
svr.register(self.horn_counter)
svr.register(self.horn_voice)
- await svr.start(addr="0.0.0.0", port=42069)
+ await svr.start(addr="", port=42069)
log.debug('Started prom server?')
while (True):
await asyncio.sleep(1)
discord.Client.__init__(self, loop=self.loop, intents=intents)
self.loop_forever()
+
+ def is_playing(self):
+ return False
+
def terminate_all(self, guild):
if self.state.get_voice(guild):
- if self.state.get_voice(guild).is_playing():
- self.state.get_voice(guild).stop()
+ self.state.get_voice(guild)._player.source.terminate_all()
def get_sample_name(self):
selections = os.listdir(self.config['airhorn_directory'])
async def msg_play_filename(self, filename, guild_id, exclusive=False):
guild = discord.utils.get(self.guilds, id=guild_id)
if self.state.get_voice(guild):
- self.terminate_all(guild=guild)
- self.state.get_voice(guild).play(discord.FFmpegOpusAudio(filename))
- #self.player.start()
+ if exclusive:
+ self.terminate_all(guild=guild)
+ await self.state.get_voice(guild)._player.source.process_file(filename)
else:
log.debug('Was asked to play {} but no voice channel'.format(filename))
await self.state.get_voice(guild).move_to(vc)
else:
self.state.set_voice(guild, await vc.connect())
+ audio_buffer = ffmpegfile.AudioBuffer()
+ self.state.get_voice(guild).play(audio_buffer)
+
log.info('Joined voice channel {} {} (id: {})'.format(guild, vc.name, vc.id))
log.debug('Voice now {}'.format(self.state.get_voice(guild)))