Compare commits

..

3 commits

Author SHA1 Message Date
Derek
220e5f8a25 WIP secrets refactor 2023-07-15 08:39:44 -04:00
Derek
65f4499bbd [Chat] Port Misskey to plugin 2023-07-15 08:39:38 -04:00
Derek
a6d1162665 [Chat] Port fakechat to plugin 2023-07-15 08:30:30 -04:00
95 changed files with 3318 additions and 3796 deletions

7
.gitignore vendored
View file

@ -1,7 +1,4 @@
.venv/
.pdm-python
.pdm-build/
__pycache__/
/dist/
*.secret* *.secret*
secrets.kdl secrets.kdl
__pycache__/
/*-log

41
Pipfile Normal file
View file

@ -0,0 +1,41 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
requests = "*"
websockets = "*"
miniirc = "*"
num2words = "*"
pyaudio = "==0.2.*"
numpy = "*"
click = "*"
owoify-py = "==2.*"
kdl-py = "*"
maya = "*"
multipledispatch = "*"
blessed = "*"
appdirs = "*"
watchdog = "*"
mido = "*"
python-rtmidi = "*"
librosa = "==0.8.*"
pytsmod = "*"
quart = "==0.17.*"
aioscheduler = "*"
TTS = "==0.9.*"
torch = "==1.13.*"
simpleobsws = "*"
python-socketio = {extras = ["client"], version = "*"}
aioprocessing = "*"
[requires]
python_version = "3.10"
[dev-packages]
pipenv-setup = "*"
[scripts]
start = "python audiencekit.py start"
ws = "python audiencekit.py ws"

2447
Pipfile.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -48,13 +48,13 @@ Beside the plugin system, audiencekit's biggest feature is that it streams the e
## Manual / Development Install ## Manual / Development Install
1. Install dependencies 1. Install dependencies
You'll need Python 3.10, PDM, and some audio libraries. Not to fear though, there is [instructions on how to do that for your specific OS!](https://git.skeh.site/skeh/ovtk_audiencekit/wiki/Dependency-Setup) You'll need Python 3.10 or above, pipenv, JACK and PortAudio libraries. Not to fear though, there is [instructions on how to do that for your specific OS!](https://git.skeh.site/skeh/ovtk_audiencekit/wiki/Dependency-Setup)
2. Download the repository and open a terminal inside it 2. Download the repository and open a terminal inside it
Extract the [tar](https://git.skeh.site/skeh/ovtk_audiencekit/archive/main.tar.gz) / [zip](https://git.skeh.site/skeh/ovtk_audiencekit/archive/main.zip), or clone via `git`~ Extract the [tar](https://git.skeh.site/skeh/ovtk_audiencekit/archive/main.tar.gz) / [zip](https://git.skeh.site/skeh/ovtk_audiencekit/archive/main.zip), or clone via `git`~
3. Run `pdm sync` 3. Run `pipenv install`
Some dependencies may fail to install initially, but should be retried using a method that works automatically (your terminal should show "Retrying initially failed dependencies..." or similar). Some dependencies may fail to install initially, but should be retried using a method that works automatically (your terminal should show "Retrying initially failed dependencies..." or similar).
@ -68,15 +68,15 @@ If you want some more tutorializing, head on over to the [Wiki](https://git.skeh
+ Read up a bit on [KDL's syntax][kdl] if you haven't already + Read up a bit on [KDL's syntax][kdl] if you haven't already
+ Crack open your favorite text editor and follow the instructions in the `config.kdl` file + Crack open your favorite text editor and follow the instructions in the `config.kdl` file
+ Open a terminal in the project location and start it using either: + Open a terminal in the project location and start it using either:
+ `pdm run start`, or + `pipenv run start`, or
+ [activate the venv](https://pdm.fming.dev/latest/usage/venv/#activate-a-virtualenv) once per terminal session, and then use `./audiencekit.py start` thereafter. This is recommended for development usage. + `pipenv shell` once per terminal session, and then `./audiencekit.py start` thereafter
+ Test your configuration using the `ws` subcommand (`pdm run ws` or `./audiencekit.py ws`) + Test your configuration using the `ws` subcommand (`pipenv run ws` or `./audiencekit.py ws`)
+ The built-in CLI will help for this and more! Just add "--help" to the end of anything, ie `./audiencekit.py --help`, `pdm run ws mkevent --help`, etc + The built-in CLI will help for this and more! Just add "--help" to the end of anything, ie `./audiencekit.py --help`, `pipenv run ws mkevent --help`, etc
+ Make mistakes and [ask questions](https://birb.space/@skeh)! + Make mistakes and [ask questions](https://birb.space/@skeh)!
----------- -----------
Made with :heart: by the [Vtopia Collective][vtopia] and contributors Made with :heart: by the [Vtopia Collective][vtopia] and contributers
[kdl]: https://kdl.dev [kdl]: https://kdl.dev

View file

@ -1,6 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import sys
sys.path.insert(0, 'src')
from ovtk_audiencekit.cli import cli from ovtk_audiencekit.cli import cli
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -1,12 +1,12 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from multiprocessing import Process, Pipe, Manager from multiprocessing import Process, Pipe, Manager
from traceback import format_exception
import sys import sys
import os import os
import json import json
import logging import logging
from ovtk_audiencekit.events import Event from ovtk_audiencekit.events import Event
from ovtk_audiencekit.utils import format_exception
class GracefulShutdownException(Exception): class GracefulShutdownException(Exception):
@ -133,7 +133,7 @@ class ChatProcess(Process, ABC):
return 0 return 0
except Exception as e: except Exception as e:
self.logger.error(f'Uncaught exception in {self._name}: {e}') self.logger.error(f'Uncaught exception in {self._name}: {e}')
self.logger.debug(format_exception(e)) self.logger.debug(''.join(format_exception(None, e, e.__traceback__)))
return 1 return 1
finally: finally:
self.on_exit() self.on_exit()

View file

@ -27,8 +27,6 @@ class TwitchProcess(ChatProcess):
botname=None, emote_res=4.0, botname=None, emote_res=4.0,
# EventSub options # EventSub options
eventsub=True, eventsub_host='wss://ovtk.skeh.site/twitch', eventsub=True, eventsub_host='wss://ovtk.skeh.site/twitch',
# BTTV integration
bttv=False,
# Inheritance boilerplate # Inheritance boilerplate
**kwargs): **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -63,7 +61,7 @@ class TwitchProcess(ChatProcess):
self.eventsub = TwitchEventSub(self.api, eventsub_host) self.eventsub = TwitchEventSub(self.api, eventsub_host)
self._sources.append(self.eventsub) self._sources.append(self.eventsub)
self.bttv = BTTV(target_data['user']['id']) if bttv else None self.bttv = BTTV(target_data['user']['id'])
def loop(self, next_state): def loop(self, next_state):
@ -100,7 +98,7 @@ class TwitchProcess(ChatProcess):
for event in chain(*(source.read(0.1) for source in self._sources)): for event in chain(*(source.read(0.1) for source in self._sources)):
# Retarget event # Retarget event
event.via = self._name event.via = self._name
if self.bttv and isinstance(event, Message): if isinstance(event, Message):
event = self.bttv.hydrate(event) event = self.bttv.hydrate(event)
self.publish(event) self.publish(event)
return 0 return 0

View file

@ -2,7 +2,7 @@ import json
import logging import logging
from ovtk_audiencekit.utils import NonBlockingWebsocket from ovtk_audiencekit.utils import NonBlockingWebsocket
from ovtk_audiencekit.core.Data import ovtk_user_id from ovtk_audiencekit.core.Config import ovtk_user_id
from ovtk_audiencekit.events import Follow from ovtk_audiencekit.events import Follow
from ..Events import ChannelPointRedemption from ..Events import ChannelPointRedemption

View file

@ -1,5 +1,4 @@
import logging import logging
import warnings
import click import click
from blessed import Terminal from blessed import Terminal
@ -16,11 +15,8 @@ class CustomFormatter(logging.Formatter):
logging.CRITICAL: term.white_on_red_bold logging.CRITICAL: term.white_on_red_bold
} }
def __init__(self, show_time, show_loc): def __init__(self, show_time):
if show_loc:
format = "%(levelname)s:%(name)s (%(filename)s:%(lineno)d): %(message)s" format = "%(levelname)s:%(name)s (%(filename)s:%(lineno)d): %(message)s"
else:
format = "%(levelname)s:%(name)s: %(message)s"
if show_time: if show_time:
format = "%(asctime)s:" + format format = "%(asctime)s:" + format
super().__init__(format) super().__init__(format)
@ -47,7 +43,7 @@ def cli(loglevel, show_time=False):
logger.setLevel(loglevel) logger.setLevel(loglevel)
log_handler = logging.StreamHandler() log_handler = logging.StreamHandler()
log_handler.setLevel(loglevel) log_handler.setLevel(loglevel)
log_handler.setFormatter(CustomFormatter(show_time, loglevel==logging.DEBUG)) log_handler.setFormatter(CustomFormatter(show_time))
logger.addHandler(log_handler) logger.addHandler(log_handler)
# Quiet the depencency loggers # Quiet the depencency loggers
logging.getLogger('websockets.server').setLevel(logging.WARN) logging.getLogger('websockets.server').setLevel(logging.WARN)
@ -57,8 +53,3 @@ def cli(loglevel, show_time=False):
logging.getLogger('simpleobsws').setLevel(logging.INFO) logging.getLogger('simpleobsws').setLevel(logging.INFO)
logging.getLogger('quart.serving').setLevel(logging.WARN) logging.getLogger('quart.serving').setLevel(logging.WARN)
logging.getLogger('numba').setLevel(logging.WARN) logging.getLogger('numba').setLevel(logging.WARN)
logging.getLogger('hypercorn.error').setLevel(logging.WARN)
logging.getLogger('hypercorn.access').setLevel(logging.WARN)
# Quiet warnings
if loglevel > logging.DEBUG:
warnings.filterwarnings("ignore")

View file

@ -0,0 +1,20 @@
import logging
import click
from ovtk_audiencekit.core import MainProcess
from .group import cli
logger = logging.getLogger(__name__)
@cli.command()
@click.argument('config_file', type=click.Path('r'), default='config.kdl')
@click.option('--port', default='8080')
@click.option('--bind', default='127.0.0.1')
def start(config_file, port=None, bind=None):
"""Start audiencekit server"""
logger.info('Hewwo!!')
main = MainProcess(config_file, port, bind)
main.run()
logger.info('Bye bye~')

View file

@ -25,104 +25,8 @@ os.close(old_stderr)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Clip:
def __init__(self, path, samplerate=None, speed=1, keep_pitch=True, force_stereo=True):
self.path = path
raw, native_rate = librosa.load(self.path, sr=None, dtype='float32', mono=False)
self.channels = raw.shape[0] if len(raw.shape) == 2 else 1 def check_rate(index, channels, rate):
if force_stereo and self.channels == 1:
raw = np.resize(raw, (2,*raw.shape))
self.channels = 2
self.samplerate = samplerate or native_rate
if native_rate != self.samplerate:
raw = librosa.resample(raw, native_rate, self.samplerate, fix=True, scale=True)
self.raw = np.ascontiguousarray(self._stereo_transpose(raw), dtype='float32')
if speed != 1:
self.stretch(speed, keep_pitch=keep_pitch)
@property
def length(self):
return self.raw.shape[0] / self.samplerate
def _stereo_transpose(self, ndata):
return ndata if self.channels == 1 else ndata.T
def stretch(self, speed, keep_pitch=True):
if keep_pitch:
stretched = tsm.wsola(self._stereo_transpose(self.raw), speed)
else:
stretched = librosa.resample(self._stereo_transpose(self.raw), self.samplerate * (1 / speed), self.samplerate, fix=False, scale=True)
self.raw = np.ascontiguousarray(self._stereo_transpose(stretched), dtype='float32')
def save(self, filename):
soundfile.write(filename, self._stereo_transpose(self.raw), self.samplerate)
class Stream:
def __init__(self, clip, output_index, buffer_length=4096):
self.clip = clip
self.pos = 0
self.playing = False
self._end_event = AioEvent()
self._stream = pyaudio.open(
output_device_index=output_index,
format=pya.paFloat32,
channels=self.clip.channels,
rate=self.clip.samplerate,
frames_per_buffer=buffer_length,
output=True,
stream_callback=self._read_callback,
start=False)
def _play(self):
self.playing = True
self.pos = 0
if not self._stream.is_active():
self._stream.start_stream()
def play(self):
self._end_event.clear()
self._play()
self._end_event.wait(timeout=self.clip.length)
async def aplay(self):
self._end_event.clear()
self._play()
try:
await self._end_event.coro_wait(timeout=self.clip.length)
except asyncio.CancelledError:
self.playing = False
self._stream.stop_stream()
def close(self):
self._stream.stop_stream()
self._stream.close()
def _read_callback(self, in_data, frame_count, time_info, status):
if self.clip.channels > 1:
buffer = np.zeros((frame_count, self.clip.channels), dtype='float32')
else:
buffer = np.zeros((frame_count,), dtype='float32')
if self.playing:
newpos = self.pos + frame_count
clip_chunk = self.clip.raw[self.pos:newpos]
self.pos = newpos
buffer[0:clip_chunk.shape[0]] = clip_chunk
if self.pos >= self.clip.raw.shape[0]:
self.playing = False
self._end_event.set()
return buffer, pya.paContinue
@staticmethod
def check_rate(index, channels, rate):
try: try:
return pyaudio.is_format_supported(rate, return pyaudio.is_format_supported(rate,
output_channels=channels, output_channels=channels,
@ -131,6 +35,99 @@ class Stream:
except ValueError: except ValueError:
return False return False
alt_rates = [44100, 48000]
class Clip:
def __init__(self, path, output_index, buffer_length=2048, speed=1, force_stereo=True):
_raw, native_rate = librosa.load(path, sr=None, dtype='float32', mono=False)
self._channels = _raw.shape[0] if len(_raw.shape) == 2 else 1
if force_stereo and self._channels == 1:
_raw = np.resize(_raw, (2,*_raw.shape))
self._channels = 2
target_samplerate = native_rate
if not check_rate(output_index, self._channels , native_rate):
try:
target_samplerate = next((rate for rate in alt_rates if check_rate(output_index, self._channels , rate)))
except StopIteration:
logger.warn('Target audio device does not claim to support any sample rates! Attempting playback at native rate')
self._samplerate = target_samplerate
if native_rate != self._samplerate:
_raw = librosa.resample(_raw, native_rate, self._samplerate, fix=True, scale=True)
self._raw = np.ascontiguousarray(self._stereo_transpose(_raw), dtype='float32')
if speed != 1:
self.stretch(speed)
self._pos = 0
self._playing = False
self._end_event = AioEvent()
self._stream = pyaudio.open(
output_device_index=output_index,
format=pya.paFloat32,
channels=self._channels,
rate=self._samplerate,
frames_per_buffer=buffer_length,
output=True,
stream_callback=self._read_callback,
start=False)
@property
def length(self):
return self._raw.shape[0] / self._samplerate
def _stereo_transpose(self, ndata):
return ndata if self._channels == 1 else ndata.T
def stretch(self, speed):
stretched = tsm.wsola(self._stereo_transpose(self._raw), speed)
self._raw = np.ascontiguousarray(self._stereo_transpose(stretched), dtype='float32')
def save(self, filename):
soundfile.write(filename, self._stereo_transpose(self._raw), self._samplerate)
def _play(self):
self._playing = True
self._pos = 0
if not self._stream.is_active():
self._stream.start_stream()
def play(self):
self._play()
self._end_event.wait(timeout=self.length)
async def aplay(self):
self._end_event.clear()
self._play()
try:
await self._end_event.coro_wait(timeout=self.length)
except asyncio.CancelledError:
self._playing = False
self._stream.stop_stream()
def close(self):
self._stream.close()
def _read_callback(self, in_data, frame_count, time_info, status):
if self._channels > 1:
buffer = np.zeros((frame_count, self._channels), dtype='float32')
else:
buffer = np.zeros((frame_count,), dtype='float32')
if self._playing:
newpos = self._pos + frame_count
clip_chunk = self._raw[self._pos:newpos]
self._pos = newpos
buffer[0:clip_chunk.shape[0]] = clip_chunk
if self._pos >= self._raw.shape[0]:
self._playing = False
self._end_event.set()
return buffer, pya.paContinue
@staticmethod @staticmethod
def find_output_index(output): def find_output_index(output):
if output is None: if output is None:

View file

@ -0,0 +1,42 @@
import os
import random
import appdirs
from kdl import ParseConfig
def csv_parser(text, fragment):
data = fragment.fragment[1:-1]
return [field.strip() for field in data.split(',')]
def semisv_parser(text, fragment):
data = fragment.fragment[1:-1]
return [field.strip() for field in data.split(';')]
class Secret(String):
pass
def secrets_parser(text, fragment):
name = fragment.fragment[1:-1]
return Secret(name)
customParsers = {
'list': csv_parser,
'csv': csv_parser,
'semisv': semisv_parser,
'secret': secrets_parser,
}
kdl_parse_config = ParseConfig(valueConverters=customParsers)
kdl_reserved = ['secrets', 'chat', 'plugin', 'import']
CACHE_DIR = appdirs.user_cache_dir('audiencekit', 'ovtk')
DATA_DIR = appdirs.user_data_dir('audiencekit', 'ovtk')
_user_id_path = os.path.join(DATA_DIR, 'ovtk_user_id')
if not os.path.isfile(_user_id_path):
ovtk_user_id = '%0x' % random.getrandbits(16 * 4) # Non-cryptographic (just needs to be unique)
os.makedirs(os.path.dirname(_user_id_path), exist_ok=True)
with open(_user_id_path, 'w+') as f:
f.write(ovtk_user_id)
else:
with open(_user_id_path, 'r') as f:
ovtk_user_id = f.read()

View file

@ -0,0 +1,241 @@
import importlib
from multiprocessing import Lock
import asyncio
from datetime import datetime, timedelta
from traceback import format_exception
import logging
import os
import kdl
from click import progressbar
from aioscheduler import TimedScheduler
from quart import Quart
from ovtk_audiencekit.core import WebsocketServerProcess
from ovtk_audiencekit.core.Config import kdl_parse_config, kdl_reserved
from ovtk_audiencekit.events import Event, Delete
from ovtk_audiencekit.chats.ChatProcess import ShutdownRequest
from ovtk_audiencekit.plugins import builtins
from ovtk_audiencekit.plugins.PluginBase import PluginError
logger = logging.getLogger(__name__)
def parse_kdl_deep(path, relativeto=None):
if relativeto:
path = os.path.normpath(os.path.join(relativeto, path))
with open(path, 'r') as f:
try:
config = kdl.parse(f.read(), kdl_parse_config)
except kdl.errors.ParseError as e:
e.file = path
raise e
for node in config.nodes:
if node.name == 'import':
yield from parse_kdl_deep(node.args[0], relativeto=os.path.dirname(path))
else:
yield node
class MainProcess:
def __init__(self, config_path, port, bind):
super().__init__()
self.config_path = config_path
self.port = port
self.bind = bind
self.chat_processes = {}
self.plugins = {}
@staticmethod
def get_external_module_names(node, store):
if len(node.args) == 0:
raise ValueError(f'Invalid arguments - usage: {node.name} module')
nameparts = node.args[0].split(':')
module_name = nameparts[0]
instance_name = nameparts[1] if len(nameparts) == 2 else module_name
if store.get(instance_name):
if instance_name != module_name:
raise ValueError(f"Multiple nodes named {instance_name}, please specify unique names as the second argument")
else:
raise ValueError(f"Multiple definitions of {instance_name} exist, please specify unique names as the second argument")
return module_name, instance_name
async def handle_events(self):
while True:
event = await self.event_queue.get()
logger.info(event)
if isinstance(event, Event):
for plugin_name, plugin in list(self.plugins.items()):
try:
event = plugin.on_bus_event(event)
if asyncio.iscoroutinefunction(plugin.on_bus_event):
event = await event
except PluginError as e:
if e.fatal:
logger.critical(f'Failure when processing {e.source} ({e}) - disabling...')
else:
logger.warning(f'Encounterd error when processing {e.source} ({e})')
logger.debug(''.join(format_exception(None, e, e.__traceback__)))
if e.fatal:
self.plugins[e.source].__del__()
del self.plugins[e.source]
except Exception as e:
logger.critical(f'Failure when processing {plugin_name} ({e}) - disabling...')
logger.debug(''.join(format_exception(None, e, e.__traceback__)))
self.plugins[plugin_name].__del__()
del self.plugins[plugin_name]
if event is None:
break
else:
self.server_process.message_pipe.send(event)
logger.debug(f'Event after plugin chain - {event}')
elif isinstance(event, Delete):
self.server_process.message_pipe.send(event)
else:
logger.error(f'Unknown data in event loop - {event}')
def setup(self):
config = kdl.Document(list(parse_kdl_deep(self.config_path)))
stdin_lock = Lock()
# Load secrets
secrets = {}
if node := config.get('secrets'):
for module in node.nodes:
fields = secrets.get(module.name, {})
for node in module.nodes:
fields[node.name] = node.args[0] if len(node.args) == 1 else node.args
secrets[module.name] = fields
# Dynamically import chats
with progressbar(list(config.getAll('chat')), label="Preparing modules (chats)", item_show_func=lambda i: i and i.args[0]) as bar:
for node in bar:
module_name, chat_name = self.get_external_module_names(node, self.chat_processes)
secrets_for_mod = secrets.get(module_name, {})
try:
chat_module = importlib.import_module(f'.{module_name}', package='ovtk_audiencekit.chats')
chat_process = chat_module.Process(stdin_lock, chat_name, **node.props, **secrets_for_mod)
self.chat_processes[chat_name] = chat_process
except Exception as e:
raise ValueError(f'Failed to initalize {module_name} module "{chat_name}" - {e}')
if len(self.chat_processes.keys()) == 0:
logger.warning('No chats configured!')
# Start chat processes
for process in self.chat_processes.values():
process.start()
# Load plugins
self.event_queue = asyncio.Queue()
## Builtins
for node_name in builtins.__all__:
self.plugins[node_name] = builtins.__dict__[node_name](self.chat_processes, self.event_queue, node_name)
## Dynamic
with progressbar(list(config.getAll('plugin')), label="Preparing modules (plugins)", item_show_func=lambda i: i and i.args[0]) as bar:
for node in bar:
module_name, plugin_name = self.get_external_module_names(node, self.plugins)
secrets_for_mod = secrets.get(module_name, {})
try:
plugin_module = importlib.import_module(f'.{module_name}', package='ovtk_audiencekit.plugins')
plugin = plugin_module.Plugin(self.chat_processes, self.event_queue, plugin_name,
**node.props, **secrets_for_mod, _children=node.nodes)
self.plugins[plugin_name] = plugin
# Register UI with webserver
self.webserver.register_blueprint(plugin.blueprint, url_prefix=f'/{plugin_name}')
except Exception as e:
raise ValueError(f'Failed to initalize {module_name} plugin "{plugin_name}" - {e}')
# Run plugin definitions
with progressbar(list(config.nodes), label=f"Executing {self.config_path}") as bar:
for node in bar:
if node.name in kdl_reserved:
continue
plugin_name = node.name
plugin_module = self.plugins.get(plugin_name)
if plugin_module is None:
logger.error(f'Unknown plugin: {node.name}')
else:
asyncio.get_event_loop().run_until_complete(plugin_module._run(*node.args, **node.props, _children=node.nodes))
# Register watchable handles
self.pipes = [process.event_pipe for process in self.chat_processes.values()]
self.pipes.append(self.server_process.message_pipe)
async def tick_plugins(self):
for plugin in self.plugins.values():
res = plugin.tick(0.5)
if asyncio.iscoroutinefunction(plugin.tick):
await res
self._skehdule.schedule(self.tick_plugins(), datetime.utcnow() + timedelta(seconds=0.5))
def run(self):
# Start websocket server
self.server_process = WebsocketServerProcess(self.port, self.bind)
self.server_process.start()
self.webserver = Quart(__name__)
loop = asyncio.get_event_loop()
sys_tasks = []
try:
# Do initial setup
self.setup()
self._skehdule = TimedScheduler()
sys_tasks.append(loop.create_task(self.tick_plugins()))
sys_tasks.append(loop.create_task(self.handle_events()))
async def start_scheduler():
self._skehdule.start()
sys_tasks.append(loop.create_task(start_scheduler()))
async def start_uiwebserver():
try:
# HACK: eats the KeyboardInterrupt - maybe others too
await self.webserver.run_task()
except KeyboardInterrupt:
pass
except Exception as e:
logger.critical(f'Failure in web process - {e}')
logger.debug(''.join(format_exception(None, e, e.__traceback__)))
finally:
raise KeyboardInterrupt()
sys_tasks.append(loop.create_task(start_uiwebserver()))
def get_event(pipe):
event = pipe.recv()
self.event_queue.put_nowait(event)
for pipe in self.pipes:
# REVIEW: This does not work on windows!!!!
loop.add_reader(pipe.fileno(), lambda pipe=pipe: get_event(pipe))
logger.info('Ready to rumble!')
loop.run_forever()
except KeyboardInterrupt:
pass
except kdl.errors.ParseError as e:
if (e.file):
logger.critical(f'Invalid configuration in {e.file} at {e.line}:{e.col} - {e.msg}')
else:
logger.critical(f'Invalid configuration - {e.msg}')
except Exception as e:
logger.critical(f'Failure in core process - {e}')
logger.debug(''.join(format_exception(None, e, e.__traceback__)))
finally:
logger.warn('Closing up shop...')
for task in sys_tasks:
task.cancel()
for process in self.chat_processes.values():
process.control_pipe.send(ShutdownRequest('root'))
process.join(5)
if process.exitcode is None:
process.terminate()
self.server_process.terminate()

View file

@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
class WebsocketServerProcess(Process): class WebsocketServerProcess(Process):
def __init__(self, bind, port): def __init__(self, port, bind):
super().__init__() super().__init__()
self._bind = bind self._bind = bind

View file

@ -1,3 +1,3 @@
from .WebsocketServerProcess import WebsocketServerProcess from .WebsocketServerProcess import WebsocketServerProcess
from .MainProcess import MainProcess from .MainProcess import MainProcess
from .Audio import Clip, Stream from .Clip import Clip

View file

@ -0,0 +1,27 @@
import asyncio
from ovtk_audiencekit.plugins import PluginBase
from ovtk_audiencekit.core import Clip
class AudioAlert(PluginBase):
def __init__(self, *args, output=None, buffer_length=2048, cutoff_prevention_buffers=None, **kwargs):
super().__init__(*args, **kwargs)
if cutoff_prevention_buffers:
self.logger.info('`cutoff_prevention_buffers` are depricated')
self.sounds = {}
self._buffer_length = int(buffer_length)
self._output_index = Clip.find_output_index(output)
def run(self, path, speed=1, immediate=True, **kwargs):
if self.sounds.get(path) is None:
self.sounds[path] = Clip(path,
self._output_index,
buffer_length=self._buffer_length,
speed=speed)
sound = self.sounds.get(path)
if immediate:
asyncio.create_task(sound.aplay())
else:
sound.play()

View file

@ -0,0 +1,60 @@
import random
import asyncio
from enum import Enum, auto
from ovtk_audiencekit.plugins import PluginBase
from ovtk_audiencekit.events import Event, Message, SysMessage
from ovtk_audiencekit.events.Message import USER_TYPE
test_users = [
('Random user', '123123', USER_TYPE.USER),
('Some Guy', '723894', USER_TYPE.PATRON),
('xX_ButtSlayerMan1967_Xx', '324234', USER_TYPE.VIP),
('The hacker known as Anonymous', '1337', USER_TYPE.ANON),
('My name is uncessisarily long why does yt allow this', '123786986', USER_TYPE.USER),
('Taco Bell official (i wish)', '8979823', USER_TYPE.PATRON),
('skeh', '420', USER_TYPE.OWNER),
('chat maid', '6969', USER_TYPE.MODERATOR),
]
test_messages = [
('Another fake message from a fake fan (lol)', None),
('Why play games when you could eat bean', None),
('pog more like log', None),
('now thats what i call epic', None),
('POG', None),
('oh yeah, thats one neat chat', None),
('lmao fake chat', None),
(' i like m y whitespace ', None),
('no i \n\n\n like my\nwhite\nspace', None),
('Thanks for coming to my ted talk. Tonight, I discuss what exactly it means to be your little pogchamp, and how "come here" is actually propoganda in disquise. I am very good at parties i swear, please come to my party p l ea se', None),
('USRE VERY EXCITE POGGGG POG POGGGGGGGGGGGGGGGGGGGGGGGG POGPOGPOGGGG', None),
('spaamamspmapmdpmaspmspsapmspmapsmpasmspmapmpasmspmapmspampsmpaspaspamapmspmapmspmapsmpamspamspmapsmpmaspmapmspamspmapsmpamspmpamspms', None),
('poggy woggy freebie deeby', 0),
('Hey do you want a penny', 0.01),
('show feet', 10),
('whats up guys suspcicously wealthy furry here', 1_000),
]
class FakeChat(PluginBase):
def __init__(self, *args, max_delay=10, max_messages_per_chunk=1, **kwargs):
super().__init__(*args, **kwargs)
self._max_delay = max_delay
self._max_messages_per_chunk = max_messages_per_chunk
self._readtask = asyncio.get_event_loop().create_task(self.loop())
async def loop(self):
while True:
while range(int(random.random() * (self._max_messages_per_chunk + 1))):
author_name, author_id, author_type = random.choice(test_users)
text, monitization = random.choice(test_messages)
fake_message = Message(self._name, text,
author_name, author_id, author_type,
monitization=monitization)
self.send_to_bus(fake_message)
await asyncio.sleep(random.random() * self._max_delay)
def run(self):
pass

View file

@ -0,0 +1 @@
from .FakeChat import FakeChat as Plugin

View file

@ -8,7 +8,7 @@ from requests.exceptions import HTTPError
from owoify.owoify import owoify, Owoness from owoify.owoify import owoify, Owoness
from ovtk_audiencekit.plugins import PluginBase from ovtk_audiencekit.plugins import PluginBase
from ovtk_audiencekit.core.Data import CACHE_DIR from ovtk_audiencekit.core.Config import CACHE_DIR
from ovtk_audiencekit.plugins.builtins.Command import Command, CommandTypes from ovtk_audiencekit.plugins.builtins.Command import Command, CommandTypes
from ovtk_audiencekit.events.Message import Message, SysMessage from ovtk_audiencekit.events.Message import Message, SysMessage
@ -35,7 +35,8 @@ owomap = {
} }
class JailPlugin(PluginBase): class JailPlugin(PluginBase):
def setup(self, min_level='vip', persist=True): def __init__(self, *args, min_level='vip', persist=True, **kwargs):
super().__init__(*args, **kwargs)
self.persist = persist self.persist = persist
self._cache = os.path.join(CACHE_DIR, 'Jail', 'sentences') self._cache = os.path.join(CACHE_DIR, 'Jail', 'sentences')
os.makedirs(os.path.dirname(self._cache), exist_ok=True) os.makedirs(os.path.dirname(self._cache), exist_ok=True)

View file

@ -0,0 +1 @@
from .misskey import MisskeyChannel as Plugin

View file

@ -0,0 +1,76 @@
import json
import random
import asyncio
from enum import Enum, auto
from itertools import chain
import websockets
from ovtk_audiencekit.plugins import PluginBase
from ovtk_audiencekit.events.Message import Message, USER_TYPE
class MisskeyChannel(PluginBase):
def __init__(self, *args, instance=None, channel=None, token=None, **kwargs):
super().__init__(*args, **kwargs)
self._url = f'wss://{instance}/streaming'
if token:
self._url += f'?token={token}'
self.channelId = channel
self._subid = '%0x' % random.getrandbits(16 * 4)
self._readtask = asyncio.get_event_loop().create_task(self.read())
async def read(self):
async for ws in websockets.connect(self._url):
try:
await self.setup(ws)
while True:
note_event = await ws.recv()
try:
misskey_event = json.loads(note_event)
if misskey_event['body']['id'] == self._subid and misskey_event['body']['type'] == 'note':
note = misskey_event['body']['body'] # lol
norm = None
try:
norm = self.normalize_event(note)
except Exception as e:
self.logger.error(f'Failed to process note data: {note}')
if norm:
self.send_to_bus(norm)
except (KeyError, json.JSONDecodeError):
self.logger.error(f'Unknown data in websocket: {note_event}')
except websockets.ConnectionClosed:
self.logger.error(f'Websocket disconnected! Retrying in a bit...')
continue
async def setup(self, ws):
payload = {
'type': 'connect',
'body': {
'channel': 'channel', # LOL
'id': self._subid,
'params': {
'channelId': self.channelId,
}
}
}
await ws.send(json.dumps(payload))
def normalize_event(self, event):
user_name = event['user']['name'] or event['user']['username']
user_id = event['user']['id']
text = event.get('text', '')
attachments = [(file['type'], file['url']) for file in event.get('files', [])]
emojis = {emoji['name']: emoji['url'] for emoji in chain(event.get('emojis', []), event['user'].get('emojis', []))}
if text or attachments:
msg = Message(self._name, text or '',
user_name, user_id, USER_TYPE.USER,
id=event['id'], emotes=emojis or None,
attachments=attachments or None)
return msg
return None
def run(self):
pass

View file

@ -6,10 +6,14 @@ from ovtk_audiencekit.plugins import PluginBase
class OBSWSPlugin(PluginBase): class OBSWSPlugin(PluginBase):
async def setup(self, password=None, uri='ws://localhost:4455'): def __init__(self, *args, password=None, uri='ws://localhost:4455', **kwargs):
super().__init__(*args, **kwargs)
self.uri = uri self.uri = uri
self.obsws = simpleobsws.WebSocketClient(url=uri, password=password) self.obsws = simpleobsws.WebSocketClient(url=uri, password=password)
asyncio.get_event_loop().run_until_complete(self.setup())
async def setup(self):
await self.obsws.connect() await self.obsws.connect()
success = await self.obsws.wait_until_identified() success = await self.obsws.wait_until_identified()
if not success: if not success:

View file

@ -3,7 +3,7 @@ import logging
import json import json
import os import os
from ovtk_audiencekit.core.Data import CACHE_DIR from ovtk_audiencekit.core.Config import CACHE_DIR
from ovtk_audiencekit.plugins import PluginBase from ovtk_audiencekit.plugins import PluginBase
from ovtk_audiencekit.events import Message from ovtk_audiencekit.events import Message
@ -64,7 +64,8 @@ class PhraseCounter:
class PhraseCounterPlugin(PluginBase): class PhraseCounterPlugin(PluginBase):
def setup(self, debounce_time=1, persist=False): def __init__(self, *args, debounce_time=1, persist=False, **kwargs):
super().__init__(*args, **kwargs)
self.debounce_time = debounce_time self.debounce_time = debounce_time
self.persist = persist self.persist = persist
@ -103,7 +104,7 @@ class PhraseCounterPlugin(PluginBase):
return event return event
def run(self, *args, _children=None, _ctx={}, **kwargs): def run(self, *args, _children=None, **kwargs):
if len(_children) != 1: if len(_children) != 1:
raise ValueError('Requires a template child') raise ValueError('Requires a template child')
template = _children[0] template = _children[0]

View file

@ -0,0 +1,133 @@
from dataclasses import asdict
from abc import ABC, abstractmethod
from functools import reduce
from operator import getitem
from string import Formatter
import logging
import asyncio
import kdl
from quart import Blueprint
from ovtk_audiencekit.core.Config import kdl_parse_config
class PluginError(Exception):
def __init__(self, source, message, fatal=True):
self.source = source
self.message = message
self.fatal = fatal
def __str__(self):
return self.message
class GetitemFormatter(Formatter):
def get_field(self, field_name, args, kwargs):
keys = field_name.split('.')
field = reduce(getitem, keys, kwargs)
return (field, keys[0])
class PluginBase(ABC):
plugins = {}
def __init__(self, chat_processes, event_queue, name, _children=None, **kwargs):
super().__init__(**kwargs)
self.chats = chat_processes
self._event_queue = event_queue
self._name = name
self.logger = logging.getLogger(f'plugin.{self._name}')
self.plugins[name] = self
self.blueprint = Blueprint(self._name, __name__)
if _children:
raise ValueError('Module does not accept children')
def __del__(self):
if self.plugins.get(self._name) == self:
del self.plugins[self._name]
# Base class helpers
def broadcast(self, event):
"""Send event to every active chat"""
for proc in self.chats.values():
if proc.readonly:
continue
proc.control_pipe.send(event)
@staticmethod
def _kdl_arg_formatter(text, fragment, args):
key = fragment.fragment[1:-1]
try:
if '{' in key:
return GetitemFormatter().format(key, **args).replace(r'\"', '"')
else:
return reduce(getitem, key.split('.'), args)
except (KeyError, IndexError) as e:
raise ValueError(f'Invalid arg string - "{key}": {e}') from e
def fill_context(self, actionnode, ctx):
config = asdict(kdl_parse_config)
config['valueConverters'] = {
**config['valueConverters'],
'arg': lambda text, fragment, args=ctx: self._kdl_arg_formatter(text, fragment, args),
}
config = kdl.ParseConfig(**config)
newnode = kdl.parse(str(actionnode), config).nodes[0]
return newnode
async def call_plugin_from_kdl(self, node, *args, _ctx={}, **kwargs):
"""
Calls some other plugin as configured by the passed KDL node
If this was done in response to an event, pass it as event in _ctx!
"""
node = self.fill_context(node, _ctx)
target = self.plugins.get(node.name)
if target is None:
self.logger.warning(f'Could not find plugin or builtin with name {node.name}')
else:
return await target._run(*node.args, *args, **node.props, _ctx=_ctx, **kwargs, _children=node.nodes)
def send_to_bus(self, event):
"""
Send an event to the event bus
WARNING: This will cause the event to be processed by other plugins - be careful not to cause an infinite loop!
"""
self._event_queue.put_nowait(event)
async def _run(self, *args, **kwargs):
try:
res = self.run(*args, **kwargs)
if asyncio.iscoroutinefunction(self.run):
return await res
else:
return res
except Exception as e:
if isinstance(e, KeyboardInterrupt):
raise e
raise PluginError(self._name, str(e)) from e
# User-defined
async def tick(self, dt):
"""Called at least every half second - perform time-dependent updates here!"""
pass
async def on_bus_event(self, event):
"""Called for every event from the chats"""
return event
async def on_control_event(self, event):
"""
Called for events targeting this plugin name specifically.
This is normally used for other applications to communicate with this one over the websocket interface
"""
pass
@abstractmethod
async def run(self, _children=None, _ctx={}, **kwargs):
"""
Run plugin action, either due to a definition in the config, or due to another plugin
"""
pass

View file

@ -9,8 +9,10 @@ from ovtk_audiencekit.chats.Twitch import Process as Twitch
class ShoutoutPlugin(PluginBase): class ShoutoutPlugin(PluginBase):
def setup(self, command='so', min_level='vip', def __init__(self, *args, command='so', min_level='vip',
text='Check out {link}!~ They were last streaming {last_game}'): text='Check out {link}!~ They were last streaming {last_game}',
**kwargs):
super().__init__(*args, **kwargs)
self.text = text self.text = text
if command: if command:
self.command = Command(name=command, help='Shoutout another user', required_level=min_level) self.command = Command(name=command, help='Shoutout another user', required_level=min_level)

View file

@ -8,28 +8,24 @@ from TTS.config import load_config
from ovtk_audiencekit.plugins import PluginBase from ovtk_audiencekit.plugins import PluginBase
from ovtk_audiencekit.events import Message, SysMessage from ovtk_audiencekit.events import Message, SysMessage
from ovtk_audiencekit.core import Clip, Stream from ovtk_audiencekit.core import Clip
from ovtk_audiencekit.core.Data import CACHE_DIR from ovtk_audiencekit.core.Config import CACHE_DIR
class TextToSpeechPlugin(PluginBase): class TextToSpeechPlugin(PluginBase):
def setup(self, output=None, cuda=None, def __init__(self, *args, output=None, cuda=None,
engine="tts_models/en/ljspeech/tacotron2-DDC", speaker_wav=None, **kwargs): engine="tts_models/en/ljspeech/tacotron2-DDC", speaker_wav=None,
_children=None, **kwargs):
super().__init__(*args, _children=_children)
self.speaker_wav = speaker_wav self.speaker_wav = speaker_wav
self.output_index = Stream.find_output_index(output) self._output_index = Clip.find_output_index(output)
sample_rate = None
try:
sample_rate = next((rate for rate in [44100, 48000] if Stream.check_rate(self.output_index, 1, rate)))
except StopIteration:
self.logger.warn('Target audio device does not claim to support common sample rates! Attempting playback at native rate of audio')
self.sample_rate = sample_rate
conf_overrides = {k[2:]: v for k, v in kwargs.items() if k.startswith('o_')} conf_overrides = {k[2:]: v for k, v in kwargs.items() if k.startswith('o_')}
self.cache_dir = os.path.join(CACHE_DIR, 'tts') self._cache = os.path.join(CACHE_DIR, 'tts')
os.makedirs(os.path.dirname(self.cache_dir), exist_ok=True) os.makedirs(os.path.dirname(self._cache), exist_ok=True)
self.cuda = cuda self.cuda = cuda
@ -42,7 +38,7 @@ class TextToSpeechPlugin(PluginBase):
vocoder_path, vocoder_config_path = None, None vocoder_path, vocoder_config_path = None, None
if conf_overrides: if conf_overrides:
override_conf_path = os.path.join(self.cache_dir, f'{self._name}_override.json') override_conf_path = os.path.join(self._cache, f'{self._name}_override.json')
config = load_config(config_path) config = load_config(config_path)
for key, value in conf_overrides.items(): for key, value in conf_overrides.items():
@ -61,7 +57,7 @@ class TextToSpeechPlugin(PluginBase):
def make_tts_wav(self, text, filename=None): def make_tts_wav(self, text, filename=None):
if filename is None: if filename is None:
filename = os.path.join(self.cache_dir, f'{uuid.uuid1()}.wav') filename = os.path.join(self._cache, f'{uuid.uuid1()}.wav')
if self.speaker_wav: if self.speaker_wav:
wav = self.synthesizer.tts(text, None, 'en', self.speaker_wav) wav = self.synthesizer.tts(text, None, 'en', self.speaker_wav)
@ -78,20 +74,17 @@ class TextToSpeechPlugin(PluginBase):
text += '.' text += '.'
filename = self.make_tts_wav(text) filename = self.make_tts_wav(text)
# TODO: Play direct from memory # TODO: Play direct from memory
clip = Clip(filename, force_stereo=True, samplerate=self.sample_rate) clip = Clip(filename, self._output_index, force_stereo=False)
stream = Stream(clip, self.output_index)
if wait: if wait:
async def play(): async def play():
await stream.aplay() await clip.aplay()
stream.close() clip.close()
os.remove(os.path.join(self.cache_dir, filename))
asyncio.create_task(play()) asyncio.create_task(play())
else: else:
stream.play() clip.play()
stream.close() clip.close()
os.remove(os.path.join(self.cache_dir, filename))
except Exception as e: except Exception as e:
self.logger.error(f"Failed to make speech from input: {e}") print(e)
if source_event := _ctx.get('event'): if source_event := _ctx.get('event'):
msg = SysMessage(self._name, 'Failed to make speech from input!!') msg = SysMessage(self._name, 'Failed to make speech from input!!')

View file

@ -0,0 +1,3 @@
from .PluginBase import PluginBase, PluginError
__all__ = ['PluginBase', 'PluginError']

View file

@ -16,7 +16,5 @@ class ChancePlugin(PluginBase):
raise ValueError('Chance must be a string (optionally ending in %) or number') raise ValueError('Chance must be a string (optionally ending in %) or number')
if random.random() < chance / 100: if random.random() < chance / 100:
await self.execute_kdl([child for child in _children if child.name != 'or'], _ctx=_ctx) for node in _children:
else: await self.call_plugin_from_kdl(node, _ctx=_ctx)
if elsenode := next((child for child in _children if child.name == 'or'), None):
await self.execute_kdl(elsenode.nodes, _ctx=_ctx)

View file

@ -12,4 +12,4 @@ class ChoicePlugin(PluginBase):
async def run(self, _children=None, _ctx={}, **kwargs): async def run(self, _children=None, _ctx={}, **kwargs):
node = random.choice(_children) node = random.choice(_children)
await self.execute_kdl([node], _ctx=_ctx) await self.call_plugin_from_kdl(node, _ctx=_ctx)

View file

@ -130,7 +130,8 @@ class CommandPlugin(PluginBase):
args = command.parse(event.text) args = command.parse(event.text)
self.logger.debug(f"Parsed args for {command.name}: {args}") self.logger.debug(f"Parsed args for {command.name}: {args}")
ctx = dict(event=event, **args) ctx = dict(event=event, **args)
await self.execute_kdl(actionnode.nodes, _ctx=ctx) for node in actionnode.nodes:
await self.call_plugin_from_kdl(node, _ctx=ctx)
except argparse.ArgumentError as e: except argparse.ArgumentError as e:
msg = SysMessage(self._name, f"{e}. See !help {command.name}", replies_to=event) msg = SysMessage(self._name, f"{e}. See !help {command.name}", replies_to=event)
self.chats[event.via].send(msg) self.chats[event.via].send(msg)

View file

@ -0,0 +1,51 @@
import maya
from ovtk_audiencekit.plugins import PluginBase
class CueEvent:
def __init__(self, oneshot, at=None, **kwargs):
self.oneshot = oneshot
if at:
self._next_run = maya.parse(at)
self._interval = None
else:
self._next_run = maya.now().add(**kwargs)
self._interval = kwargs
def check(self):
if self._next_run <= maya.now():
if self._interval:
self._next_run = maya.now().add(**self._interval)
return True
return False
class CuePlugin(PluginBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cue_events = {}
def run(self, name=None, _children=None, **kwargs):
if not _children:
raise ValueError('Cue defined without any events')
if name is None:
name = f"cue-{len(self.cue_events.keys())}"
for eventnode in _children:
at = eventnode.args[0] if len(eventnode.args) == 1 else None
oneshot = eventnode.name in ['once', 'after']
cue_event = CueEvent(oneshot, at=at, **eventnode.props)
actions = [lambda node=node: self.call_plugin_from_kdl(node) for node in eventnode.nodes];
self.cue_events[name] = (cue_event, actions)
async def tick(self, dt):
for key, (event, actions) in list(self.cue_events.items()):
if event.check():
for action in actions:
await action()
if event.oneshot:
del self.cue_events[key]

View file

@ -9,7 +9,7 @@ class ExecPlugin(PluginBase):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.warned = False self.warned = False
def run(self, cmd, reply=False, to_arg=None, _ctx={}, **kwargs): def run(self, cmd, reply=False, _ctx={}, **kwargs):
if not self.warned: if not self.warned:
self.logger.warning('Executing unchecked input is potentially dangerous! Check your (arg) inputs, if any, *very* carefully') self.logger.warning('Executing unchecked input is potentially dangerous! Check your (arg) inputs, if any, *very* carefully')
self.warned = True self.warned = True
@ -17,13 +17,11 @@ class ExecPlugin(PluginBase):
out = subprocess.run(cmd.split(' '), capture_output=True, text=True) out = subprocess.run(cmd.split(' '), capture_output=True, text=True)
if out.returncode != 0: if out.returncode != 0:
self.logger.warning(f'Command returned {out.returncode}: {out.stderr}') self.logger.error(f'Command retruned {out.returncode}: {out.stderr}')
return return
else: else:
self.logger.debug(f'Command returned {out.returncode}: {out.stdout}') self.logger.info(f'Command retruned {out.returncode}: {out.stdout}')
if reply and _ctx['event']: if reply and _ctx['event']:
msg = SysMessage(self._name, out.stdout) msg = SysMessage(self._name, out.stdout)
self.chats[_ctx['event'].via].send(msg) self.chats[_ctx['event'].via].send(msg)
if to_arg:
_ctx[to_arg] = out.stdout

View file

@ -0,0 +1,17 @@
import mido
from ovtk_audiencekit.plugins import PluginBase
class MidiPlugin(PluginBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.output_port = mido.open_output()
def run(self, type, _ctx={}, **kwargs):
if type == 'sysex':
data = kwargs['data']
msg = mido.Message('sysex', data=bytes(data, encoding='utf-8'), time=0)
self.output_port.send(msg)
else:
raise NotImplementedError('TODO: note on/off and cc')

View file

@ -0,0 +1,24 @@
from ovtk_audiencekit.plugins import PluginBase
class RememberPlugin(PluginBase):
"""Saves the response of other plugins in the local context (can be fetched with the custom arg type)"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def run(self, name, _children=None, _ctx={}, **kwargs):
if _children is None:
self.logger.warn('No children - this does nothing!')
return
responses = []
for child in _children:
res = await self.call_plugin_from_kdl(child, _ctx=_ctx)
responses.append(res)
if len(responses) == 1:
responses = responses[0]
_ctx[name] = responses

View file

@ -108,15 +108,16 @@ class TriggerPlugin(PluginBase):
unknown_args[key] = value unknown_args[key] = value
trigger = Trigger(**args, attr_checks=unknown_args) trigger = Trigger(**args, attr_checks=unknown_args)
handler = lambda ctx: self.execute_kdl(_children, _ctx=ctx) actions = [lambda ctx, node=node: self.call_plugin_from_kdl(node, _ctx=ctx) for node in _children]
self.triggers.append((trigger, handler, _ctx)) self.triggers.append((trigger, actions, _ctx))
async def on_bus_event(self, event): async def on_bus_event(self, event):
for trigger, handler, ctx in self.triggers: for trigger, actions, ctx in self.triggers:
if trigger.matches(event, self.last_msg): if trigger.matches(event, self.last_msg):
_ctx = {**ctx, 'event': event} _ctx = {**ctx, 'event': event}
await handler(_ctx) for action in actions:
await action(_ctx)
if isinstance(event, Message): if isinstance(event, Message):
self.last_msg = event self.last_msg = event
return event return event

View file

@ -10,8 +10,7 @@ class WritePlugin(PluginBase):
def run(self, text, target, append=False, **kwargs): def run(self, text, target, append=False, **kwargs):
text += '\n' text += '\n'
base_name = os.path.dirname(text)
base_name = os.path.dirname(target)
if base_name: if base_name:
os.makedirs(base_name, exist_ok=True) os.makedirs(base_name, exist_ok=True)

View file

@ -8,8 +8,6 @@ from .Chance import ChancePlugin as chance
from .Choice import ChoicePlugin as choice from .Choice import ChoicePlugin as choice
from .Midi import MidiPlugin as midi from .Midi import MidiPlugin as midi
from .WebSocket import WebSocketPlugin as ws from .WebSocket import WebSocketPlugin as ws
from .Set import SetPlugin as set from .Remember import RememberPlugin as remember
from .Scene import ScenePlugin as scene
from .Log import LogPlugin as log
__all__ = ['trigger', 'reply', 'command', 'cue', 'write', 'exec', 'chance', 'choice', 'midi', 'ws', 'set', 'scene', 'log'] __all__ = ['trigger', 'reply', 'command', 'cue', 'write', 'exec', 'chance', 'choice', 'midi', 'ws', 'remember']

View file

@ -1,4 +1,5 @@
from multiprocessing import Process, Pipe from multiprocessing import Process, Pipe
from traceback import format_exception
import logging import logging
import asyncio import asyncio
@ -43,18 +44,18 @@ class NonBlockingWebsocket(Process):
self._pipe.send(data) self._pipe.send(data)
def run(self): def run(self):
loop = asyncio.new_event_loop()
# Setup ws client # Setup ws client
loop.run_until_complete(self._setup()) asyncio.get_event_loop().run_until_complete(self._setup())
# Make an awaitable object that flips when the pipe's underlying file descriptor is readable # Make an awaitable object that flips when the pipe's underlying file descriptor is readable
pipe_ready = asyncio.Event() pipe_ready = asyncio.Event()
loop.add_reader(self._pipe.fileno(), pipe_ready.set) asyncio.get_event_loop().add_reader(self._pipe.fileno(), pipe_ready.set)
# Make and start our infinite tasks # Make and start our infinite tasks
loop.create_task(self._send(pipe_ready)) asyncio.get_event_loop().create_task(self._send(pipe_ready))
loop.create_task(self._read()) asyncio.get_event_loop().create_task(self._read())
# Keep the asyncio code running in this thread until explicitly stopped # Keep the asyncio code running in this thread until explicitly stopped
try: try:
loop.run_forever() asyncio.get_event_loop().run_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
return 0 return 0

View file

@ -1,4 +1,3 @@
from .NonBlockingWebsocket import NonBlockingWebsocket from .NonBlockingWebsocket import NonBlockingWebsocket
from .make_sync import make_sync from .make_sync import make_sync
from .get_subclasses import get_subclasses from .get_subclasses import get_subclasses
from .format_exception import format_exception

2125
pdm.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,60 +0,0 @@
[project]
name = "ovtk_audiencekit"
version = "0.1.0"
description = ""
authors = [
{name = "Skeh", email = "im@skeh.site"},
]
dependencies = [
"click",
"kdl-py",
"quart==0.18.*",
"hypercorn",
"requests",
"websockets",
"aioprocessing",
"aioscheduler",
"pyaudio==0.2.*",
"librosa==0.8.*",
"pytsmod",
"numpy",
"multipledispatch",
"blessed",
"appdirs",
"maya",
"mido",
"python-rtmidi",
"simpleobsws",
"python-osc>=1.9.0",
]
requires-python = ">=3.10,<3.11"
readme = "README.md"
license = {text = "GPLv2"}
[project.optional-dependencies]
tts = [
"TTS==0.9.*",
"torch==1.13.*",
]
phrasecounter = ["num2words"]
jail = ["owoify-py==2.*"]
twitch = ["miniirc"]
[tool.pdm.dev-dependencies]
dev = [
"pipenv-setup",
]
[build-system]
requires = ["pdm-backend"]
build-backend = "pdm.backend"
[tool.pdm.scripts]
start = "python audiencekit.py start"
ws = "python audiencekit.py ws"
[tool.pdm]
[[tool.pdm.source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

View file

@ -1,79 +0,0 @@
import random
from enum import Enum, auto
from ovtk_audiencekit.chats import ChatProcess
from ovtk_audiencekit.events import Event, Message, SysMessage
from ovtk_audiencekit.events.Message import USER_TYPE
class STATES(Enum):
PAUSED = auto()
RUNNING = auto()
class StartStop(Event):
pass
class FakeChat(ChatProcess):
def __init__(self, *args, max_delay=10, max_messages_per_chunk=1, start_paused=True, max_monitization=None, **kwargs):
super().__init__(*args, **kwargs)
self._max_delay = max_delay
self._max_messages_per_chunk = max_messages_per_chunk
self._max_monitization = max_monitization
self.state = STATES.PAUSED if start_paused else STATES.RUNNING
def process_messages(self, event, next_state):
if isinstance(event, StartStop):
running = not self.state == STATES.RUNNING
text = 'Fake chat activated!' if running else 'Disabled fake chat'
sys_msg = SysMessage(self._name, text)
self.publish(sys_msg)
return STATES.RUNNING if running else STATES.PAUSED
def loop(self, next_state):
if self.state == STATES.PAUSED:
return None
while range(int(random.random() * (self._max_messages_per_chunk + 1))):
author_name, author_id, author_type = random.choice([
('Random user', '123123', USER_TYPE.PATRON),
('Some Guy', '723894', USER_TYPE.PATRON),
('xX_ButtSlayerMan1967_Xx', '324234', USER_TYPE.PATRON),
('My name is uncessisarily long why does yt allow this', '123786986', USER_TYPE.PATRON),
('Taco Bell official (i wish)', '8979823', USER_TYPE.PATRON),
('skeh', '1337', USER_TYPE.OWNER),
('rando_mod', '6969', USER_TYPE.MODERATOR),
])
text = random.choice([
'Some fake user saying some shid',
'Another fake message from a fake fan (lol)',
'Why play games when you could eat bean',
'pog more like log',
'playing the game :drake_dislike:\nspending hours getting the game to run well :drake_like:',
'now thats what i call epic',
'POG',
'cheese',
'oh yeah, thats one neat chat',
'lmao fake chat',
'nice chat you got there, be a shame if someone spammed it',
' i like m y whitespace ',
'no i \n\n\n like my\nwhite\nspace',
'this fake user is chatty and say a lot of various things, but its still a coherent sentance somehow',
'Thanks for coming to my ted talk. Tonight, I discuss what exactly it means to be your little pogchamp, and how "come here" is actually propoganda in disquise. I am very good at parties i swear, please come to my party p l ea se',
'USRE VERY EXCITE POGGGG POG POGGGGGGGGGGGGGGGGGGGGGGGG POGPOGPOGGGG',
'spaamamspmapmdpmaspmspsapmspmapsmpasmspmapmpasmspmapmspampsmpaspaspamapmspmapmspmapsmpamspamspmapsmpmaspmapmspamspmapsmpamspmpamspms',
])
if self._max_monitization and random.random() > 0.5:
amount = random.random() * self._max_monitization
monitization = (amount, amount)
else:
monitization = None
fake_message = Message(self._name, text,
author_name, author_id, author_type,
monitization=monitization)
self.publish(fake_message)
return random.random() * self._max_delay

View file

@ -1 +0,0 @@
from .FakeChat import FakeChat as Process

View file

@ -1 +0,0 @@
from .misskey import MisskeyProcess as Process

View file

@ -1,80 +0,0 @@
import json
import random
import logging
from enum import Enum, auto
from itertools import chain
from ovtk_audiencekit.chats import ChatProcess
from ovtk_audiencekit.utils import NonBlockingWebsocket
from ovtk_audiencekit.events.Message import Message, USER_TYPE
logger = logging.getLogger(__name__)
class STATES(Enum):
CONNECTING = auto()
READING = auto()
class MisskeyProcess(ChatProcess):
def __init__(self, *args, instance=None, channel=None, token=None, **kwargs):
super().__init__(*args, **kwargs)
self._url = f'wss://{instance}/streaming'
if token:
self._url += f'?token={token}'
self.channelId = channel
self._subid = '%0x' % random.getrandbits(16 * 4)
self._state_machine = self.bind_to_states(STATES)
self.state = STATES.CONNECTING
def normalize_event(self, event):
user_name = event['user']['name'] or event['user']['username']
user_id = event['user']['id']
text = event.get('text', '')
attachments = [(file['type'], file['url']) for file in event.get('files', [])]
emojis = {emoji['name']: emoji['url'] for emoji in chain(event.get('emojis', []), event['user'].get('emojis', []))}
if text or attachments:
msg = Message(self._name, text or '',
user_name, user_id, USER_TYPE.USER,
id=event['id'], emotes=emojis or None,
attachments=attachments or None)
return msg
return None
def on_connecting(self, next_state):
self._ws = NonBlockingWebsocket(self._url)
self._ws.start()
payload = {
'type': 'connect',
'body': {
'channel': 'channel', # lol
'id': self._subid,
'params': {
'channelId': self.channelId,
}
}
}
self._ws.send(json.dumps(payload))
return STATES.READING
def on_reading(self, next_state, timeout=0.1):
if self._ws.poll(timeout):
note_event = self._ws.recv()
try:
misskey_event = json.loads(note_event)
if misskey_event['body']['id'] == self._subid and misskey_event['body']['type'] == 'note':
note = misskey_event['body']['body'] # LOL
norm = None
try:
norm = self.normalize_event(note)
except Exception as e:
logger.error(f'Failed to process note data: {note}')
if norm:
self.publish(norm)
except (KeyError, json.JSONDecodeError):
logger.error(f'Unknown data in websocket: {note_event}')
return 0
def loop(self, next_state):
return self._state_machine(self.state, next_state)

View file

@ -1,29 +0,0 @@
import logging
import asyncio
import click
from ovtk_audiencekit.core import MainProcess
from .group import cli
logger = logging.getLogger(__name__)
@cli.command()
@click.argument('config_file', type=click.Path('r'), default='config.kdl')
@click.option('--bus-bind', default='localhost')
@click.option('--bus-port', default='8080')
@click.option('--web-bind', default='localhost')
@click.option('--web-port', default='8000')
def start(config_file, bus_bind=None, bus_port=None, web_bind=None, web_port=None):
"""Start audiencekit server"""
logger.info('Hewwo!!')
main = MainProcess(config_file,
bus_conf=(bus_bind, bus_port),
web_conf=(web_bind, web_port))
try:
asyncio.run(main.run())
except KeyboardInterrupt:
pass
finally:
logger.info('Suya~')

View file

@ -1,150 +0,0 @@
from functools import reduce
from operator import getitem
from string import Formatter
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import os
import kdl
import kdl.types
from ovtk_audiencekit.utils import format_exception
@dataclass
class Dynamic(ABC):
source: str
parser: any
def to_kdl(self):
return self.source
@abstractmethod
def compute():
pass
def __repr__(self):
return str(self.source)
def get(instance, key):
if isinstance(instance, dict):
return getitem(instance, key)
else:
try:
return getattr(instance, key)
except KeyError:
return getitem(instance, key)
class Arg(Dynamic):
class GetitemFormatter(Formatter):
def convert_field(self, value, conversion):
if conversion == 'i':
return value.replace('\n', '')
else:
return super().convert_field(value, conversion)
def get_field(self, field_name, args, kwargs):
keys = field_name.split('.')
field = reduce(get, keys, kwargs)
return (field, keys[0])
def compute(self, *args, _ctx={}):
key = self.parser.fragment[1:-1]
try:
if '{' in key:
return Arg.GetitemFormatter().format(key, **_ctx).replace(r'\"', '"')
else:
return reduce(get, key.split('.'), _ctx)
except (KeyError, IndexError) as e:
raise self.parser.error(f'Invalid arg string: {e}') from e
except Exception as e:
raise self.parser.error(f'Exception raised during arg inject: {format_exception(e, traceback=False)}') from e
class Eval(Dynamic):
def compute(self, *args, **kwargs):
contents = self.parser.fragment[1:-1]
try:
return eval(contents, kwargs)
except Exception as e:
raise self.parser.error(f'Exception raised during eval: {format_exception(e, traceback=False)}') from e
def csv_parser(text, parser):
text = parser.fragment[1:-1]
return [field.strip() for field in text.split(',')]
def semisv_parser(text, parser):
text = parser.fragment[1:-1]
return [field.strip() for field in text.split(';')]
customValueParsers = {
'arg': Arg,
't': Arg,
'eval': Eval,
'list': csv_parser,
'csv': csv_parser,
'semisv': semisv_parser,
}
def compute_dynamic(kdl_node, *args, **kwargs):
args = []
for arg in kdl_node.args:
if isinstance(arg, Dynamic):
arg = arg.compute(*args, **kwargs)
args.append(arg)
props = {}
for key, prop in kdl_node.props.items():
if isinstance(prop, Dynamic):
prop = prop.compute(*args, **kwargs)
props[key] = prop
return args, props
@dataclass
class KdlscriptNode(kdl.Node):
alias: str | None = field(default=None, init=False)
sub: str | None = field(default=None, init=False)
def __post_init__(self):
lhs, *sub = self.name.split('.')
if len(sub) == 0:
sub = None
alias, *name = lhs.split(':')
if len(name) == 0:
name = alias
alias = None
elif len(name) == 1:
name = name[0]
else:
raise ValueError("Invalid node name")
self.name = name
self.alias = alias
self.sub = sub
# HACK: Gross disgusting monkey patch
kdl.types.Node = KdlscriptNode
kdl_parse_config = kdl.ParseConfig(valueConverters=customValueParsers)
kdl_reserved = ['secrets', 'chat', 'plugin', 'import']
def parse_kdl_deep(path, relativeto=None):
if relativeto:
path = os.path.normpath(os.path.join(relativeto, path))
with open(path, 'r') as f:
try:
config = kdl.parse(f.read(), kdl_parse_config)
for node in config.nodes:
node.args, node.props = compute_dynamic(node)
except kdl.errors.ParseError as e:
e.file = path
raise e
for node in config.nodes:
if node.name == 'import':
yield from parse_kdl_deep(node.args[0], relativeto=os.path.dirname(path))
else:
yield node

View file

@ -1,18 +0,0 @@
import os
import random
import appdirs
CACHE_DIR = appdirs.user_cache_dir('audiencekit', 'ovtk')
DATA_DIR = appdirs.user_data_dir('audiencekit', 'ovtk')
_user_id_path = os.path.join(DATA_DIR, 'ovtk_user_id')
if not os.path.isfile(_user_id_path):
ovtk_user_id = '%0x' % random.getrandbits(16 * 4) # Non-cryptographic (just needs to be unique)
os.makedirs(os.path.dirname(_user_id_path), exist_ok=True)
with open(_user_id_path, 'w+') as f:
f.write(ovtk_user_id)
else:
with open(_user_id_path, 'r') as f:
ovtk_user_id = f.read()

View file

@ -1,342 +0,0 @@
import importlib
from multiprocessing import Lock
import asyncio
from datetime import datetime, timedelta
import logging
import os
import os.path
import pathlib
import sys
import signal
import kdl
from click import progressbar
from aioscheduler import TimedScheduler
from quart import Quart
import hypercorn
import hypercorn.asyncio
import hypercorn.logging
from ovtk_audiencekit.core import WebsocketServerProcess
from ovtk_audiencekit.core.Config import parse_kdl_deep, kdl_reserved, compute_dynamic
from ovtk_audiencekit.events import Event, Delete
from ovtk_audiencekit.chats.ChatProcess import ShutdownRequest
from ovtk_audiencekit.plugins import builtins, PluginError
from ovtk_audiencekit.utils import format_exception
logger = logging.getLogger(__name__)
class HypercornLoggingShim(hypercorn.logging.Logger):
"""Force bog-standard loggers for Hypercorn"""
def __init__(self, config):
self.access_logger = logging.getLogger('hypercorn.access')
self.error_logger = logging.getLogger('hypercorn.error')
self.access_log_format = config.access_log_format
def import_or_reload_mod(module_name, default_package=None, external=False):
if external:
package = None
import_fragment = module_name
else:
package = default_package
import_fragment = f'.{module_name}'
fq_module = f'{package}{import_fragment}' if not external else import_fragment
if module := sys.modules.get(fq_module):
for submod in [mod for name, mod in sys.modules.items() if name.startswith(fq_module)]:
importlib.reload(submod)
importlib.reload(module)
else:
module = importlib.import_module(import_fragment, package=package)
return module
class MainProcess:
def __init__(self, config_path, bus_conf=(None, None), web_conf=(None, None)):
self._running = False
self.config_path = config_path
self.bus_conf = bus_conf
self.web_conf = web_conf
self.chat_processes = {}
self.plugins = {}
self.event_queue = asyncio.Queue()
# Init websocket server (event bus)
# HACK: Must be done here to avoid shadowing its asyncio loop
self.server_process = WebsocketServerProcess(*self.bus_conf)
self.server_process.start()
# Save sys.path since some config will clobber it
self._initial_syspath = sys.path
def _unload_plugin(self, plugin_name):
plugin = self.plugins[plugin_name]
plugin.close()
del self.plugins[plugin_name]
del plugin
def _get_event_from_pipe(self, pipe):
event = pipe.recv()
self.event_queue.put_nowait(event)
def _setup_webserver(self):
self.webserver = Quart(__name__, static_folder=None, template_folder=None)
listen = ':'.join(self.web_conf)
self.webserver.config['SERVER_NAME'] = listen
@self.webserver.context_processor
async def update_ctx():
return { 'EVBUS': 'ws://' + ':'.join(self.bus_conf) }
self.webserver.jinja_options = {
'block_start_string': '<%', 'block_end_string': '%>',
'variable_start_string': '<<', 'variable_end_string': '>>',
'comment_start_string': '<#', 'comment_end_string': '#>',
}
async def serve_coro():
config = hypercorn.config.Config()
config.bind = listen
config.use_reloader = False
config.logger_class = HypercornLoggingShim
try:
await hypercorn.asyncio.serve(self.webserver, config, shutdown_trigger=self.shutdown_ev.wait)
except Exception as e:
logger.critical(f'Failure in web process - {e}')
logger.debug(format_exception(e))
raise e
# MAGIC: As root tasks are supposed to be infinte loops, raise an exception if hypercorn shut down
raise asyncio.CancelledError()
return serve_coro()
async def handle_events(self):
while True:
event = await self.event_queue.get()
logger.info(event)
if isinstance(event, Event):
for plugin_name, plugin in list(self.plugins.items()):
try:
event = plugin.on_bus_event(event)
if asyncio.iscoroutinefunction(plugin.on_bus_event):
event = await event
except PluginError as e:
if e.fatal:
logger.critical(f'Failure when processing {e.source} ({e}) - disabling...')
else:
logger.warning(f'Encounterd error when processing {e.source} ({e})')
logger.debug(format_exception(e))
if e.fatal:
self._unload_plugin(e.source)
except Exception as e:
self._plugin_error
logger.critical(f'Failure when processing {plugin_name} ({e}) - disabling...')
logger.debug(format_exception(e))
self._unload_plugin(plugin_name)
if event is None:
break
else:
self.server_process.message_pipe.send(event)
logger.debug(f'Event after plugin chain - {event}')
elif isinstance(event, Delete):
self.server_process.message_pipe.send(event)
else:
logger.error(f'Unknown data in event loop - {event}')
async def tick_plugins(self):
while True:
await asyncio.sleep(0.5)
for plugin_name, plugin in list(self.plugins.items()):
try:
res = await plugin._tick(0.5) # Not necesarily honest!
except Exception as e:
logger.critical(f'Failure during background processing for {plugin_name} ({e}) - disabling...')
logger.debug(format_exception(e))
self._unload_plugin(plugin_name)
async def user_setup(self):
config = kdl.Document(list(parse_kdl_deep(self.config_path)))
stdin_lock = Lock()
# Load secrets
secrets = {}
if node := config.get('secrets'):
for module in node.nodes:
fields = secrets.get(module.name, {})
for node in module.nodes:
fields[node.name] = node.args[0] if len(node.args) == 1 else node.args
secrets[module.name] = fields
# Dynamically import chats
with progressbar(list(config.getAll('chat')), label="Preparing modules (chats)", item_show_func=lambda i: i and i.args[0]) as bar:
for node in bar:
if len(node.args) == 0:
continue
module_name = node.args[0]
chat_name = node.alias or module_name
if chat_name in self.plugins:
raise ValueError(f'Definition "{chat_name}" already exists - rename using alias syntax: `alias:chat ...`')
secrets_for_mod = secrets.get(module_name, {})
try:
chat_module = import_or_reload_mod(module_name,
default_package='ovtk_audiencekit.chats',
external=False)
chat_process = chat_module.Process(stdin_lock, chat_name, **node.props, **secrets_for_mod)
self.chat_processes[chat_name] = chat_process
except Exception as e:
raise ValueError(f'Failed to initalize {module_name} module "{chat_name}" - {e}')
if len(self.chat_processes.keys()) == 0:
logger.warning('No chats configured!')
# Start chat processes
for process in self.chat_processes.values():
process.start()
# Bridge pipe to asyncio event loop
pipe = process.event_pipe
# REVIEW: This does not work on windows!!!! add_reader is not implemented
# in a way that supports pipes on either windows loop runners
asyncio.get_event_loop().add_reader(pipe.fileno(), lambda pipe=pipe: self._get_event_from_pipe(pipe))
# Load plugins
global_ctx = {}
## Builtins
for node_name in builtins.__all__:
plugin = builtins.__dict__[node_name](self.chat_processes, self.event_queue, node_name, global_ctx)
self.plugins[node_name] = plugin
self.webserver.register_blueprint(plugin.blueprint)
## Dynamic
with progressbar(list(config.getAll('plugin')), label="Preparing modules (plugins)", item_show_func=lambda i: i and i.args[0]) as bar:
for node in bar:
if len(node.args) == 0:
continue
module_name = node.args[0]
plugin_name = node.alias or module_name
if plugin_name in self.plugins:
raise ValueError(f'Definition "{plugin_name}" already exists - rename using alias syntax: `alias:plugin ...`')
secrets_for_mod = secrets.get(module_name, {})
try:
plugin_module = import_or_reload_mod(module_name,
default_package='ovtk_audiencekit.plugins',
external=False)
plugin = plugin_module.Plugin(self.chat_processes, self.event_queue, plugin_name, global_ctx)
self.plugins[plugin_name] = plugin
await plugin._setup(*node.args[1:], **node.props, **secrets_for_mod)
# Register UI with webserver
self.webserver.register_blueprint(plugin.blueprint)
except Exception as e:
raise ValueError(f'Failed to initalize {module_name} plugin "{plugin_name}" - {e}')
# Run plugin definitions
with progressbar(list(config.nodes), label=f"Executing {self.config_path}") as bar:
for node in bar:
if node.name in kdl_reserved:
continue
plugin_name = node.name
plugin_module = self.plugins.get(plugin_name)
if plugin_module is None:
logger.error(f'Unknown plugin: {node.name}')
else:
await plugin_module._call(node.sub, node.tag, *node.args, **node.props, _ctx=global_ctx, _children=node.nodes)
async def user_shutdown(self):
for process_name, process in list(reversed(self.chat_processes.items())):
pipe = process.event_pipe
process.control_pipe.send(ShutdownRequest('root'))
process.join(5)
if process.exitcode is None:
process.terminate()
asyncio.get_event_loop().remove_reader(pipe.fileno())
del self.chat_processes[process_name]
for plugin_name in list(reversed(self.plugins.keys())):
# NOTE: The plugin will likely stick around in memory for a bit after this,
# as the webserver will still have its quart blueprint attached
self._unload_plugin(plugin_name)
sys.path = self._initial_syspath
async def run(self):
self.shutdown_ev = asyncio.Event()
self.reload_ev = asyncio.Event()
loop = asyncio.get_event_loop()
user_tasks = set()
try:
# System setup
## Bridge websocket server pipe to asyncio loop
## REVIEW: This does not work on windows!!!! add_reader is not implemented
## in a way that supports pipes on either windows loop runners
ws_pipe = self.server_process.message_pipe
loop.add_reader(ws_pipe.fileno(), lambda: self._get_event_from_pipe(ws_pipe))
## Register stdin handler
reader = asyncio.StreamReader()
await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
async def discount_repl():
# REVIEW: Not a good UX at the moment (as new logs clobber the terminal entry)
async for line in reader:
line = line.strip()
if line == b'reload':
self.reload_ev.set()
elif line == b'quit':
self.shutdown_ev.set()
self.cli_task = loop.create_task(discount_repl())
## Scheduler for timed tasks
self._skehdule = TimedScheduler(max_tasks=1)
self._skehdule.start()
## UI server
serve_coro = self._setup_webserver()
self.webserver_task = loop.create_task(serve_coro)
logger.debug(f'Listening on: {":".join(self.web_conf)} (UI) and {":".join(self.bus_conf)} (event bus)')
# User (plugin / chat) mode (reloading allowed)
while True:
async with self.webserver.app_context():
await self.user_setup()
# Start plumbing tasks
user_tasks.add(loop.create_task(self.tick_plugins()))
user_tasks.add(loop.create_task(self.handle_events()))
logger.info(f'Ready to rumble! Press Ctrl+C to shut down')
reload_task = loop.create_task(self.reload_ev.wait())
done, pending = await asyncio.wait([*user_tasks, self.webserver_task, reload_task], return_when=asyncio.FIRST_COMPLETED)
if reload_task in done:
logger.warn('Reloading (some events may be missed!)')
logger.debug('Teardown...')
self.reload_ev.clear()
# Shutdown plugins / chats
await self.user_shutdown()
# Stop event plumbing
for task in user_tasks:
task.cancel()
user_tasks.clear()
# HACK: Restart webserver to workaround quart's inability to remove blueprints
# Stop
await self.webserver.shutdown()
self.shutdown_ev.set()
try:
await self.webserver_task
except asyncio.CancelledError:
self.shutdown_ev.clear()
# Start
logger.debug('Startup...')
serve_coro = self._setup_webserver()
self.webserver_task = loop.create_task(serve_coro)
else:
break
except KeyboardInterrupt:
pass
except kdl.errors.ParseError as e:
try:
logger.critical(f'Invalid configuration in {e.file}: line {e.line}, character {e.col} - {e.msg}')
except AttributeError:
logger.critical(f'Invalid configuration - {e.msg}')
except Exception as e:
logger.critical(f'Failure in core process - {e}')
logger.debug(format_exception(e))
finally:
logger.warn('Closing up shop...')
for task in user_tasks:
task.cancel()
await self.user_shutdown()
self.webserver_task.cancel()
self.server_process.terminate()

View file

@ -1,166 +0,0 @@
from abc import ABC, abstractmethod
import logging
import asyncio
import os.path
import sys
import copy
import kdl
import quart
from ovtk_audiencekit.core.Config import kdl_parse_config, compute_dynamic
class PluginError(Exception):
def __init__(self, source, message, fatal=True):
self.source = source
self.message = message
self.fatal = fatal
def __str__(self):
return self.message
class OvtkBlueprint(quart.Blueprint):
def url_for(self, endpoint, *args, **kwargs):
"""url_for method that understands blueprint-relative names under non-request contexts"""
if endpoint.startswith('.'):
endpoint = self.name + endpoint
return quart.url_for(endpoint, *args, **kwargs)
class PluginBase(ABC):
plugins = {}
def __init__(self, chat_processes, event_queue, name, global_ctx, _children=None, **kwargs):
super().__init__(**kwargs)
self.chats = chat_processes
self._event_queue = event_queue
self._name = name
self._global_ctx = global_ctx
self.plugins[name] = self
self.logger = logging.getLogger(f'plugin.{self._name}')
# HACK: This is kinda gross, and probably wont be true for frozen modules
plugin_dir = os.path.dirname(sys.modules[self.__class__.__module__].__file__)
self.blueprint = OvtkBlueprint(self._name, __name__,
url_prefix=f'/{self._name}',
static_url_path='static',
static_folder=os.path.join(plugin_dir, 'static'),
template_folder=os.path.join(plugin_dir, 'templates'))
if _children:
raise ValueError('Module does not accept children')
def __del__(self):
if self.plugins.get(self._name) == self:
del self.plugins[self._name]
async def _call(self, subroutine, tag, *args, **kwargs):
try:
if subroutine:
func = self
for accessor in subroutine:
func = getattr(func, accessor)
else:
func = self.run
res = func(*args, **kwargs)
if asyncio.iscoroutinefunction(func):
res = await res
return res
except Exception as e:
if isinstance(e, KeyboardInterrupt):
raise e
raise PluginError(self._name, str(e)) from e
async def _tick(self, *args, **kwargs):
try:
res = self.tick(*args, **kwargs)
if asyncio.iscoroutinefunction(self.tick):
return await res
else:
return res
except Exception as e:
if isinstance(e, KeyboardInterrupt):
raise e
raise PluginError(self._name, str(e)) from e
async def _setup(self, *args, **kwargs):
try:
res = self.setup(*args, **kwargs)
if asyncio.iscoroutinefunction(self.setup):
return await res
else:
return res
except Exception as e:
if isinstance(e, KeyboardInterrupt):
raise e
raise PluginError(self._name, str(e)) from e
# Base class helpers
def broadcast(self, event):
"""Send event to every active chat"""
for proc in self.chats.values():
if proc.readonly:
continue
proc.control_pipe.send(event)
async def execute_kdl(self, nodes, *py_args, _ctx={}, **py_props):
"""
Run other plugins as configured by the passed KDL nodes collection
If this was done in response to an event, pass it as 'event' in _ctx!
"""
_ctx = copy.deepcopy({**self._global_ctx, **_ctx})
for node in nodes:
try:
args, props = compute_dynamic(node, _ctx=_ctx)
target = self.plugins.get(node.name)
if target is None:
self.logger.warning(f'Could not find plugin or builtin with name {node.name}')
break
result = await target._call(node.sub, node.tag, *args, *py_args, **props, _ctx=_ctx, **py_props, _children=node.nodes)
if node.alias:
_ctx[node.alias] = result
except Exception as e:
self.logger.warning(f'Failed to execute defered KDL: {e}')
break
def send_to_bus(self, event):
"""
Send an event to the event bus
WARNING: This will cause the event to be processed by other plugins - be careful not to cause an infinite loop!
"""
self._event_queue.put_nowait(event)
# User-defined
async def setup(self, *args, **kwargs):
"""Called when plugin is being loaded."""
pass
def close(self):
"""Called when plugin is about to be unloaded. Use this to safely close any resouces if needed"""
pass
async def tick(self, dt):
"""Called at least every half second - perform time-dependent updates here!"""
pass
async def on_bus_event(self, event):
"""Called for every event from the chats"""
return event
async def on_control_event(self, event):
"""
Called for events targeting this plugin name specifically.
This is normally used for other applications to communicate with this one over the websocket interface
"""
pass
@abstractmethod
async def run(self, *args, _children=None, _ctx={}, **kwargs):
"""
Run plugin action, either due to a definition in the config, or due to another plugin
"""
pass

View file

@ -1,82 +0,0 @@
import asyncio
from collections import deque
import maya
from ovtk_audiencekit.plugins import PluginBase
from ovtk_audiencekit.core import Clip, Stream
class AudioAlert(PluginBase):
def setup(self, output=None, timeout_min=1, sample_rate=None, buffer_length=4096, force_stereo=True):
self._cleanup_task = asyncio.create_task(self._cleanup())
self.force_stereo = force_stereo
self.timeout_min = timeout_min
self.clips = {}
self.streams = {}
self.tasks = set()
self.buffer_length = int(buffer_length)
self.output_index = Stream.find_output_index(output)
if sample_rate is None:
try:
sample_rate = next((rate for rate in [44100, 48000] if Stream.check_rate(self.output_index, 1, rate)))
except StopIteration:
self.logger.warn('Target audio device does not claim to support common sample rates! Attempting playback at native rate of audio')
self.sample_rate = sample_rate
def run(self, path, speed=1, keep_pitch=False, immediate=True, poly=1, **kwargs):
poly = int(poly)
key = f'{path}@{speed}{"X" if keep_pitch else "x"}'
clip = self.clips.get(key, [None, None])[0]
if clip is None:
clip = Clip(path, speed=speed, keep_pitch=keep_pitch,
samplerate=self.sample_rate, force_stereo=self.force_stereo)
self.clips[key] = [clip, maya.now()]
else:
self.clips[key][1] = maya.now()
stream_dq = self.streams.get(key, None)
if stream_dq is None:
stream_dq = deque(maxlen=poly)
self.streams[key] = stream_dq
if stream_dq.maxlen != poly:
self.logger.warn('Cannot change poly while streams are active!')
if len(stream_dq) == stream_dq.maxlen:
stream_dq.rotate(1)
stream = stream_dq[0]
else:
stream = Stream(clip, self.output_index,
buffer_length=self.buffer_length)
stream_dq.append(stream)
if immediate:
task = asyncio.create_task(stream.aplay())
task.add_done_callback(self.tasks.remove)
self.tasks.add(task)
else:
stream.play()
def close(self):
self._cleanup_task.cancel()
for task in self.tasks:
task.cancel()
for stream_dq in self.streams.values():
for stream in stream_dq:
stream.close()
async def _cleanup(self):
while True:
await asyncio.sleep(60)
now = maya.now()
for key, [clip, last_used] in list(self.clips.items()):
if now >= last_used.add(minutes=self.timeout_min, seconds=clip.length):
self.logger.debug(f'Dropping {key}')
streams = self.streams.get(key, [])
for stream in streams:
stream.close()
del self.streams[key]
del self.clips[key]

View file

@ -1 +0,0 @@
from .osc import OSCPlugin as Plugin

View file

@ -1,16 +0,0 @@
from pythonosc.udp_client import SimpleUDPClient
from ovtk_audiencekit.plugins import PluginBase
class OSCPlugin(PluginBase):
def setup(self, ip='localhost', port=None):
if port is None:
raise RuntimeError('A unique port must be specified')
self.client = SimpleUDPClient(ip, int(port))
async def run(self, endpoint, *data, _children=None, _ctx={}, **kwargs):
if len(data) == 1:
self.client.send_message(endpoint, data[0])
else:
self.client.send_message(endpoint, data)

View file

@ -1,3 +0,0 @@
from ovtk_audiencekit.core.PluginBase import PluginBase, PluginError
__all__ = ['PluginBase', 'PluginError']

View file

@ -1,128 +0,0 @@
import asyncio
import logging
import datetime
import uuid
import maya
import aioscheduler
from ovtk_audiencekit.plugins import PluginBase
from ovtk_audiencekit.utils import format_exception
logger = logging.getLogger(__name__)
TIME_SEGMENTS = ['seconds', 'minutes', 'hours', 'days']
class Cue:
def __init__(self, repeat, at=None, **kwargs):
self.repeat = repeat
self.enabled = True
if at:
self._next = maya.when(at)
else:
self._next = maya.now().add(**kwargs)
self._interval = kwargs
@property
def next(self):
return self._next.datetime(to_timezone='UTC', naive=True)
def is_obsolete(self):
if self.repeat:
return False
if self._next >= maya.now():
return False
return True
def reschedule(self, fresh=False):
if fresh:
self._next = maya.now().add(**self._interval)
else:
self._next = self._next.add(**self._interval)
# HACK: Compare epochs directly, as maya comps are only second accurate
if not fresh and self._next._epoch <= maya.now()._epoch:
offset = maya.now()._epoch - self._next._epoch
logger.warn(f'Cannot keep up with configured interval - {underrun} underrun. Repetition may fail!')
self._next = maya.now().add(**self._interval)
class CuePlugin(PluginBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cues = {}
self.tasks = {}
self.scheduler = aioscheduler.TimedScheduler()
self.scheduler.start()
self._cleanup_task = asyncio.create_task(self._cleanup())
def run(self, name=None, repeat=False, enabled=None, _children=[], _ctx={}, **kwargs):
if name is None:
name = str(uuid.uuid4())
first_set = self.cues.get(name) is None
if len(_children) > 0:
has_interval = any(kwargs.get(segment) is not None for segment in TIME_SEGMENTS)
if kwargs.get('at') is None and not has_interval:
raise ValueError('Provide a concrete time with `at` or a timer length with `seconds`, `hours`, etc')
if kwargs.get('at') is not None and repeat and not has_interval:
raise ValueError('`repeat` can not be used with solely a concrete time')
cue = Cue(repeat, **kwargs)
async def handler():
# Repetition management
if not cue.enabled:
return
if cue.repeat:
cue.reschedule()
self.tasks[name] = self.scheduler.schedule(handler(), cue.next)
# Run configured actions
try:
await self.execute_kdl(_children, _ctx=_ctx)
except Exception as e:
self.logger.error(f'Failed to complete cue {name}: {e}')
self.logger.debug(format_exception(e))
self.cues[name] = (cue, handler)
self.schedule_exec(name, cue.next, handler())
if enabled is not None:
entry = self.cues.get(name)
if entry is None:
self.logger.warn(f'Cannot find cue with name "{name}"')
return
cue, handler = entry
cue.enabled = enabled
if enabled and not first_set:
cue.reschedule(fresh=True)
self.schedule_exec(name, cue.next, handler())
def schedule_exec(self, name, at, coro):
if existing_task := self.tasks.get(name):
self.scheduler.cancel(existing_task)
try:
self.tasks[name] = self.scheduler.schedule(coro, at)
except ValueError as e:
self.logger.error(f'Cannot schedule cue {name} at {at}: {e}')
def close(self):
self._cleanup_task.cancel()
for task in self.tasks.values():
self.scheduler.cancel(task)
self.scheduler._task.cancel()
async def _cleanup(self):
while True:
await asyncio.sleep(60)
for name, (cue, _) in list(self.cues.items()):
if cue.is_obsolete():
del self.cues[name]
if task := self.tasks.get(name):
self.scheduler.cancel(task)
del self.tasks[name]

View file

@ -1,14 +0,0 @@
from ovtk_audiencekit.plugins import PluginBase
import logging
level_names = ['critical', 'error', 'warning', 'info', 'debug']
class LogPlugin(PluginBase):
def run(self, msg, level="info", **kwargs):
try:
int_level = next((getattr(logging, level_name.upper()) for level_name in level_names if level_name.startswith(level.lower())))
except StopIteration:
self.logger.debug(f'Using default log level for KDL log call since user level "{level}" is not recognized')
int_level = logging.INFO
self.logger.log(int_level, msg)

View file

@ -1,61 +0,0 @@
import asyncio
import mido
from ovtk_audiencekit.plugins import PluginBase
def matches(msg, attrs):
for attr, match_val in attrs.items():
msg_val = getattr(msg, attr)
if msg_val != match_val:
return False
return True
class MidiPlugin(PluginBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
loop = asyncio.get_event_loop()
def callback(msg):
asyncio.run_coroutine_threadsafe(self.recv_callback(msg), loop)
self.output_port = mido.open_output()
self.input_port = mido.open_input(callback=callback)
self.listeners = {
'note_off': [],
'note_on': [],
'control_change': [],
'program_change': [],
'sysex': [],
'song_select': [],
'start': [],
'continue': [],
'stop': [],
}
def close(self):
self.input_port.close()
self.output_port.close()
def run(self, type, _ctx={}, _children=None, **kwargs):
if type == 'sysex':
data = kwargs['data']
msg = mido.Message('sysex', data=bytes(data, encoding='utf-8'), time=0)
else:
msg = mido.Message(type, **kwargs, time=0)
self.output_port.send(msg)
async def recv_callback(self, msg):
if hasattr(msg, 'channel'):
msg.channel += 1 # Channels in mido are 0-15, but in spec are 1-16. Adjust to spec
self.logger.debug(f"Recv: {msg}")
for params, handler, ctx in self.listeners[msg.type]:
if matches(msg, params):
_ctx = {**ctx, 'midi': msg}
await handler(_ctx)
def listen(self, type, _ctx={}, _children=None, **kwargs):
kwargs = {k:int(v) for k, v in kwargs.items()}
handler = lambda ctx: self.execute_kdl(_children, _ctx=ctx)
self.listeners[type].append((kwargs, handler, _ctx))

View file

@ -1,153 +0,0 @@
from dataclasses import dataclass, field
import asyncio
from typing import Callable
import quart
import json
import kdl
from ovtk_audiencekit.plugins import PluginBase
from ovtk_audiencekit.utils import format_exception
@dataclass
class Scene:
name: str
group: str
oneshot: bool
enter: Callable
exit: Callable
entry_context: dict = field(default_factory=dict)
tasks: list[asyncio.Task] = field(default_factory=list)
class ScenePlugin(PluginBase):
"""Allows for creating modal configurations"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.scenes = {}
self.active = {}
self._scene_state_changed = asyncio.Event()
self.blueprint.add_url_rule('/', 'ctrlpanel', self.ui_ctrlpanel)
self.blueprint.add_url_rule('/<name>/<cmd>', 'api-sceneset', self.ui_setscene)
self.blueprint.add_url_rule('/monitor', 'monitor', self.ui_monitor_ws, is_websocket=True)
async def run(self, name, _children=None, _ctx={}, active=None, group=None, immediate=True, oneshot=False, **kwargs):
if _children is None and active is None:
raise UsageError('Either define a new scene or set `--active` to true / false')
if _children:
await self.define(name, group, _children, default_active=active, oneshot=oneshot, ctx=_ctx)
else:
await self.switch(name, active, is_immediate=immediate, ctx=_ctx)
async def define(self, name, group, children, default_active=False, oneshot=False, ctx={}):
if self.scenes.get(name) is not None:
raise UsageError(f'Scene with name "{name}" already exists!')
# Categorize nodes
enter_nodes = []
exit_nodes = []
for child in children:
if child.name == 'exit':
exit_nodes.extend(child.nodes)
else:
enter_nodes.append(child)
# Make transisition functions
async def enter(ctx):
await self.execute_kdl(enter_nodes, _ctx=ctx)
scene.entry_context = ctx
async def exit(ctx):
await self.execute_kdl(exit_nodes, _ctx=ctx)
scene = Scene(name, group, oneshot, enter, exit)
self.scenes[name] = scene
if default_active:
await self.switch(name, default_active, is_immediate=True, ctx=ctx)
async def switch(self, name, active, is_immediate=True, ctx={}):
scene = self.scenes.get(name)
if scene is None:
raise UsageError(f'No defined scene with name "{name}"')
if scene.oneshot:
await self._execute(scene, 'enter', is_immediate, ctx)
elif active:
if current := self.active.get(scene.group):
if current == scene:
return
await self._execute(current, 'exit', is_immediate, ctx)
self.active[scene.group] = scene
await self._execute(scene, 'enter', is_immediate, ctx)
else:
if self.active.get(scene.group) == scene:
self.active[scene.group] = None
await self._execute(scene, 'exit', is_immediate, ctx)
self._scene_state_changed.set()
self._scene_state_changed.clear()
async def _execute(self, scene, mode, immediate, ctx):
ctx = {**ctx} # HACK: Copy to avoid leakage from previous group item exit
scene_transision_fn = getattr(scene, mode)
# Wrap to handle context at exec time
async def context_wrapper(ctx):
if mode == 'exit':
ctx = {
**scene.entry_context,
'caller_ctx': ctx,
}
await scene_transision_fn(ctx)
if mode == 'enter':
scene.entry_context = ctx
coro = context_wrapper(ctx)
# Wrap to finish any other pending tasks before running this
if len(scene.tasks):
async def exec_order_wrap(other):
await asyncio.gather(*scene.tasks)
scene.tasks = []
await other
coro = exec_order_wrap(coro)
# Run (or schedule for execution)
if immediate:
try:
await coro
except Exception as e:
self.logger.error(f'Failed to handle "{scene.name}" {mode} transistion: {e}')
self.logger.debug(format_exception(e))
else:
scene.tasks.append(asyncio.create_task(coro))
def _get_state(self):
groups = {}
for scene_name, scene in self.scenes.items():
active = self.active.get(scene.group) == scene
group = scene.group or "default group"
if groups.get(group) is None:
groups[group] = {}
groups[group][scene_name] = active
return groups
async def ui_ctrlpanel(self):
groups = self._get_state()
return await quart.render_template('index.html', init_state=json.dumps(groups))
async def ui_setscene(self, name=None, cmd=None):
active = cmd == 'activate'
await self.switch(name, active, is_immediate=True)
return quart.Response(status=200)
async def ui_monitor_ws(self):
await quart.websocket.accept()
while True:
groups = self._get_state()
await quart.websocket.send(json.dumps(groups))
await self._scene_state_changed.wait()

View file

@ -1 +0,0 @@
from .Plugin import ScenePlugin, Scene

View file

@ -1,85 +0,0 @@
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<meta charset="utf-8">
<title>Test page</title>
<script type="importmap">
{
"imports": { "vue": "https://unpkg.com/vue@3/dist/vue.esm-browser.js" }
}
</script>
<script type="module">
import { createApp, ref, onMounted } from 'vue'
createApp({
setup() {
const groups = ref(JSON.parse('<< init_state|safe >>'))
const inflight = ref([])
onMounted(() => {
const websock = new WebSocket('<<url_for(".monitor")>>');
websock.addEventListener('message', (msg) => {
groups.value = JSON.parse(msg.data)
inflight.value = []
})
})
const toggle = async (group_name, scene_name) => {
if (!groups.value[group_name][scene_name].oneshot) {
if (inflight.value.includes(scene_name)) return
inflight.value.push(scene_name)
}
const next_state = !groups.value[group_name][scene_name]
await fetch(`${scene_name}/${next_state ? 'activate' : 'deactivate'}`, { method: 'GET' })
}
return { groups, inflight, toggle }
},
}).mount('#app')
</script>
</head>
<body id="root">
<div id="app">
<div v-for="(group, group_name) in groups" class="group">
<h3>{{ group_name }}</h3>
<div v-for="(active, scene_name) in group" v-on:click="toggle(group_name, scene_name)"
:class="{ active, pending: inflight.includes(scene_name), scene: true }"
>
<p>{{ scene_name }}</p>
</div>
</div>
</div>
<style type="text/css">
#app {
display: flex;
flex-direction: row;
flex-wrap: wrap;
font-family: sans-serif;
gap: 8px;
}
p {
text-align: center;
}
h3 {
margin-right: 1em;
}
.group {
display: flex;
flex-direction: column;
gap: 4px;
}
.scene {
padding: 12px 24px;
user-select: none;
background-color: lightgray;
flex: 1;
}
.scene.pending {
background-color: lightgoldenrodyellow;
}
.scene.active {
background-color: lightgreen;
}
</style>
</body>
</html>

View file

@ -1,40 +0,0 @@
from ovtk_audiencekit.plugins import PluginBase
from ovtk_audiencekit.core.Config import compute_dynamic
class SetPlugin(PluginBase):
"""Set arbitrary data in the local context (can be fetched with the custom arg type)"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def run(self, *args, _children=[], _ctx={}, **kwargs):
self.proc_node(_ctx, *args, _children=_children, _stack=[], **kwargs)
self.logger.debug(_ctx)
def proc_node(self, target, *args, _children=[], _stack=[], **props):
if len(args) > 0 and len(props) > 0:
raise ValueError("Cannot use both item/list and keyword forms at the same time")
if _children and (len(args) > 0 or len(props) > 0):
raise ValueError("Cannot define value as something and dict at the same time")
if len(args) > 0 and len(_stack) == 0:
raise ValueError("Cannot use item/list short form on top level set")
if len(props) > 0:
for key, value in props.items():
if target.get(key) is not None:
fullkey = '.'.join([n for n, t in _stack] + [key])
self.logger.debug(f'Shadowing {fullkey}')
target[key] = value
elif _children:
for child in _children:
sub = dict()
target[child.name] = sub
stack = [*_stack, (child.name, target)]
key = '.'.join(s for s, t in stack)
args, props = compute_dynamic(child, _ctx=stack[0][1])
self.proc_node(sub, *args, _children=child.nodes, _stack=stack, **props)
elif len(args) > 0:
name, target = _stack[-1]
target[name] = args[0] if len(args) == 1 else args

View file

@ -1,4 +0,0 @@
import traceback as traceback_lib
def format_exception(e, traceback=True):
return ''.join(traceback_lib.format_exception(None, e, e.__traceback__ if traceback else None))