Compare commits

..

6 commits

Author SHA1 Message Date
4d85ecb5d2 [builtins/wait] Allow halting one of the concurrent channels on purpose 2025-01-27 22:24:35 -05:00
19e3d65e19 Concurrency get!
+ Your cursed setup can now do multiple things at once (GIL and 
unblocked main thread willing), tune with --parallel
+ Swaps "immediate" option for it's inverse "wait" in most plugins
+ Do some expensive ops in a threadpool
2025-01-27 22:08:11 -05:00
8b01284c28 [events/control] Make control events easier to generate
On-bus format is the same, so this is not a breaking change~

Enables events to get full access to their click command for more polish
2025-01-27 21:15:29 -05:00
7fffbe0954 [builtins/mkevent] Builtin that allows creating events from kdl! 2025-01-27 21:04:10 -05:00
405686f138 [builtins/command] Add unused text to ctx 2025-01-26 18:45:24 -05:00
1f1f0932b7 [plugins] Fix missing sample rate handling after core.audio rework 2025-01-26 18:44:42 -05:00
13 changed files with 158 additions and 41 deletions

View file

@ -15,12 +15,14 @@ logger = logging.getLogger(__name__)
@click.option('--bus-port', default='8080')
@click.option('--web-bind', default='localhost')
@click.option('--web-port', default='8000')
def start(config_file, bus_bind=None, bus_port=None, web_bind=None, web_port=None):
@click.option('--parallel', default=5)
def start(config_file, bus_bind=None, bus_port=None, web_bind=None, web_port=None, parallel=None):
"""Start audiencekit server"""
logger.info('Hewwo!!')
main = MainProcess(config_file,
bus_conf=(bus_bind, bus_port),
web_conf=(web_bind, web_port))
web_conf=(web_bind, web_port),
max_concurrent=parallel)
try:
asyncio.run(main.run())
except KeyboardInterrupt:

View file

@ -76,10 +76,11 @@ class EventCommandFactory(click.MultiCommand):
param = click.Option(param_decls=[f'--{field.name}'], type=param_type, default=field.default, show_default=True)
params.append(param)
if target_event.__dict__.get('__cli__'):
params.extend(target_event.__cli__())
command = click.Command(name, callback=mkevent_generic, params=params, help=target_event.__doc__)
if target_event.__dict__.get('_cli'):
command = target_event._cli(command)
return command

View file

@ -53,11 +53,12 @@ def import_or_reload_mod(module_name, default_package=None, external=False):
class MainProcess:
def __init__(self, config_path, bus_conf=(None, None), web_conf=(None, None)):
def __init__(self, config_path, bus_conf, web_conf, max_concurrent=5):
self._running = False
self.config_path = config_path
self.bus_conf = bus_conf
self.web_conf = web_conf
self.max_concurrent = max_concurrent
self.chat_processes = {}
self.plugins = {}
@ -294,6 +295,7 @@ class MainProcess:
await self.user_setup()
# Start plumbing tasks
user_tasks.add(loop.create_task(self.tick_plugins()))
for i in range(0, self.max_concurrent):
user_tasks.add(loop.create_task(self.handle_events()))
logger.info(f'Ready to rumble! Press Ctrl+C to shut down')

View file

@ -1,20 +1,47 @@
from dataclasses import dataclass, field
import json
import click
from ovtk_audiencekit.events import Event
@dataclass
class Control(Event):
"""Generic inter-bus communication"""
target: str
target: str = None
data: dict = field(default_factory=dict)
def __init__(self, via, target, **kwargs):
super().__init__(via)
self.target = target
self.data = kwargs
def __repr__(self):
return f"Control message : target = {self.target}, data = {self.data}"
def serialize(self):
return json.dumps({
'type': [cls.__name__ for cls in self.__class__.__mro__],
'data': {**self.data, 'target': self.target},
'data': {**self.data, **self.to_dict()},
})
@classmethod
def _cli(cls, cmd):
def parse(ctx, param, value):
if value:
return json.loads(value)
super_cb = cmd.callback
@click.pass_context
def expand(ctx, *args, json=None, **kwargs):
if json:
kwargs = {**json, **kwargs}
return ctx.invoke(super_cb, *args, **kwargs)
cmd.params.append(
click.Option(['--json', '-d'], callback=parse, help="Add arbitrary data in JSON format")
)
cmd.callback = expand
return cmd

View file

@ -43,14 +43,13 @@ class Message(Event):
return super().hydrate(user_type=user_type, **kwargs)
@classmethod
def __cli__(cls):
def _cli(cls, cmd):
def dono(ctx, param, value):
if value:
return [value, value]
return [
click.Option(['--monitization', '-m'], type=click.FLOAT, callback=dono),
]
cmd.params.append(click.Option(['--monitization', '-m'], type=click.FLOAT, callback=dono))
return cmd
@dataclass

View file

@ -28,11 +28,12 @@ class Subscription(Event):
return f"Subcription from {self.user_name or 'anonymous'} to {recipent} - tier = {self.tier}"
@classmethod
def __cli__(cls):
def _cli(cls, cmd):
def userfactory(ctx, param, value):
if value:
return [User(user_name=name, user_id=name) for name in value]
return [
click.Option(['--gifted_to', '-g'], type=click.STRING, callback=userfactory, multiple=True),
]
cmd.params.append(
click.Option(['--gifted_to', '-g'], type=click.STRING, callback=userfactory, multiple=True)
)
return cmd

View file

@ -22,14 +22,16 @@ class AudioAlert(PluginBase):
sample_rate = next((rate for rate in [44100, 48000] if Stream.check_rate(self.output_index, 1, rate)))
except StopIteration:
self.logger.warn('Target audio device does not claim to support common sample rates! Attempting playback at native rate of audio')
self.sample_rate = sample_rate
def run(self, path, speed=1, keep_pitch=False, immediate=True, poly=1, **kwargs):
async def run(self, path, speed=1, keep_pitch=False, wait=False, poly=1, **kwargs):
poly = int(poly)
key = f'{path}@{speed}{"X" if keep_pitch else "x"}'
clip = self.clips.get(key, [None, None])[0]
if clip is None:
clip = Clip(path, speed=speed, keep_pitch=keep_pitch, force_stereo=self.force_stereo)
clip = Clip(path, speed=speed, keep_pitch=keep_pitch,
samplerate=self.sample_rate, force_stereo=self.force_stereo)
self.clips[key] = [clip, maya.now()]
else:
self.clips[key][1] = maya.now()
@ -51,12 +53,12 @@ class AudioAlert(PluginBase):
stream_dq.append(stream)
if immediate:
task = asyncio.create_task(stream.aplay())
task.add_done_callback(self.tasks.remove)
self.tasks.add(task)
if wait:
await stream.aplay()
else:
stream.play()
task = asyncio.create_task(stream.aplay())
task.add_done_callback(self.tasks.discard)
self.tasks.add(task)
def close(self):
self._cleanup_task.cancel()

View file

@ -19,6 +19,11 @@ class TextToSpeechPlugin(PluginBase):
self.speaker_wav = speaker_wav
self.output_index = Stream.find_output_index(output)
try:
sample_rate = next((rate for rate in [44100, 48000] if Stream.check_rate(self.output_index, 1, rate)))
except StopIteration:
self.logger.warn('Target audio device does not claim to support common sample rates! Attempting playback at native rate of audio')
self.sample_rate = sample_rate
conf_overrides = {k[2:]: v for k, v in kwargs.items() if k.startswith('o_')}
@ -53,6 +58,12 @@ class TextToSpeechPlugin(PluginBase):
use_cuda=self.cuda,
)
self.tasks = set()
def close(self):
for task in self.tasks:
task.cancel()
def make_tts_wav(self, text, filename=None):
if filename is None:
filename = os.path.join(self.cache_dir, f'{uuid.uuid1()}.wav')
@ -65,25 +76,29 @@ class TextToSpeechPlugin(PluginBase):
self.synthesizer.save_wav(wav, filename)
return filename
async def run(self, text, *args, _ctx={}, wait=True, **kwargs):
async def run(self, text, *args, _ctx={}, wait=False, **kwargs):
try:
# Force punctuation (keep AI from spinning off into random noises)
if not any([text.endswith(punc) for punc in '.!?:']):
text += '.'
filename = self.make_tts_wav(text)
# Do TTS processing in a thread to avoid blocking main loop
filename = await asyncio.get_running_loop().run_in_executor(None, self.make_tts_wav, text)
# TODO: Play direct from memory
clip = Clip(filename, force_stereo=True)
clip = Clip(filename, force_stereo=True, samplerate=self.sample_rate)
stream = Stream(clip, self.output_index)
if wait:
async def play():
try:
await stream.aplay()
finally:
stream.close()
os.remove(os.path.join(self.cache_dir, filename))
asyncio.create_task(play())
else:
stream.play()
stream.close()
os.remove(os.path.join(self.cache_dir, filename))
task = asyncio.create_task(play())
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)
if wait:
await task
except Exception as e:
self.logger.error(f"Failed to make speech from input: {e}")
if source_event := _ctx.get('event'):

View file

@ -87,7 +87,7 @@ class Command:
args = emoteless.split()[1:]
parsed, unknown = self._parser.parse_known_args(args)
parsed_asdict = vars(parsed)
return parsed_asdict
return parsed_asdict, unknown
class CommandPlugin(PluginBase):
@ -127,9 +127,10 @@ class CommandPlugin(PluginBase):
continue
if command.invoked(event):
try:
args = command.parse(event.text)
args, unknown = command.parse(event.text)
self.logger.debug(f"Parsed args for {command.name}: {args}")
ctx = dict(event=event, **args)
self.logger.debug(f"Remaining text: {unknown}")
ctx = dict(event=event, rest=' '.join(unknown), **args)
await self.execute_kdl(actionnode.nodes, _ctx=ctx)
except argparse.ArgumentError as e:
msg = SysMessage(self._name, f"{e}. See !help {command.name}", replies_to=event)

View file

@ -0,0 +1,53 @@
import dataclasses
from enum import Enum
from ovtk_audiencekit.core import PluginBase
from ovtk_audiencekit.events import Event
from ovtk_audiencekit.utils import get_subclasses
class EvFactory:
def __init__(self, ev_class):
self.target_class = ev_class
self.arg_convs = []
self.kwarg_convs = {}
for field in dataclasses.fields(self.target_class):
required = all(isinstance(default, dataclasses.MISSING.__class__) for default in [field.default, field.default_factory])
if issubclass(field.type, Enum):
conv = lambda value: field.type.__members__[value]
else:
conv = None
if required:
self.arg_convs.append(conv)
else:
self.kwarg_convs[field.name] = conv
def make(self, via, *args, **kwargs):
args = [
conv(arg) if conv else arg
for arg, conv in zip(args, self.arg_convs)
]
kwargs = {
key: self.kwarg_convs[key](value) if self.kwarg_convs.get(key) else value
for key, value in kwargs.items()
}
return self.target_class(via, *args, **kwargs)
class MakeEventPkugin(PluginBase):
"""Create a new event and send it to the event bus"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.event_factories = {}
for event_class in get_subclasses(Event):
event_name = event_class.__name__
self.event_factories[event_name] = EvFactory(event_class)
def run(self, event_name, *args, _children=[], _ctx={}, **kwargs):
event_factory = self.event_factories.get(event_name)
if event_factory is None:
raise ValueError(f'Unknown event type "{event_name}"')
ev = event_factory.make('kdl', *args, **kwargs)
self.send_to_bus(ev)

View file

@ -36,14 +36,14 @@ class ScenePlugin(PluginBase):
self.blueprint.add_url_rule('/<name>/<cmd>', 'api-sceneset', self.ui_setscene)
self.blueprint.add_url_rule('/monitor', 'monitor', self.ui_monitor_ws, is_websocket=True)
async def run(self, name, _children=None, _ctx={}, active=None, group=None, immediate=True, oneshot=False, **kwargs):
async def run(self, name, _children=None, _ctx={}, active=None, group=None, oneshot=False, **kwargs):
if _children is None:
raise UsageError('Empty scene definition! Did you mean scene.set?')
await self.define(name, group, _children, default_active=active, oneshot=oneshot, ctx=_ctx)
async def set(self, name, _children=None, _ctx={}, active=True, immediate=True):
await self.switch(name, active, is_immediate=immediate, ctx=_ctx)
async def set(self, name, _children=None, _ctx={}, active=True, wait=False):
await self.switch(name, active, is_immediate=not wait, ctx=_ctx)
async def define(self, name, group, children, default_active=False, oneshot=False, ctx={}):
if self.scenes.get(name) is not None:

View file

@ -0,0 +1,9 @@
from ovtk_audiencekit.core import PluginBase
import asyncio
class WaitPlugin(PluginBase):
"""Halt execution for a bit"""
async def run(self, seconds=0, minutes=0, **kwargs): # If you want `hours`, why??????
await asyncio.sleep(seconds + (minutes * 60))

View file

@ -9,5 +9,10 @@ from .Choice import ChoicePlugin as choice
from .Set import SetPlugin as set
from .Scene import ScenePlugin as scene
from .Log import LogPlugin as log
from .Mkevent import MakeEventPkugin as mkevent
from .Wait import WaitPlugin as wait
__all__ = ['trigger', 'reply', 'command', 'cue', 'write', 'exec', 'chance', 'choice', 'set', 'scene', 'log']
__all__ = [
'trigger', 'reply', 'command', 'cue', 'write', 'exec', 'chance', 'choice',
'set', 'scene', 'log', 'mkevent', 'wait',
]