Add an audio alert plugin

This commit is contained in:
Derek Schmidt 2021-05-15 23:00:40 -07:00
parent 0862940ff4
commit 545031ef09
2 changed files with 149 additions and 0 deletions

View file

@ -52,3 +52,30 @@ enabled = false
# # goes here!
# # """
# template="Pog count: {pog}"
# # Plays a sound when specific criteria are met
# [[plugin.AudioAlert]]
# # The output device to use (run `python -m plugins.AudioAlert` to get a list of valid devices)
# output = 'ALSA:default'
# # The path to a sound you want to play. Must be in wave format. Depending on your output target, you may have to match channel count and sample rates to the output
# soundpath = '../your-cool-sound.wav'
# # An array of criteria. The sound will play if a message meets any of these criteria.
# # Criteria can have any combination of three selectors:
# # + event: The name of an event linked to the message
# # + monitization: The amount of money attached to the message in USD. Can be an exact value or a range (like '1-10')
# # + regex: A regex to match against the message text. This will partial match, so use start of string and end of string selectors if you want to match the whole message!
# #
# # To play a sound if a user donates exactly 4 dolalrs and twenty cents and said "blaze it" or "blazeit", you'd add
# # criteria = [
# # { monitization = 4.20, regex = 'blaze\s?it' },
# # ]
# # or if you wanted a the same sound to play for either a monitization between 1 and 5 dollars or a sub, you could have
# # critera = [
# # { monitization = '1-5' },
# # { event = 'SUB' },
# # ]
# critera = []
# # How much of the sample to load at a time. Must be a power of two. Increase this if you have glitchy playback!
# buffer_length = 4096
# # Prevent cuttoff by waiting this number of buffers before closing the stream. Depending on your output target, this may not be neccesary
# cutoff_prevention_buffers = 4

122
plugins/AudioAlert.py Normal file
View file

@ -0,0 +1,122 @@
import re
from contextlib import redirect_stdout
import sys
import pyaudio as pya
import soundfile
import numpy as np
from .PluginBase import PluginBase
class Plugin(PluginBase):
def __init__(self, critera=None, soundpath=None, output=None, buffer_length=1024, cutoff_prevention_buffers=0):
super().__init__()
self._criteria = critera
self._soundpath = soundfile
self._output = output
self._cutoff_prevent_length = cutoff_prevention_buffers
self._buffer_length = buffer_length
self._sound = soundfile.SoundFile(soundpath)
self._stream = None
self._cutoff_prevent_count = 0
def tick(self, dt):
pass
def handle_event(self, event):
pass
def on_message(self, message):
if self.should_alert(message):
self._play()
return message
def should_alert(self, message):
for criterion in self._criteria:
if criterion.get('event'):
if not (message.for_event is not None and message.for_event.name == criterion['event']):
continue
if criterion.get('monitization'):
if '-' in criterion['monitization']:
lower_bound, upper_bound = (float(bound) for bound in criterion['monitization'].split('-'))
if not (message.monitization is not None and message.monitization >= lower_bound and message.monitization <= upper_bound):
continue
else:
if not (message.monitization is not None and message.monitization == float(criterion['monitization'])):
continue
if criterion.get('regex'):
if re.search(criterion['regex'], message.text) is None:
continue
return True
def prepare_stream(self):
print('Starting portaudio - prepare for debug spam!')
pyaudio = pya.PyAudio()
print('-' * 10)
if self._output is not None:
if ':' in self._output:
host_api_name, output_name = self._output.split(':', 1)
else:
host_api_name = self._output
output_name = None
for i in range(pyaudio.get_host_api_count()):
host_api_info = pyaudio.get_host_api_info_by_index(i)
if host_api_info['name'] == host_api_name:
if output_name is None:
output_index = host_api_info['defaultOutputDevice']
break
else:
for j in range(host_api_info['deviceCount']):
device_info = pyaudio.get_device_info_by_host_api_device_index(i, j)
if device_info['name'] == output_name:
output_index = device_info['index']
break
else:
raise ValueError(f'Could not find requested output device: {output_name}')
break
else:
raise ValueError(f'Could not find requested audio API: {host_api_name}')
else:
output_index = None
self._stream = pyaudio.open(
output_device_index=output_index,
format=pya.paFloat32,
channels=self._sound.channels,
rate=self._sound.samplerate,
frames_per_buffer=self._buffer_length,
output=True,
stream_callback=self._read_callback)
def _play(self):
self._sound.seek(0)
self._cutoff_prevent_count = 0
if self._stream is None:
self.prepare_stream()
if not self._stream.is_active():
self._stream.stop_stream()
self._stream.start_stream()
def _read_callback(self, in_data, frame_count, time_info, status):
if self._sound.tell() == self._sound.frames:
self._cutoff_prevent_count += 1
return np.zeros((frame_count, self._sound.channels)), pya.paContinue if self._cutoff_prevent_count < self._cutoff_prevent_length else pya.paComplete
return self._sound.read(frames=frame_count, dtype='float32', fill_value=0), pya.paContinue
if __name__ == '__main__':
p = pya.PyAudio()
print('-' * 10)
for i in range(p.get_host_api_count()):
host_api_info = p.get_host_api_info_by_index(i)
for j in range(host_api_info['deviceCount']):
device_info = p.get_device_info_by_host_api_device_index(i, j)
if device_info['maxOutputChannels'] > 0:
print(f"{host_api_info['name']}:{device_info['name']}")