Compare commits
1 Commits
main
...
feat/bette
Author | SHA1 | Date |
---|---|---|
Derek | ce9ec54a4e |
|
@ -1,7 +1,4 @@
|
||||||
.venv/
|
|
||||||
.pdm-python
|
|
||||||
.pdm-build/
|
|
||||||
__pycache__/
|
|
||||||
/dist/
|
|
||||||
*.secret*
|
*.secret*
|
||||||
secrets.kdl
|
secrets.kdl
|
||||||
|
__pycache__/
|
||||||
|
/*-log
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
[[source]]
|
||||||
|
url = "https://pypi.org/simple"
|
||||||
|
verify_ssl = true
|
||||||
|
name = "pypi"
|
||||||
|
|
||||||
|
[packages]
|
||||||
|
requests = "*"
|
||||||
|
websockets = "*"
|
||||||
|
miniirc = "*"
|
||||||
|
num2words = "*"
|
||||||
|
pyaudio = "==0.2.*"
|
||||||
|
numpy = "*"
|
||||||
|
click = "*"
|
||||||
|
owoify-py = "==2.*"
|
||||||
|
kdl-py = "*"
|
||||||
|
maya = "*"
|
||||||
|
multipledispatch = "*"
|
||||||
|
blessed = "*"
|
||||||
|
appdirs = "*"
|
||||||
|
watchdog = "*"
|
||||||
|
mido = "*"
|
||||||
|
python-rtmidi = "*"
|
||||||
|
librosa = "==0.8.*"
|
||||||
|
pytsmod = "*"
|
||||||
|
quart = "==0.17.*"
|
||||||
|
aioscheduler = "*"
|
||||||
|
TTS = "==0.9.*"
|
||||||
|
torch = "==1.13.*"
|
||||||
|
simpleobsws = "*"
|
||||||
|
python-socketio = {extras = ["client"], version = "*"}
|
||||||
|
aioprocessing = "*"
|
||||||
|
pytchat = { git="https://github.com/KaitoCross/pytchat.git" }
|
||||||
|
|
||||||
|
[requires]
|
||||||
|
python_version = "3.10"
|
||||||
|
|
||||||
|
[dev-packages]
|
||||||
|
pipenv-setup = "*"
|
||||||
|
|
||||||
|
[scripts]
|
||||||
|
start = "python audiencekit.py start"
|
||||||
|
ws = "python audiencekit.py ws"
|
File diff suppressed because it is too large
Load Diff
14
README.md
14
README.md
|
@ -48,13 +48,13 @@ Beside the plugin system, audiencekit's biggest feature is that it streams the e
|
||||||
## Manual / Development Install
|
## Manual / Development Install
|
||||||
1. Install dependencies
|
1. Install dependencies
|
||||||
|
|
||||||
You'll need Python 3.10, PDM, and some audio libraries. Not to fear though, there is [instructions on how to do that for your specific OS!](https://git.skeh.site/skeh/ovtk_audiencekit/wiki/Dependency-Setup)
|
You'll need Python 3.10 or above, pipenv, JACK and PortAudio libraries. Not to fear though, there is [instructions on how to do that for your specific OS!](https://git.skeh.site/skeh/ovtk_audiencekit/wiki/Dependency-Setup)
|
||||||
|
|
||||||
2. Download the repository and open a terminal inside it
|
2. Download the repository and open a terminal inside it
|
||||||
|
|
||||||
Extract the [tar](https://git.skeh.site/skeh/ovtk_audiencekit/archive/main.tar.gz) / [zip](https://git.skeh.site/skeh/ovtk_audiencekit/archive/main.zip), or clone via `git`~
|
Extract the [tar](https://git.skeh.site/skeh/ovtk_audiencekit/archive/main.tar.gz) / [zip](https://git.skeh.site/skeh/ovtk_audiencekit/archive/main.zip), or clone via `git`~
|
||||||
|
|
||||||
3. Run `pdm sync`
|
3. Run `pipenv install`
|
||||||
|
|
||||||
Some dependencies may fail to install initially, but should be retried using a method that works automatically (your terminal should show "Retrying initially failed dependencies..." or similar).
|
Some dependencies may fail to install initially, but should be retried using a method that works automatically (your terminal should show "Retrying initially failed dependencies..." or similar).
|
||||||
|
|
||||||
|
@ -68,15 +68,15 @@ If you want some more tutorializing, head on over to the [Wiki](https://git.skeh
|
||||||
+ Read up a bit on [KDL's syntax][kdl] if you haven't already
|
+ Read up a bit on [KDL's syntax][kdl] if you haven't already
|
||||||
+ Crack open your favorite text editor and follow the instructions in the `config.kdl` file
|
+ Crack open your favorite text editor and follow the instructions in the `config.kdl` file
|
||||||
+ Open a terminal in the project location and start it using either:
|
+ Open a terminal in the project location and start it using either:
|
||||||
+ `pdm run start`, or
|
+ `pipenv run start`, or
|
||||||
+ [activate the venv](https://pdm.fming.dev/latest/usage/venv/#activate-a-virtualenv) once per terminal session, and then use `./audiencekit.py start` thereafter. This is recommended for development usage.
|
+ `pipenv shell` once per terminal session, and then `./audiencekit.py start` thereafter
|
||||||
+ Test your configuration using the `ws` subcommand (`pdm run ws` or `./audiencekit.py ws`)
|
+ Test your configuration using the `ws` subcommand (`pipenv run ws` or `./audiencekit.py ws`)
|
||||||
+ The built-in CLI will help for this and more! Just add "--help" to the end of anything, ie `./audiencekit.py --help`, `pdm run ws mkevent --help`, etc
|
+ The built-in CLI will help for this and more! Just add "--help" to the end of anything, ie `./audiencekit.py --help`, `pipenv run ws mkevent --help`, etc
|
||||||
+ Make mistakes and [ask questions](https://birb.space/@skeh)!
|
+ Make mistakes and [ask questions](https://birb.space/@skeh)!
|
||||||
|
|
||||||
-----------
|
-----------
|
||||||
|
|
||||||
Made with :heart: by the [Vtopia Collective][vtopia] and contributors
|
Made with :heart: by the [Vtopia Collective][vtopia] and contributers
|
||||||
|
|
||||||
|
|
||||||
[kdl]: https://kdl.dev
|
[kdl]: https://kdl.dev
|
||||||
|
|
|
@ -1,6 +1,4 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import sys
|
|
||||||
sys.path.insert(0, 'src')
|
|
||||||
from ovtk_audiencekit.cli import cli
|
from ovtk_audiencekit.cli import cli
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from multiprocessing import Process, Pipe, Manager
|
from multiprocessing import Process, Pipe, Manager
|
||||||
|
from traceback import format_exception
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from ovtk_audiencekit.events import Event
|
from ovtk_audiencekit.events import Event
|
||||||
from ovtk_audiencekit.utils import format_exception
|
|
||||||
|
|
||||||
|
|
||||||
class GracefulShutdownException(Exception):
|
class GracefulShutdownException(Exception):
|
||||||
|
@ -133,7 +133,7 @@ class ChatProcess(Process, ABC):
|
||||||
return 0
|
return 0
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f'Uncaught exception in {self._name}: {e}')
|
self.logger.error(f'Uncaught exception in {self._name}: {e}')
|
||||||
self.logger.debug(format_exception(e))
|
self.logger.debug(''.join(format_exception(None, e, e.__traceback__)))
|
||||||
return 1
|
return 1
|
||||||
finally:
|
finally:
|
||||||
self.on_exit()
|
self.on_exit()
|
|
@ -0,0 +1,72 @@
|
||||||
|
import time
|
||||||
|
from enum import Enum, auto
|
||||||
|
from itertools import chain
|
||||||
|
|
||||||
|
from ovtk_audiencekit.chats import ChatProcess
|
||||||
|
from ovtk_audiencekit.events import Message, SysMessage, USER_TYPE
|
||||||
|
|
||||||
|
import pytchat
|
||||||
|
|
||||||
|
|
||||||
|
class STATES(Enum):
|
||||||
|
WAITING = auto()
|
||||||
|
READING = auto()
|
||||||
|
|
||||||
|
|
||||||
|
class YoutubeProcess(ChatProcess):
|
||||||
|
def __init__(self, *args, video_id=None, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
self._state_machine = self.bind_to_states(STATES)
|
||||||
|
self._video_id = video_id
|
||||||
|
|
||||||
|
self.state = STATES.CONNECTING
|
||||||
|
self._chat = pytchat.create(video_id=self._video_id)
|
||||||
|
|
||||||
|
def normalize_event(self, event):
|
||||||
|
user_type = USER_TYPE.USER
|
||||||
|
if event.author.isChatOwner:
|
||||||
|
user_type = USER_TYPE.OWNER
|
||||||
|
elif event.author.isChatModerator:
|
||||||
|
user_type = USER_TYPE.MODERATOR
|
||||||
|
elif event.author.isChatSponsor:
|
||||||
|
user_type = USER_TYPE.PATRON
|
||||||
|
|
||||||
|
msg = Message(self._name, event.message,
|
||||||
|
event.author.name, event.author.channelId, user_type,
|
||||||
|
id=event.id)
|
||||||
|
return msg
|
||||||
|
|
||||||
|
def loop(self, next_state):
|
||||||
|
return self._state_machine(self.state, next_state)
|
||||||
|
|
||||||
|
def on_state_enter(self, new_state):
|
||||||
|
status_messages = {
|
||||||
|
STATES.READING: 'Connected to channel!',
|
||||||
|
}
|
||||||
|
|
||||||
|
message = status_messages.get(new_state)
|
||||||
|
if message is not None:
|
||||||
|
sys_msg = SysMessage(self._name, message)
|
||||||
|
self.publish(sys_msg)
|
||||||
|
|
||||||
|
def on_waiting(self, next_state):
|
||||||
|
if self._chat.is_alive():
|
||||||
|
return STATES.READING
|
||||||
|
return 5
|
||||||
|
|
||||||
|
def on_reading(self, next_state):
|
||||||
|
try:
|
||||||
|
if not self._chat.is_alive():
|
||||||
|
return STATES.TIMEOUT
|
||||||
|
|
||||||
|
for raw_event in self._chat.get().sync_items():
|
||||||
|
normalized_event = self.normalize_event(raw_event)
|
||||||
|
self.publish(normalized_event)
|
||||||
|
|
||||||
|
return 0
|
||||||
|
except TimeoutError:
|
||||||
|
return STATES.TIMEOUT
|
||||||
|
|
||||||
|
def on_failure(self, next_state):
|
||||||
|
return None
|
|
@ -1,5 +1,4 @@
|
||||||
import logging
|
import logging
|
||||||
import warnings
|
|
||||||
|
|
||||||
import click
|
import click
|
||||||
from blessed import Terminal
|
from blessed import Terminal
|
||||||
|
@ -16,11 +15,8 @@ class CustomFormatter(logging.Formatter):
|
||||||
logging.CRITICAL: term.white_on_red_bold
|
logging.CRITICAL: term.white_on_red_bold
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, show_time, show_loc):
|
def __init__(self, show_time):
|
||||||
if show_loc:
|
format = "%(levelname)s:%(name)s (%(filename)s:%(lineno)d): %(message)s"
|
||||||
format = "%(levelname)s:%(name)s (%(filename)s:%(lineno)d): %(message)s"
|
|
||||||
else:
|
|
||||||
format = "%(levelname)s:%(name)s: %(message)s"
|
|
||||||
if show_time:
|
if show_time:
|
||||||
format = "%(asctime)s:" + format
|
format = "%(asctime)s:" + format
|
||||||
super().__init__(format)
|
super().__init__(format)
|
||||||
|
@ -47,7 +43,7 @@ def cli(loglevel, show_time=False):
|
||||||
logger.setLevel(loglevel)
|
logger.setLevel(loglevel)
|
||||||
log_handler = logging.StreamHandler()
|
log_handler = logging.StreamHandler()
|
||||||
log_handler.setLevel(loglevel)
|
log_handler.setLevel(loglevel)
|
||||||
log_handler.setFormatter(CustomFormatter(show_time, loglevel==logging.DEBUG))
|
log_handler.setFormatter(CustomFormatter(show_time))
|
||||||
logger.addHandler(log_handler)
|
logger.addHandler(log_handler)
|
||||||
# Quiet the depencency loggers
|
# Quiet the depencency loggers
|
||||||
logging.getLogger('websockets.server').setLevel(logging.WARN)
|
logging.getLogger('websockets.server').setLevel(logging.WARN)
|
||||||
|
@ -57,8 +53,3 @@ def cli(loglevel, show_time=False):
|
||||||
logging.getLogger('simpleobsws').setLevel(logging.INFO)
|
logging.getLogger('simpleobsws').setLevel(logging.INFO)
|
||||||
logging.getLogger('quart.serving').setLevel(logging.WARN)
|
logging.getLogger('quart.serving').setLevel(logging.WARN)
|
||||||
logging.getLogger('numba').setLevel(logging.WARN)
|
logging.getLogger('numba').setLevel(logging.WARN)
|
||||||
logging.getLogger('hypercorn.error').setLevel(logging.WARN)
|
|
||||||
logging.getLogger('hypercorn.access').setLevel(logging.WARN)
|
|
||||||
# Quiet warnings
|
|
||||||
if loglevel > logging.DEBUG:
|
|
||||||
warnings.filterwarnings("ignore")
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import click
|
||||||
|
|
||||||
|
from ovtk_audiencekit.core import MainProcess
|
||||||
|
from .group import cli
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument('config_file', type=click.Path('r'), default='config.kdl')
|
||||||
|
@click.option('--port', default='8080')
|
||||||
|
@click.option('--bind', default='127.0.0.1')
|
||||||
|
def start(config_file, port=None, bind=None):
|
||||||
|
"""Start audiencekit server"""
|
||||||
|
logger.info('Hewwo!!')
|
||||||
|
main = MainProcess(config_file, port, bind)
|
||||||
|
main.run()
|
||||||
|
logger.info('Bye bye~')
|
|
@ -95,7 +95,6 @@ class Clip:
|
||||||
self._stream.start_stream()
|
self._stream.start_stream()
|
||||||
|
|
||||||
def play(self):
|
def play(self):
|
||||||
self._end_event.clear()
|
|
||||||
self._play()
|
self._play()
|
||||||
self._end_event.wait(timeout=self.length)
|
self._end_event.wait(timeout=self.length)
|
||||||
|
|
|
@ -2,7 +2,24 @@ import os
|
||||||
import random
|
import random
|
||||||
|
|
||||||
import appdirs
|
import appdirs
|
||||||
|
from kdl import ParseConfig
|
||||||
|
|
||||||
|
def csv_parser(text, fragment):
|
||||||
|
text = fragment.fragment[1:-1]
|
||||||
|
return [field.strip() for field in text.split(',')]
|
||||||
|
|
||||||
|
def semisv_parser(text, fragment):
|
||||||
|
text = fragment.fragment[1:-1]
|
||||||
|
return [field.strip() for field in text.split(';')]
|
||||||
|
|
||||||
|
customParsers = {
|
||||||
|
'list': csv_parser,
|
||||||
|
'csv': csv_parser,
|
||||||
|
'semisv': semisv_parser,
|
||||||
|
}
|
||||||
|
|
||||||
|
kdl_parse_config = ParseConfig(valueConverters=customParsers)
|
||||||
|
kdl_reserved = ['secrets', 'chat', 'plugin', 'import']
|
||||||
|
|
||||||
CACHE_DIR = appdirs.user_cache_dir('audiencekit', 'ovtk')
|
CACHE_DIR = appdirs.user_cache_dir('audiencekit', 'ovtk')
|
||||||
DATA_DIR = appdirs.user_data_dir('audiencekit', 'ovtk')
|
DATA_DIR = appdirs.user_data_dir('audiencekit', 'ovtk')
|
|
@ -0,0 +1,241 @@
|
||||||
|
import importlib
|
||||||
|
from multiprocessing import Lock
|
||||||
|
import asyncio
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from traceback import format_exception
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
import kdl
|
||||||
|
from click import progressbar
|
||||||
|
from aioscheduler import TimedScheduler
|
||||||
|
from quart import Quart
|
||||||
|
|
||||||
|
from ovtk_audiencekit.core import WebsocketServerProcess
|
||||||
|
from ovtk_audiencekit.core.Config import kdl_parse_config, kdl_reserved
|
||||||
|
from ovtk_audiencekit.events import Event, Delete
|
||||||
|
from ovtk_audiencekit.chats.ChatProcess import ShutdownRequest
|
||||||
|
from ovtk_audiencekit.plugins import builtins
|
||||||
|
from ovtk_audiencekit.plugins.PluginBase import PluginError
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def parse_kdl_deep(path, relativeto=None):
|
||||||
|
if relativeto:
|
||||||
|
path = os.path.normpath(os.path.join(relativeto, path))
|
||||||
|
|
||||||
|
with open(path, 'r') as f:
|
||||||
|
try:
|
||||||
|
config = kdl.parse(f.read(), kdl_parse_config)
|
||||||
|
except kdl.errors.ParseError as e:
|
||||||
|
e.file = path
|
||||||
|
raise e
|
||||||
|
|
||||||
|
for node in config.nodes:
|
||||||
|
if node.name == 'import':
|
||||||
|
yield from parse_kdl_deep(node.args[0], relativeto=os.path.dirname(path))
|
||||||
|
else:
|
||||||
|
yield node
|
||||||
|
|
||||||
|
|
||||||
|
class MainProcess:
|
||||||
|
def __init__(self, config_path, port, bind):
|
||||||
|
super().__init__()
|
||||||
|
self.config_path = config_path
|
||||||
|
self.port = port
|
||||||
|
self.bind = bind
|
||||||
|
|
||||||
|
self.chat_processes = {}
|
||||||
|
self.plugins = {}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_external_module_names(node, store):
|
||||||
|
if len(node.args) == 0:
|
||||||
|
raise ValueError(f'Invalid arguments - usage: {node.name} module')
|
||||||
|
nameparts = node.args[0].split(':')
|
||||||
|
module_name = nameparts[0]
|
||||||
|
instance_name = nameparts[1] if len(nameparts) == 2 else module_name
|
||||||
|
if store.get(instance_name):
|
||||||
|
if instance_name != module_name:
|
||||||
|
raise ValueError(f"Multiple nodes named {instance_name}, please specify unique names as the second argument")
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Multiple definitions of {instance_name} exist, please specify unique names as the second argument")
|
||||||
|
return module_name, instance_name
|
||||||
|
|
||||||
|
async def handle_events(self):
|
||||||
|
while True:
|
||||||
|
event = await self.event_queue.get()
|
||||||
|
logger.info(event)
|
||||||
|
|
||||||
|
if isinstance(event, Event):
|
||||||
|
for plugin_name, plugin in list(self.plugins.items()):
|
||||||
|
try:
|
||||||
|
event = plugin.on_bus_event(event)
|
||||||
|
if asyncio.iscoroutinefunction(plugin.on_bus_event):
|
||||||
|
event = await event
|
||||||
|
except PluginError as e:
|
||||||
|
if e.fatal:
|
||||||
|
logger.critical(f'Failure when processing {e.source} ({e}) - disabling...')
|
||||||
|
else:
|
||||||
|
logger.warning(f'Encounterd error when processing {e.source} ({e})')
|
||||||
|
logger.debug(''.join(format_exception(None, e, e.__traceback__)))
|
||||||
|
if e.fatal:
|
||||||
|
self.plugins[e.source].__del__()
|
||||||
|
del self.plugins[e.source]
|
||||||
|
except Exception as e:
|
||||||
|
logger.critical(f'Failure when processing {plugin_name} ({e}) - disabling...')
|
||||||
|
logger.debug(''.join(format_exception(None, e, e.__traceback__)))
|
||||||
|
self.plugins[plugin_name].__del__()
|
||||||
|
del self.plugins[plugin_name]
|
||||||
|
if event is None:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.server_process.message_pipe.send(event)
|
||||||
|
logger.debug(f'Event after plugin chain - {event}')
|
||||||
|
elif isinstance(event, Delete):
|
||||||
|
self.server_process.message_pipe.send(event)
|
||||||
|
else:
|
||||||
|
logger.error(f'Unknown data in event loop - {event}')
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
config = kdl.Document(list(parse_kdl_deep(self.config_path)))
|
||||||
|
|
||||||
|
stdin_lock = Lock()
|
||||||
|
# Load secrets
|
||||||
|
secrets = {}
|
||||||
|
if node := config.get('secrets'):
|
||||||
|
for module in node.nodes:
|
||||||
|
fields = secrets.get(module.name, {})
|
||||||
|
for node in module.nodes:
|
||||||
|
fields[node.name] = node.args[0] if len(node.args) == 1 else node.args
|
||||||
|
secrets[module.name] = fields
|
||||||
|
|
||||||
|
# Dynamically import chats
|
||||||
|
with progressbar(list(config.getAll('chat')), label="Preparing modules (chats)", item_show_func=lambda i: i and i.args[0]) as bar:
|
||||||
|
for node in bar:
|
||||||
|
module_name, chat_name = self.get_external_module_names(node, self.chat_processes)
|
||||||
|
secrets_for_mod = secrets.get(module_name, {})
|
||||||
|
|
||||||
|
try:
|
||||||
|
chat_module = importlib.import_module(f'.{module_name}', package='ovtk_audiencekit.chats')
|
||||||
|
chat_process = chat_module.Process(stdin_lock, chat_name, **node.props, **secrets_for_mod)
|
||||||
|
self.chat_processes[chat_name] = chat_process
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError(f'Failed to initalize {module_name} module "{chat_name}" - {e}')
|
||||||
|
|
||||||
|
if len(self.chat_processes.keys()) == 0:
|
||||||
|
logger.warning('No chats configured!')
|
||||||
|
|
||||||
|
# Start chat processes
|
||||||
|
for process in self.chat_processes.values():
|
||||||
|
process.start()
|
||||||
|
|
||||||
|
# Load plugins
|
||||||
|
self.event_queue = asyncio.Queue()
|
||||||
|
## Builtins
|
||||||
|
for node_name in builtins.__all__:
|
||||||
|
self.plugins[node_name] = builtins.__dict__[node_name](self.chat_processes, self.event_queue, node_name)
|
||||||
|
## Dynamic
|
||||||
|
with progressbar(list(config.getAll('plugin')), label="Preparing modules (plugins)", item_show_func=lambda i: i and i.args[0]) as bar:
|
||||||
|
for node in bar:
|
||||||
|
module_name, plugin_name = self.get_external_module_names(node, self.plugins)
|
||||||
|
secrets_for_mod = secrets.get(module_name, {})
|
||||||
|
|
||||||
|
try:
|
||||||
|
plugin_module = importlib.import_module(f'.{module_name}', package='ovtk_audiencekit.plugins')
|
||||||
|
plugin = plugin_module.Plugin(self.chat_processes, self.event_queue, plugin_name,
|
||||||
|
**node.props, **secrets_for_mod, _children=node.nodes)
|
||||||
|
self.plugins[plugin_name] = plugin
|
||||||
|
|
||||||
|
# Register UI with webserver
|
||||||
|
self.webserver.register_blueprint(plugin.blueprint, url_prefix=f'/{plugin_name}')
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError(f'Failed to initalize {module_name} plugin "{plugin_name}" - {e}')
|
||||||
|
|
||||||
|
# Run plugin definitions
|
||||||
|
with progressbar(list(config.nodes), label=f"Executing {self.config_path}") as bar:
|
||||||
|
for node in bar:
|
||||||
|
if node.name in kdl_reserved:
|
||||||
|
continue
|
||||||
|
plugin_name = node.name
|
||||||
|
plugin_module = self.plugins.get(plugin_name)
|
||||||
|
if plugin_module is None:
|
||||||
|
logger.error(f'Unknown plugin: {node.name}')
|
||||||
|
else:
|
||||||
|
asyncio.get_event_loop().run_until_complete(plugin_module._run(*node.args, **node.props, _children=node.nodes))
|
||||||
|
|
||||||
|
# Register watchable handles
|
||||||
|
self.pipes = [process.event_pipe for process in self.chat_processes.values()]
|
||||||
|
self.pipes.append(self.server_process.message_pipe)
|
||||||
|
|
||||||
|
async def tick_plugins(self):
|
||||||
|
for plugin in self.plugins.values():
|
||||||
|
res = plugin.tick(0.5)
|
||||||
|
if asyncio.iscoroutinefunction(plugin.tick):
|
||||||
|
await res
|
||||||
|
self._skehdule.schedule(self.tick_plugins(), datetime.utcnow() + timedelta(seconds=0.5))
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
# Start websocket server
|
||||||
|
self.server_process = WebsocketServerProcess(self.port, self.bind)
|
||||||
|
self.server_process.start()
|
||||||
|
|
||||||
|
self.webserver = Quart(__name__)
|
||||||
|
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
sys_tasks = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Do initial setup
|
||||||
|
self.setup()
|
||||||
|
self._skehdule = TimedScheduler()
|
||||||
|
|
||||||
|
sys_tasks.append(loop.create_task(self.tick_plugins()))
|
||||||
|
sys_tasks.append(loop.create_task(self.handle_events()))
|
||||||
|
|
||||||
|
async def start_scheduler():
|
||||||
|
self._skehdule.start()
|
||||||
|
sys_tasks.append(loop.create_task(start_scheduler()))
|
||||||
|
|
||||||
|
async def start_uiwebserver():
|
||||||
|
try:
|
||||||
|
# HACK: eats the KeyboardInterrupt - maybe others too
|
||||||
|
await self.webserver.run_task()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
logger.critical(f'Failure in web process - {e}')
|
||||||
|
logger.debug(''.join(format_exception(None, e, e.__traceback__)))
|
||||||
|
finally:
|
||||||
|
raise KeyboardInterrupt()
|
||||||
|
sys_tasks.append(loop.create_task(start_uiwebserver()))
|
||||||
|
|
||||||
|
def get_event(pipe):
|
||||||
|
event = pipe.recv()
|
||||||
|
self.event_queue.put_nowait(event)
|
||||||
|
for pipe in self.pipes:
|
||||||
|
# REVIEW: This does not work on windows!!!!
|
||||||
|
loop.add_reader(pipe.fileno(), lambda pipe=pipe: get_event(pipe))
|
||||||
|
|
||||||
|
logger.info('Ready to rumble!')
|
||||||
|
loop.run_forever()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
except kdl.errors.ParseError as e:
|
||||||
|
if (e.file):
|
||||||
|
logger.critical(f'Invalid configuration in {e.file} at {e.line}:{e.col} - {e.msg}')
|
||||||
|
else:
|
||||||
|
logger.critical(f'Invalid configuration - {e.msg}')
|
||||||
|
except Exception as e:
|
||||||
|
logger.critical(f'Failure in core process - {e}')
|
||||||
|
logger.debug(''.join(format_exception(None, e, e.__traceback__)))
|
||||||
|
finally:
|
||||||
|
logger.warn('Closing up shop...')
|
||||||
|
for task in sys_tasks:
|
||||||
|
task.cancel()
|
||||||
|
for process in self.chat_processes.values():
|
||||||
|
process.control_pipe.send(ShutdownRequest('root'))
|
||||||
|
process.join(5)
|
||||||
|
if process.exitcode is None:
|
||||||
|
process.terminate()
|
||||||
|
self.server_process.terminate()
|
|
@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class WebsocketServerProcess(Process):
|
class WebsocketServerProcess(Process):
|
||||||
def __init__(self, bind, port):
|
def __init__(self, port, bind):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
self._bind = bind
|
self._bind = bind
|
|
@ -8,7 +8,7 @@ from requests.exceptions import HTTPError
|
||||||
from owoify.owoify import owoify, Owoness
|
from owoify.owoify import owoify, Owoness
|
||||||
|
|
||||||
from ovtk_audiencekit.plugins import PluginBase
|
from ovtk_audiencekit.plugins import PluginBase
|
||||||
from ovtk_audiencekit.core.Data import CACHE_DIR
|
from ovtk_audiencekit.core.Config import CACHE_DIR
|
||||||
from ovtk_audiencekit.plugins.builtins.Command import Command, CommandTypes
|
from ovtk_audiencekit.plugins.builtins.Command import Command, CommandTypes
|
||||||
from ovtk_audiencekit.events.Message import Message, SysMessage
|
from ovtk_audiencekit.events.Message import Message, SysMessage
|
||||||
|
|
|
@ -3,7 +3,7 @@ import logging
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from ovtk_audiencekit.core.Data import CACHE_DIR
|
from ovtk_audiencekit.core.Config import CACHE_DIR
|
||||||
from ovtk_audiencekit.plugins import PluginBase
|
from ovtk_audiencekit.plugins import PluginBase
|
||||||
from ovtk_audiencekit.events import Message
|
from ovtk_audiencekit.events import Message
|
||||||
|
|
|
@ -0,0 +1,133 @@
|
||||||
|
from dataclasses import asdict
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from functools import reduce
|
||||||
|
from operator import getitem
|
||||||
|
from string import Formatter
|
||||||
|
import logging
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import kdl
|
||||||
|
from quart import Blueprint
|
||||||
|
|
||||||
|
from ovtk_audiencekit.core.Config import kdl_parse_config
|
||||||
|
|
||||||
|
|
||||||
|
class PluginError(Exception):
|
||||||
|
def __init__(self, source, message, fatal=True):
|
||||||
|
self.source = source
|
||||||
|
self.message = message
|
||||||
|
self.fatal = fatal
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.message
|
||||||
|
|
||||||
|
|
||||||
|
class GetitemFormatter(Formatter):
|
||||||
|
def get_field(self, field_name, args, kwargs):
|
||||||
|
keys = field_name.split('.')
|
||||||
|
field = reduce(getitem, keys, kwargs)
|
||||||
|
return (field, keys[0])
|
||||||
|
|
||||||
|
|
||||||
|
class PluginBase(ABC):
|
||||||
|
plugins = {}
|
||||||
|
|
||||||
|
def __init__(self, chat_processes, event_queue, name, _children=None, **kwargs):
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
self.chats = chat_processes
|
||||||
|
self._event_queue = event_queue
|
||||||
|
self._name = name
|
||||||
|
|
||||||
|
self.logger = logging.getLogger(f'plugin.{self._name}')
|
||||||
|
self.plugins[name] = self
|
||||||
|
self.blueprint = Blueprint(self._name, __name__)
|
||||||
|
if _children:
|
||||||
|
raise ValueError('Module does not accept children')
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
if self.plugins.get(self._name) == self:
|
||||||
|
del self.plugins[self._name]
|
||||||
|
|
||||||
|
# Base class helpers
|
||||||
|
def broadcast(self, event):
|
||||||
|
"""Send event to every active chat"""
|
||||||
|
for proc in self.chats.values():
|
||||||
|
if proc.readonly:
|
||||||
|
continue
|
||||||
|
proc.control_pipe.send(event)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _kdl_arg_formatter(text, fragment, args):
|
||||||
|
key = fragment.fragment[1:-1]
|
||||||
|
try:
|
||||||
|
if '{' in key:
|
||||||
|
return GetitemFormatter().format(key, **args).replace(r'\"', '"')
|
||||||
|
else:
|
||||||
|
return reduce(getitem, key.split('.'), args)
|
||||||
|
except (KeyError, IndexError) as e:
|
||||||
|
raise ValueError(f'Invalid arg string - "{key}": {e}') from e
|
||||||
|
|
||||||
|
def fill_context(self, actionnode, ctx):
|
||||||
|
config = asdict(kdl_parse_config)
|
||||||
|
config['valueConverters'] = {
|
||||||
|
**config['valueConverters'],
|
||||||
|
'arg': lambda text, fragment, args=ctx: self._kdl_arg_formatter(text, fragment, args),
|
||||||
|
}
|
||||||
|
config = kdl.ParseConfig(**config)
|
||||||
|
|
||||||
|
newnode = kdl.parse(str(actionnode), config).nodes[0]
|
||||||
|
return newnode
|
||||||
|
|
||||||
|
async def call_plugin_from_kdl(self, node, *args, _ctx={}, **kwargs):
|
||||||
|
"""
|
||||||
|
Calls some other plugin as configured by the passed KDL node
|
||||||
|
If this was done in response to an event, pass it as event in _ctx!
|
||||||
|
"""
|
||||||
|
node = self.fill_context(node, _ctx)
|
||||||
|
target = self.plugins.get(node.name)
|
||||||
|
if target is None:
|
||||||
|
self.logger.warning(f'Could not find plugin or builtin with name {node.name}')
|
||||||
|
else:
|
||||||
|
return await target._run(*node.args, *args, **node.props, _ctx=_ctx, **kwargs, _children=node.nodes)
|
||||||
|
|
||||||
|
def send_to_bus(self, event):
|
||||||
|
"""
|
||||||
|
Send an event to the event bus
|
||||||
|
WARNING: This will cause the event to be processed by other plugins - be careful not to cause an infinite loop!
|
||||||
|
"""
|
||||||
|
self._event_queue.put_nowait(event)
|
||||||
|
|
||||||
|
async def _run(self, *args, **kwargs):
|
||||||
|
try:
|
||||||
|
res = self.run(*args, **kwargs)
|
||||||
|
if asyncio.iscoroutinefunction(self.run):
|
||||||
|
return await res
|
||||||
|
else:
|
||||||
|
return res
|
||||||
|
except Exception as e:
|
||||||
|
if isinstance(e, KeyboardInterrupt):
|
||||||
|
raise e
|
||||||
|
raise PluginError(self._name, str(e)) from e
|
||||||
|
|
||||||
|
# User-defined
|
||||||
|
async def tick(self, dt):
|
||||||
|
"""Called at least every half second - perform time-dependent updates here!"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def on_bus_event(self, event):
|
||||||
|
"""Called for every event from the chats"""
|
||||||
|
return event
|
||||||
|
|
||||||
|
async def on_control_event(self, event):
|
||||||
|
"""
|
||||||
|
Called for events targeting this plugin name specifically.
|
||||||
|
This is normally used for other applications to communicate with this one over the websocket interface
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
async def run(self, _children=None, _ctx={}, **kwargs):
|
||||||
|
"""
|
||||||
|
Run plugin action, either due to a definition in the config, or due to another plugin
|
||||||
|
"""
|
||||||
|
pass
|
|
@ -9,7 +9,7 @@ from TTS.config import load_config
|
||||||
from ovtk_audiencekit.plugins import PluginBase
|
from ovtk_audiencekit.plugins import PluginBase
|
||||||
from ovtk_audiencekit.events import Message, SysMessage
|
from ovtk_audiencekit.events import Message, SysMessage
|
||||||
from ovtk_audiencekit.core import Clip
|
from ovtk_audiencekit.core import Clip
|
||||||
from ovtk_audiencekit.core.Data import CACHE_DIR
|
from ovtk_audiencekit.core.Config import CACHE_DIR
|
||||||
|
|
||||||
|
|
||||||
class TextToSpeechPlugin(PluginBase):
|
class TextToSpeechPlugin(PluginBase):
|
|
@ -0,0 +1,3 @@
|
||||||
|
from .PluginBase import PluginBase, PluginError
|
||||||
|
|
||||||
|
__all__ = ['PluginBase', 'PluginError']
|
|
@ -16,4 +16,5 @@ class ChancePlugin(PluginBase):
|
||||||
raise ValueError('Chance must be a string (optionally ending in %) or number')
|
raise ValueError('Chance must be a string (optionally ending in %) or number')
|
||||||
|
|
||||||
if random.random() < chance / 100:
|
if random.random() < chance / 100:
|
||||||
await self.execute_kdl(_children, _ctx=_ctx)
|
for node in _children:
|
||||||
|
await self.call_plugin_from_kdl(node, _ctx=_ctx)
|
|
@ -12,4 +12,4 @@ class ChoicePlugin(PluginBase):
|
||||||
|
|
||||||
async def run(self, _children=None, _ctx={}, **kwargs):
|
async def run(self, _children=None, _ctx={}, **kwargs):
|
||||||
node = random.choice(_children)
|
node = random.choice(_children)
|
||||||
await self.execute_kdl([node], _ctx=_ctx)
|
await self.call_plugin_from_kdl(node, _ctx=_ctx)
|
|
@ -130,7 +130,8 @@ class CommandPlugin(PluginBase):
|
||||||
args = command.parse(event.text)
|
args = command.parse(event.text)
|
||||||
self.logger.debug(f"Parsed args for {command.name}: {args}")
|
self.logger.debug(f"Parsed args for {command.name}: {args}")
|
||||||
ctx = dict(event=event, **args)
|
ctx = dict(event=event, **args)
|
||||||
await self.execute_kdl(actionnode.nodes, _ctx=ctx)
|
for node in actionnode.nodes:
|
||||||
|
await self.call_plugin_from_kdl(node, _ctx=ctx)
|
||||||
except argparse.ArgumentError as e:
|
except argparse.ArgumentError as e:
|
||||||
msg = SysMessage(self._name, f"{e}. See !help {command.name}", replies_to=event)
|
msg = SysMessage(self._name, f"{e}. See !help {command.name}", replies_to=event)
|
||||||
self.chats[event.via].send(msg)
|
self.chats[event.via].send(msg)
|
|
@ -0,0 +1,51 @@
|
||||||
|
import maya
|
||||||
|
|
||||||
|
from ovtk_audiencekit.plugins import PluginBase
|
||||||
|
|
||||||
|
|
||||||
|
class CueEvent:
|
||||||
|
def __init__(self, oneshot, at=None, **kwargs):
|
||||||
|
self.oneshot = oneshot
|
||||||
|
|
||||||
|
if at:
|
||||||
|
self._next_run = maya.parse(at)
|
||||||
|
self._interval = None
|
||||||
|
else:
|
||||||
|
self._next_run = maya.now().add(**kwargs)
|
||||||
|
self._interval = kwargs
|
||||||
|
|
||||||
|
def check(self):
|
||||||
|
if self._next_run <= maya.now():
|
||||||
|
if self._interval:
|
||||||
|
self._next_run = maya.now().add(**self._interval)
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class CuePlugin(PluginBase):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.cue_events = {}
|
||||||
|
|
||||||
|
def run(self, name=None, _children=None, **kwargs):
|
||||||
|
if not _children:
|
||||||
|
raise ValueError('Cue defined without any events')
|
||||||
|
|
||||||
|
if name is None:
|
||||||
|
name = f"cue-{len(self.cue_events.keys())}"
|
||||||
|
|
||||||
|
for eventnode in _children:
|
||||||
|
at = eventnode.args[0] if len(eventnode.args) == 1 else None
|
||||||
|
oneshot = eventnode.name in ['once', 'after']
|
||||||
|
cue_event = CueEvent(oneshot, at=at, **eventnode.props)
|
||||||
|
|
||||||
|
actions = [lambda node=node: self.call_plugin_from_kdl(node) for node in eventnode.nodes];
|
||||||
|
self.cue_events[name] = (cue_event, actions)
|
||||||
|
|
||||||
|
async def tick(self, dt):
|
||||||
|
for key, (event, actions) in list(self.cue_events.items()):
|
||||||
|
if event.check():
|
||||||
|
for action in actions:
|
||||||
|
await action()
|
||||||
|
if event.oneshot:
|
||||||
|
del self.cue_events[key]
|
|
@ -0,0 +1,17 @@
|
||||||
|
import mido
|
||||||
|
|
||||||
|
from ovtk_audiencekit.plugins import PluginBase
|
||||||
|
|
||||||
|
|
||||||
|
class MidiPlugin(PluginBase):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.output_port = mido.open_output()
|
||||||
|
|
||||||
|
def run(self, type, _ctx={}, **kwargs):
|
||||||
|
if type == 'sysex':
|
||||||
|
data = kwargs['data']
|
||||||
|
msg = mido.Message('sysex', data=bytes(data, encoding='utf-8'), time=0)
|
||||||
|
self.output_port.send(msg)
|
||||||
|
else:
|
||||||
|
raise NotImplementedError('TODO: note on/off and cc')
|
|
@ -0,0 +1,24 @@
|
||||||
|
from ovtk_audiencekit.plugins import PluginBase
|
||||||
|
|
||||||
|
|
||||||
|
class RememberPlugin(PluginBase):
|
||||||
|
"""Saves the response of other plugins in the local context (can be fetched with the custom arg type)"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
async def run(self, name, _children=None, _ctx={}, **kwargs):
|
||||||
|
if _children is None:
|
||||||
|
self.logger.warn('No children - this does nothing!')
|
||||||
|
return
|
||||||
|
|
||||||
|
responses = []
|
||||||
|
|
||||||
|
for child in _children:
|
||||||
|
res = await self.call_plugin_from_kdl(child, _ctx=_ctx)
|
||||||
|
responses.append(res)
|
||||||
|
|
||||||
|
if len(responses) == 1:
|
||||||
|
responses = responses[0]
|
||||||
|
|
||||||
|
_ctx[name] = responses
|
|
@ -108,15 +108,16 @@ class TriggerPlugin(PluginBase):
|
||||||
unknown_args[key] = value
|
unknown_args[key] = value
|
||||||
trigger = Trigger(**args, attr_checks=unknown_args)
|
trigger = Trigger(**args, attr_checks=unknown_args)
|
||||||
|
|
||||||
handler = lambda ctx: self.execute_kdl(_children, _ctx=ctx)
|
actions = [lambda ctx, node=node: self.call_plugin_from_kdl(node, _ctx=ctx) for node in _children]
|
||||||
|
|
||||||
self.triggers.append((trigger, handler, _ctx))
|
self.triggers.append((trigger, actions, _ctx))
|
||||||
|
|
||||||
async def on_bus_event(self, event):
|
async def on_bus_event(self, event):
|
||||||
for trigger, handler, ctx in self.triggers:
|
for trigger, actions, ctx in self.triggers:
|
||||||
if trigger.matches(event, self.last_msg):
|
if trigger.matches(event, self.last_msg):
|
||||||
_ctx = {**ctx, 'event': event}
|
_ctx = {**ctx, 'event': event}
|
||||||
await handler(_ctx)
|
for action in actions:
|
||||||
|
await action(_ctx)
|
||||||
if isinstance(event, Message):
|
if isinstance(event, Message):
|
||||||
self.last_msg = event
|
self.last_msg = event
|
||||||
return event
|
return event
|
|
@ -8,8 +8,6 @@ from .Chance import ChancePlugin as chance
|
||||||
from .Choice import ChoicePlugin as choice
|
from .Choice import ChoicePlugin as choice
|
||||||
from .Midi import MidiPlugin as midi
|
from .Midi import MidiPlugin as midi
|
||||||
from .WebSocket import WebSocketPlugin as ws
|
from .WebSocket import WebSocketPlugin as ws
|
||||||
from .Set import SetPlugin as set
|
from .Remember import RememberPlugin as remember
|
||||||
from .Scene import ScenePlugin as scene
|
|
||||||
from .Log import LogPlugin as log
|
|
||||||
|
|
||||||
__all__ = ['trigger', 'reply', 'command', 'cue', 'write', 'exec', 'chance', 'choice', 'midi', 'ws', 'set', 'scene', 'log']
|
__all__ = ['trigger', 'reply', 'command', 'cue', 'write', 'exec', 'chance', 'choice', 'midi', 'ws', 'remember']
|
|
@ -1,4 +1,5 @@
|
||||||
from multiprocessing import Process, Pipe
|
from multiprocessing import Process, Pipe
|
||||||
|
from traceback import format_exception
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
from .NonBlockingWebsocket import NonBlockingWebsocket
|
from .NonBlockingWebsocket import NonBlockingWebsocket
|
||||||
from .make_sync import make_sync
|
from .make_sync import make_sync
|
||||||
from .get_subclasses import get_subclasses
|
from .get_subclasses import get_subclasses
|
||||||
from .format_exception import format_exception
|
|
|
@ -1,60 +0,0 @@
|
||||||
[project]
|
|
||||||
name = "ovtk_audiencekit"
|
|
||||||
version = "0.1.0"
|
|
||||||
description = ""
|
|
||||||
authors = [
|
|
||||||
{name = "Skeh", email = "im@skeh.site"},
|
|
||||||
]
|
|
||||||
dependencies = [
|
|
||||||
"click",
|
|
||||||
"kdl-py",
|
|
||||||
"quart==0.18.*",
|
|
||||||
"hypercorn",
|
|
||||||
"requests",
|
|
||||||
"websockets",
|
|
||||||
"aioprocessing",
|
|
||||||
"aioscheduler",
|
|
||||||
"pyaudio==0.2.*",
|
|
||||||
"librosa==0.8.*",
|
|
||||||
"pytsmod",
|
|
||||||
"numpy",
|
|
||||||
"multipledispatch",
|
|
||||||
"blessed",
|
|
||||||
"appdirs",
|
|
||||||
"maya",
|
|
||||||
"mido",
|
|
||||||
"python-rtmidi",
|
|
||||||
"simpleobsws",
|
|
||||||
"python-osc>=1.9.0",
|
|
||||||
]
|
|
||||||
requires-python = ">=3.10,<3.11"
|
|
||||||
readme = "README.md"
|
|
||||||
license = {text = "GPLv2"}
|
|
||||||
|
|
||||||
[project.optional-dependencies]
|
|
||||||
tts = [
|
|
||||||
"TTS==0.9.*",
|
|
||||||
"torch==1.13.*",
|
|
||||||
]
|
|
||||||
phrasecounter = ["num2words"]
|
|
||||||
jail = ["owoify-py==2.*"]
|
|
||||||
twitch = ["miniirc"]
|
|
||||||
|
|
||||||
[tool.pdm.dev-dependencies]
|
|
||||||
dev = [
|
|
||||||
"pipenv-setup",
|
|
||||||
]
|
|
||||||
|
|
||||||
[build-system]
|
|
||||||
requires = ["pdm-backend"]
|
|
||||||
build-backend = "pdm.backend"
|
|
||||||
|
|
||||||
[tool.pdm.scripts]
|
|
||||||
start = "python audiencekit.py start"
|
|
||||||
ws = "python audiencekit.py ws"
|
|
||||||
|
|
||||||
[tool.pdm]
|
|
||||||
[[tool.pdm.source]]
|
|
||||||
url = "https://pypi.org/simple"
|
|
||||||
verify_ssl = true
|
|
||||||
name = "pypi"
|
|
|
@ -1,183 +0,0 @@
|
||||||
import os
|
|
||||||
import json
|
|
||||||
import webbrowser
|
|
||||||
from enum import Enum, auto
|
|
||||||
|
|
||||||
import requests
|
|
||||||
|
|
||||||
from ovtk_audiencekit.chats import ChatProcess
|
|
||||||
from ovtk_audiencekit.events.Message import Message, SysMessage, USER_TYPE
|
|
||||||
|
|
||||||
|
|
||||||
class STATES(Enum):
|
|
||||||
WAITING_FOR_BROADCAST = auto()
|
|
||||||
POLLING = auto()
|
|
||||||
REFRESH = auto()
|
|
||||||
UNAUTHORIZED = auto()
|
|
||||||
FAILURE = auto()
|
|
||||||
|
|
||||||
|
|
||||||
class YoutubeLivePollProcess(ChatProcess):
|
|
||||||
class CONTROL_MESSAGES(str, Enum):
|
|
||||||
RESTART = "restart"
|
|
||||||
|
|
||||||
GOOGLE_OAUTH_TOKEN_URL = 'https://accounts.google.com/o/oauth2/token'
|
|
||||||
YOUTUBE_API_URL = 'https://www.googleapis.com/youtube/v3'
|
|
||||||
|
|
||||||
def __init__(self, *args, client_secrets_path=None):
|
|
||||||
if client_secrets_path is None or not os.path.exists(client_secrets_path):
|
|
||||||
raise ValueError('Missing client secrets')
|
|
||||||
|
|
||||||
with open(client_secrets_path, 'r') as f:
|
|
||||||
client_secrets = json.load(f).get('installed')
|
|
||||||
if client_secrets is None:
|
|
||||||
raise ValueError('Malformed client secrets file - missing installed section')
|
|
||||||
|
|
||||||
super().__init__(*args)
|
|
||||||
self._client_secrets = client_secrets
|
|
||||||
self._state_machine = self.bind_to_states(STATES)
|
|
||||||
|
|
||||||
self._consent_code = None
|
|
||||||
self._refresh_token = None
|
|
||||||
self._access_token = None
|
|
||||||
self._stream_title = None
|
|
||||||
self._live_chat_id = None
|
|
||||||
self._page_token = None
|
|
||||||
|
|
||||||
def loop(self, next_state):
|
|
||||||
if self.state is None:
|
|
||||||
if os.path.exists('refresh_token.secret'):
|
|
||||||
with open('refresh_token.secret', 'r') as f:
|
|
||||||
self._refresh_token = f.read()
|
|
||||||
return STATES.REFRESH, STATES.WAITING_FOR_BROADCAST
|
|
||||||
return STATES.UNAUTHORIZED, STATES.WAITING_FOR_BROADCAST
|
|
||||||
return self._state_machine(self.state, next_state)
|
|
||||||
|
|
||||||
def on_unauthorized(self, next_state):
|
|
||||||
self.request_oauth_consent()
|
|
||||||
response = self.safe_input('Allow access in your browser and paste response code here: ')
|
|
||||||
self.setup_oauth_consent(response['token'])
|
|
||||||
return next_state
|
|
||||||
|
|
||||||
def on_failure(self, next_state):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def on_refresh(self, next_state):
|
|
||||||
self.get_fresh_access_token()
|
|
||||||
return next_state
|
|
||||||
|
|
||||||
def on_waiting_for_broadcast(self, next_state):
|
|
||||||
response = requests.get(f'{self.__class__.YOUTUBE_API_URL}/liveBroadcasts',
|
|
||||||
params={'part': 'snippet', 'broadcastStatus': 'active'},
|
|
||||||
headers={'Authorization': f'Bearer {self._access_token}'})
|
|
||||||
|
|
||||||
if (response.status_code == 401):
|
|
||||||
return STATES.REFRESH, self.state
|
|
||||||
|
|
||||||
items = response.json()['items']
|
|
||||||
if len(items) == 1:
|
|
||||||
stream_snippet = items[0]['snippet']
|
|
||||||
self._live_chat_id = stream_snippet['liveChatId']
|
|
||||||
self._stream_title = stream_snippet['title']
|
|
||||||
return STATES.POLLING
|
|
||||||
|
|
||||||
return 30
|
|
||||||
|
|
||||||
def on_polling(self, next_state):
|
|
||||||
response = requests.get(f'{self.__class__.YOUTUBE_API_URL}/liveChat/messages',
|
|
||||||
params={'liveChatId': self._live_chat_id,
|
|
||||||
'part': 'snippet,authorDetails',
|
|
||||||
'hl': 'en_US',
|
|
||||||
'pageToken': self._page_token},
|
|
||||||
headers={'Authorization': f'Bearer {self._access_token}'}
|
|
||||||
)
|
|
||||||
|
|
||||||
if (response.status_code == 401):
|
|
||||||
return STATES.REFRESH, self.state
|
|
||||||
|
|
||||||
if response.status_code == requests.codes.ok:
|
|
||||||
data = response.json()
|
|
||||||
self._page_token = data['nextPageToken']
|
|
||||||
if len(data['items']):
|
|
||||||
for message in data['items']:
|
|
||||||
normalized = self.normalize_message(message)
|
|
||||||
self.publish(normalized)
|
|
||||||
|
|
||||||
sleep_milis = max(data['pollingIntervalMillis'], 5000)
|
|
||||||
return sleep_milis / 1000
|
|
||||||
else:
|
|
||||||
print(response.json())
|
|
||||||
return STATES.FAILURE
|
|
||||||
|
|
||||||
def on_state_enter(self, new_state):
|
|
||||||
status_messages = {
|
|
||||||
STATES.WAITING_FOR_BROADCAST: 'Waiting for active broadcast...',
|
|
||||||
STATES.POLLING: f'''Tuning into "{self._stream_title}" - ready to rumble ~\n''',
|
|
||||||
STATES.UNAUTHORIZED: 'Unauthorized - see terminal',
|
|
||||||
STATES.FAILURE: 'YouTube API returned a bad status, polling stopped - see terminal',
|
|
||||||
}
|
|
||||||
|
|
||||||
message = status_messages.get(new_state)
|
|
||||||
if message is not None:
|
|
||||||
sys_msg = SysMessage(self._name, message)
|
|
||||||
self.publish(sys_msg)
|
|
||||||
|
|
||||||
def process_messages(self, message_type, args, next_state):
|
|
||||||
if message_type == self.__class__.CONTROL_MESSAGES.RESTART:
|
|
||||||
if self.state == STATES.POLLING:
|
|
||||||
self.publish(SysMessage(self._name, 'Restart requested, but service is in healthy state! Ignoring...'))
|
|
||||||
else:
|
|
||||||
self.publish(SysMessage(self._name, 'Restarting service...'))
|
|
||||||
return None, None
|
|
||||||
|
|
||||||
def normalize_message(self, message):
|
|
||||||
if message['authorDetails']['isChatOwner']:
|
|
||||||
author_type = USER_TYPE.OWNER
|
|
||||||
elif message['authorDetails']['isChatModerator']:
|
|
||||||
author_type = USER_TYPE.MODERATOR
|
|
||||||
elif message['authorDetails']['isChatSponsor']:
|
|
||||||
author_type = USER_TYPE.PATRON
|
|
||||||
else:
|
|
||||||
author_type = USER_TYPE.USER
|
|
||||||
|
|
||||||
text = message['snippet']['displayMessage']
|
|
||||||
author_name = message['authorDetails']['displayName']
|
|
||||||
author_id = message['authorDetails']['channelId']
|
|
||||||
return Message(self._name, text, author_name, author_id, author_type)
|
|
||||||
|
|
||||||
def request_oauth_consent(self):
|
|
||||||
params = {
|
|
||||||
'client_id': self._client_secrets['client_id'],
|
|
||||||
'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
|
|
||||||
'scope': 'https://www.googleapis.com/auth/youtube',
|
|
||||||
'response_type': 'code',
|
|
||||||
}
|
|
||||||
param_str = '&'.join(f'{k}={v}' for (k, v) in params.items())
|
|
||||||
url = f"{self._client_secrets['auth_uri']}?{param_str}"
|
|
||||||
webbrowser.open(url)
|
|
||||||
|
|
||||||
def setup_oauth_consent(self, consent_code):
|
|
||||||
response = requests.post(self.__class__.GOOGLE_OAUTH_TOKEN_URL, data={
|
|
||||||
'code': consent_code,
|
|
||||||
'client_id': self._client_secrets['client_id'],
|
|
||||||
'client_secret': self._client_secrets['client_secret'],
|
|
||||||
'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
|
|
||||||
'grant_type': 'authorization_code',
|
|
||||||
})
|
|
||||||
response.raise_for_status()
|
|
||||||
auth = response.json()
|
|
||||||
with open('refresh_token.secret', 'w') as f:
|
|
||||||
f.write(auth['refresh_token'])
|
|
||||||
self._access_token = auth['access_token']
|
|
||||||
self._refresh_token = auth['refresh_token']
|
|
||||||
|
|
||||||
def get_fresh_access_token(self):
|
|
||||||
response = requests.post(self.__class__.GOOGLE_OAUTH_TOKEN_URL, data={
|
|
||||||
'client_id': self._client_secrets['client_id'],
|
|
||||||
'client_secret': self._client_secrets['client_secret'],
|
|
||||||
'refresh_token': self._refresh_token,
|
|
||||||
'grant_type': 'refresh_token'
|
|
||||||
})
|
|
||||||
response.raise_for_status()
|
|
||||||
auth = response.json()
|
|
||||||
self._access_token = auth['access_token']
|
|
|
@ -1,29 +0,0 @@
|
||||||
import logging
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
import click
|
|
||||||
|
|
||||||
from ovtk_audiencekit.core import MainProcess
|
|
||||||
from .group import cli
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
|
||||||
@click.argument('config_file', type=click.Path('r'), default='config.kdl')
|
|
||||||
@click.option('--bus-bind', default='localhost')
|
|
||||||
@click.option('--bus-port', default='8080')
|
|
||||||
@click.option('--web-bind', default='localhost')
|
|
||||||
@click.option('--web-port', default='8000')
|
|
||||||
def start(config_file, bus_bind=None, bus_port=None, web_bind=None, web_port=None):
|
|
||||||
"""Start audiencekit server"""
|
|
||||||
logger.info('Hewwo!!')
|
|
||||||
main = MainProcess(config_file,
|
|
||||||
bus_conf=(bus_bind, bus_port),
|
|
||||||
web_conf=(web_bind, web_port))
|
|
||||||
try:
|
|
||||||
asyncio.run(main.run())
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
logger.info('Suya~')
|
|
|
@ -1,143 +0,0 @@
|
||||||
from functools import reduce
|
|
||||||
from operator import getitem
|
|
||||||
from string import Formatter
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from abc import ABC, abstractmethod
|
|
||||||
|
|
||||||
import kdl
|
|
||||||
import kdl.types
|
|
||||||
|
|
||||||
from ovtk_audiencekit.utils import format_exception
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Dynamic(ABC):
|
|
||||||
source: str
|
|
||||||
parser: any
|
|
||||||
|
|
||||||
def to_kdl(self):
|
|
||||||
return self.source
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def compute():
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return str(self.source)
|
|
||||||
|
|
||||||
def get(instance, key):
|
|
||||||
if isinstance(instance, dict):
|
|
||||||
return getitem(instance, key)
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
return getattr(instance, key)
|
|
||||||
except KeyError:
|
|
||||||
return getitem(instance, key)
|
|
||||||
|
|
||||||
class Arg(Dynamic):
|
|
||||||
class GetitemFormatter(Formatter):
|
|
||||||
def get_field(self, field_name, args, kwargs):
|
|
||||||
keys = field_name.split('.')
|
|
||||||
field = reduce(get, keys, kwargs)
|
|
||||||
return (field, keys[0])
|
|
||||||
|
|
||||||
def compute(self, *args, _ctx={}):
|
|
||||||
key = self.parser.fragment[1:-1]
|
|
||||||
try:
|
|
||||||
if '{' in key:
|
|
||||||
return Arg.GetitemFormatter().format(key, **_ctx).replace(r'\"', '"')
|
|
||||||
else:
|
|
||||||
return reduce(get, key.split('.'), _ctx)
|
|
||||||
except (KeyError, IndexError) as e:
|
|
||||||
raise self.parser.error(f'Invalid arg string: {e}') from e
|
|
||||||
except Exception as e:
|
|
||||||
raise self.parser.error(f'Exception raised during arg inject: {format_exception(e, traceback=False)}') from e
|
|
||||||
|
|
||||||
|
|
||||||
class Eval(Dynamic):
|
|
||||||
def compute(self, *args, **kwargs):
|
|
||||||
contents = self.parser.fragment[1:-1]
|
|
||||||
try:
|
|
||||||
return eval(contents, kwargs)
|
|
||||||
except Exception as e:
|
|
||||||
raise self.parser.error(f'Exception raised during eval: {format_exception(e, traceback=False)}') from e
|
|
||||||
|
|
||||||
|
|
||||||
def csv_parser(text, parser):
|
|
||||||
text = parser.fragment[1:-1]
|
|
||||||
return [field.strip() for field in text.split(',')]
|
|
||||||
|
|
||||||
def semisv_parser(text, parser):
|
|
||||||
text = parser.fragment[1:-1]
|
|
||||||
return [field.strip() for field in text.split(';')]
|
|
||||||
|
|
||||||
customValueParsers = {
|
|
||||||
'arg': Arg,
|
|
||||||
't': Arg,
|
|
||||||
'eval': Eval,
|
|
||||||
'list': csv_parser,
|
|
||||||
'csv': csv_parser,
|
|
||||||
'semisv': semisv_parser,
|
|
||||||
}
|
|
||||||
|
|
||||||
def compute_dynamic(kdl_node, *args, **kwargs):
|
|
||||||
args = []
|
|
||||||
for arg in kdl_node.args:
|
|
||||||
if isinstance(arg, Dynamic):
|
|
||||||
arg = arg.compute(*args, **kwargs)
|
|
||||||
args.append(arg)
|
|
||||||
props = {}
|
|
||||||
for key, prop in kdl_node.props.items():
|
|
||||||
if isinstance(prop, Dynamic):
|
|
||||||
prop = prop.compute(*args, **kwargs)
|
|
||||||
props[key] = prop
|
|
||||||
|
|
||||||
return args, props
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class KdlscriptNode(kdl.Node):
|
|
||||||
alias: str | None = field(default=None, init=False)
|
|
||||||
sub: str | None = field(default=None, init=False)
|
|
||||||
|
|
||||||
def __post_init__(self):
|
|
||||||
lhs, *sub = self.name.split('.')
|
|
||||||
if len(sub) == 0:
|
|
||||||
sub = None
|
|
||||||
alias, *name = lhs.split(':')
|
|
||||||
if len(name) == 0:
|
|
||||||
name = alias
|
|
||||||
alias = None
|
|
||||||
elif len(name) == 1:
|
|
||||||
name = name[0]
|
|
||||||
else:
|
|
||||||
raise ValueError("Invalid node name")
|
|
||||||
|
|
||||||
self.name = name
|
|
||||||
self.alias = alias
|
|
||||||
self.sub = sub
|
|
||||||
|
|
||||||
# HACK: Gross disgusting monkey patch
|
|
||||||
kdl.types.Node = KdlscriptNode
|
|
||||||
|
|
||||||
kdl_parse_config = kdl.ParseConfig(valueConverters=customValueParsers)
|
|
||||||
kdl_reserved = ['secrets', 'chat', 'plugin', 'import']
|
|
||||||
|
|
||||||
|
|
||||||
def parse_kdl_deep(path, relativeto=None):
|
|
||||||
if relativeto:
|
|
||||||
path = os.path.normpath(os.path.join(relativeto, path))
|
|
||||||
|
|
||||||
with open(path, 'r') as f:
|
|
||||||
try:
|
|
||||||
config = kdl.parse(f.read(), kdl_parse_config)
|
|
||||||
for node in config.nodes:
|
|
||||||
node.args, node.props = compute_dynamic(node)
|
|
||||||
except kdl.errors.ParseError as e:
|
|
||||||
e.file = path
|
|
||||||
raise e
|
|
||||||
|
|
||||||
for node in config.nodes:
|
|
||||||
if node.name == 'import':
|
|
||||||
yield from parse_kdl_deep(node.args[0], relativeto=os.path.dirname(path))
|
|
||||||
else:
|
|
||||||
yield node
|
|
|
@ -1,344 +0,0 @@
|
||||||
import importlib
|
|
||||||
from multiprocessing import Lock
|
|
||||||
import asyncio
|
|
||||||
from datetime import datetime, timedelta
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import os.path
|
|
||||||
import pathlib
|
|
||||||
import sys
|
|
||||||
import signal
|
|
||||||
|
|
||||||
import kdl
|
|
||||||
from click import progressbar
|
|
||||||
from aioscheduler import TimedScheduler
|
|
||||||
from quart import Quart
|
|
||||||
import hypercorn
|
|
||||||
import hypercorn.asyncio
|
|
||||||
import hypercorn.logging
|
|
||||||
|
|
||||||
from ovtk_audiencekit.core import WebsocketServerProcess
|
|
||||||
from ovtk_audiencekit.core.Config import parse_kdl_deep, kdl_reserved, compute_dynamic
|
|
||||||
from ovtk_audiencekit.events import Event, Delete
|
|
||||||
from ovtk_audiencekit.chats.ChatProcess import ShutdownRequest
|
|
||||||
from ovtk_audiencekit.plugins import builtins, PluginError
|
|
||||||
from ovtk_audiencekit.utils import format_exception
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
class HypercornLoggingShim(hypercorn.logging.Logger):
|
|
||||||
"""Force bog-standard loggers for Hypercorn"""
|
|
||||||
def __init__(self, config):
|
|
||||||
self.access_logger = logging.getLogger('hypercorn.access')
|
|
||||||
self.error_logger = logging.getLogger('hypercorn.error')
|
|
||||||
self.access_log_format = config.access_log_format
|
|
||||||
|
|
||||||
def import_or_reload_mod(module_name, default_package=None, external=False):
|
|
||||||
if external:
|
|
||||||
package = None
|
|
||||||
import_fragment = module_name
|
|
||||||
else:
|
|
||||||
package = default_package
|
|
||||||
import_fragment = f'.{module_name}'
|
|
||||||
fq_module = f'{package}{import_fragment}' if not external else import_fragment
|
|
||||||
|
|
||||||
if module := sys.modules.get(fq_module):
|
|
||||||
for submod in [mod for name, mod in sys.modules.items() if name.startswith(fq_module)]:
|
|
||||||
importlib.reload(submod)
|
|
||||||
importlib.reload(module)
|
|
||||||
else:
|
|
||||||
module = importlib.import_module(import_fragment, package=package)
|
|
||||||
return module
|
|
||||||
|
|
||||||
|
|
||||||
class MainProcess:
|
|
||||||
def __init__(self, config_path, bus_conf=(None, None), web_conf=(None, None)):
|
|
||||||
self._running = False
|
|
||||||
self.config_path = config_path
|
|
||||||
self.bus_conf = bus_conf
|
|
||||||
self.web_conf = web_conf
|
|
||||||
|
|
||||||
self.chat_processes = {}
|
|
||||||
self.plugins = {}
|
|
||||||
self.event_queue = asyncio.Queue()
|
|
||||||
|
|
||||||
# Init websocket server (event bus)
|
|
||||||
# HACK: Must be done here to avoid shadowing its asyncio loop
|
|
||||||
self.server_process = WebsocketServerProcess(*self.bus_conf)
|
|
||||||
self.server_process.start()
|
|
||||||
|
|
||||||
# Save sys.path since some config will clobber it
|
|
||||||
self._initial_syspath = sys.path
|
|
||||||
|
|
||||||
def _unload_plugin(self, plugin_name):
|
|
||||||
plugin = self.plugins[plugin_name]
|
|
||||||
plugin.close()
|
|
||||||
del self.plugins[plugin_name]
|
|
||||||
del plugin
|
|
||||||
|
|
||||||
def _get_event_from_pipe(self, pipe):
|
|
||||||
event = pipe.recv()
|
|
||||||
self.event_queue.put_nowait(event)
|
|
||||||
|
|
||||||
def _setup_webserver(self):
|
|
||||||
self.webserver = Quart(__name__, static_folder=None, template_folder=None)
|
|
||||||
listen = ':'.join(self.web_conf)
|
|
||||||
self.webserver.config['SERVER_NAME'] = listen
|
|
||||||
@self.webserver.context_processor
|
|
||||||
async def update_ctx():
|
|
||||||
return { 'EVBUS': 'ws://' + ':'.join(self.bus_conf) }
|
|
||||||
self.webserver.jinja_options = {
|
|
||||||
'block_start_string': '<%', 'block_end_string': '%>',
|
|
||||||
'variable_start_string': '<<', 'variable_end_string': '>>',
|
|
||||||
'comment_start_string': '<#', 'comment_end_string': '#>',
|
|
||||||
}
|
|
||||||
|
|
||||||
async def serve_coro():
|
|
||||||
config = hypercorn.config.Config()
|
|
||||||
config.bind = listen
|
|
||||||
config.use_reloader = False
|
|
||||||
config.logger_class = HypercornLoggingShim
|
|
||||||
try:
|
|
||||||
await hypercorn.asyncio.serve(self.webserver, config, shutdown_trigger=self.shutdown_ev.wait)
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f'Failure in web process - {e}')
|
|
||||||
logger.debug(format_exception(e))
|
|
||||||
raise e
|
|
||||||
# MAGIC: As root tasks are supposed to be infinte loops, raise an exception if hypercorn shut down
|
|
||||||
raise asyncio.CancelledError()
|
|
||||||
|
|
||||||
return serve_coro()
|
|
||||||
|
|
||||||
async def handle_events(self):
|
|
||||||
while True:
|
|
||||||
event = await self.event_queue.get()
|
|
||||||
logger.info(event)
|
|
||||||
|
|
||||||
if isinstance(event, Event):
|
|
||||||
for plugin_name, plugin in list(self.plugins.items()):
|
|
||||||
try:
|
|
||||||
event = plugin.on_bus_event(event)
|
|
||||||
if asyncio.iscoroutinefunction(plugin.on_bus_event):
|
|
||||||
event = await event
|
|
||||||
except PluginError as e:
|
|
||||||
if e.fatal:
|
|
||||||
logger.critical(f'Failure when processing {e.source} ({e}) - disabling...')
|
|
||||||
else:
|
|
||||||
logger.warning(f'Encounterd error when processing {e.source} ({e})')
|
|
||||||
logger.debug(format_exception(e))
|
|
||||||
if e.fatal:
|
|
||||||
self._unload_plugin(e.source)
|
|
||||||
except Exception as e:
|
|
||||||
self._plugin_error
|
|
||||||
logger.critical(f'Failure when processing {plugin_name} ({e}) - disabling...')
|
|
||||||
logger.debug(format_exception(e))
|
|
||||||
self._unload_plugin(plugin_name)
|
|
||||||
if event is None:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.server_process.message_pipe.send(event)
|
|
||||||
logger.debug(f'Event after plugin chain - {event}')
|
|
||||||
elif isinstance(event, Delete):
|
|
||||||
self.server_process.message_pipe.send(event)
|
|
||||||
else:
|
|
||||||
logger.error(f'Unknown data in event loop - {event}')
|
|
||||||
|
|
||||||
async def tick_plugins(self):
|
|
||||||
while True:
|
|
||||||
await asyncio.sleep(0.5)
|
|
||||||
for plugin_name, plugin in list(self.plugins.items()):
|
|
||||||
try:
|
|
||||||
res = plugin.tick(0.5) # Not necesarily honest!
|
|
||||||
if asyncio.iscoroutinefunction(plugin.tick):
|
|
||||||
await res
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f'Failure during background processing for {plugin_name} ({e}) - disabling...')
|
|
||||||
logger.debug(format_exception(e))
|
|
||||||
self._unload_plugin(plugin_name)
|
|
||||||
|
|
||||||
async def user_setup(self):
|
|
||||||
config = kdl.Document(list(parse_kdl_deep(self.config_path)))
|
|
||||||
stdin_lock = Lock()
|
|
||||||
# Load secrets
|
|
||||||
secrets = {}
|
|
||||||
if node := config.get('secrets'):
|
|
||||||
for module in node.nodes:
|
|
||||||
fields = secrets.get(module.name, {})
|
|
||||||
for node in module.nodes:
|
|
||||||
fields[node.name] = node.args[0] if len(node.args) == 1 else node.args
|
|
||||||
secrets[module.name] = fields
|
|
||||||
|
|
||||||
# Dynamically import chats
|
|
||||||
with progressbar(list(config.getAll('chat')), label="Preparing modules (chats)", item_show_func=lambda i: i and i.args[0]) as bar:
|
|
||||||
for node in bar:
|
|
||||||
if len(node.args) == 0:
|
|
||||||
continue
|
|
||||||
module_name = node.args[0]
|
|
||||||
chat_name = node.alias or module_name
|
|
||||||
if chat_name in self.plugins:
|
|
||||||
raise ValueError(f'Definition "{chat_name}" already exists - rename using alias syntax: `alias:chat ...`')
|
|
||||||
secrets_for_mod = secrets.get(module_name, {})
|
|
||||||
try:
|
|
||||||
chat_module = import_or_reload_mod(module_name,
|
|
||||||
default_package='ovtk_audiencekit.chats',
|
|
||||||
external=False)
|
|
||||||
chat_process = chat_module.Process(stdin_lock, chat_name, **node.props, **secrets_for_mod)
|
|
||||||
self.chat_processes[chat_name] = chat_process
|
|
||||||
except Exception as e:
|
|
||||||
raise ValueError(f'Failed to initalize {module_name} module "{chat_name}" - {e}')
|
|
||||||
|
|
||||||
if len(self.chat_processes.keys()) == 0:
|
|
||||||
logger.warning('No chats configured!')
|
|
||||||
|
|
||||||
# Start chat processes
|
|
||||||
for process in self.chat_processes.values():
|
|
||||||
process.start()
|
|
||||||
# Bridge pipe to asyncio event loop
|
|
||||||
pipe = process.event_pipe
|
|
||||||
# REVIEW: This does not work on windows!!!! add_reader is not implemented
|
|
||||||
# in a way that supports pipes on either windows loop runners
|
|
||||||
asyncio.get_event_loop().add_reader(pipe.fileno(), lambda pipe=pipe: self._get_event_from_pipe(pipe))
|
|
||||||
|
|
||||||
# Load plugins
|
|
||||||
global_ctx = {}
|
|
||||||
## Builtins
|
|
||||||
for node_name in builtins.__all__:
|
|
||||||
plugin = builtins.__dict__[node_name](self.chat_processes, self.event_queue, node_name, global_ctx)
|
|
||||||
self.plugins[node_name] = plugin
|
|
||||||
self.webserver.register_blueprint(plugin.blueprint)
|
|
||||||
|
|
||||||
## Dynamic
|
|
||||||
with progressbar(list(config.getAll('plugin')), label="Preparing modules (plugins)", item_show_func=lambda i: i and i.args[0]) as bar:
|
|
||||||
for node in bar:
|
|
||||||
if len(node.args) == 0:
|
|
||||||
continue
|
|
||||||
module_name = node.args[0]
|
|
||||||
plugin_name = node.alias or module_name
|
|
||||||
if plugin_name in self.plugins:
|
|
||||||
raise ValueError(f'Definition "{plugin_name}" already exists - rename using alias syntax: `alias:plugin ...`')
|
|
||||||
secrets_for_mod = secrets.get(module_name, {})
|
|
||||||
try:
|
|
||||||
plugin_module = import_or_reload_mod(module_name,
|
|
||||||
default_package='ovtk_audiencekit.plugins',
|
|
||||||
external=False)
|
|
||||||
plugin = plugin_module.Plugin(self.chat_processes, self.event_queue, plugin_name, global_ctx,
|
|
||||||
**node.props, **secrets_for_mod, _children=node.nodes)
|
|
||||||
self.plugins[plugin_name] = plugin
|
|
||||||
# Register UI with webserver
|
|
||||||
self.webserver.register_blueprint(plugin.blueprint)
|
|
||||||
except Exception as e:
|
|
||||||
raise ValueError(f'Failed to initalize {module_name} plugin "{plugin_name}" - {e}')
|
|
||||||
|
|
||||||
# Run plugin definitions
|
|
||||||
with progressbar(list(config.nodes), label=f"Executing {self.config_path}") as bar:
|
|
||||||
for node in bar:
|
|
||||||
if node.name in kdl_reserved:
|
|
||||||
continue
|
|
||||||
plugin_name = node.name
|
|
||||||
plugin_module = self.plugins.get(plugin_name)
|
|
||||||
if plugin_module is None:
|
|
||||||
logger.error(f'Unknown plugin: {node.name}')
|
|
||||||
else:
|
|
||||||
await plugin_module._call(node.sub, node.tag, *node.args, **node.props, _ctx=global_ctx, _children=node.nodes)
|
|
||||||
|
|
||||||
async def user_shutdown(self):
|
|
||||||
for process_name, process in list(reversed(self.chat_processes.items())):
|
|
||||||
pipe = process.event_pipe
|
|
||||||
process.control_pipe.send(ShutdownRequest('root'))
|
|
||||||
process.join(5)
|
|
||||||
if process.exitcode is None:
|
|
||||||
process.terminate()
|
|
||||||
asyncio.get_event_loop().remove_reader(pipe.fileno())
|
|
||||||
del self.chat_processes[process_name]
|
|
||||||
for plugin_name in list(reversed(self.plugins.keys())):
|
|
||||||
# NOTE: The plugin will likely stick around in memory for a bit after this,
|
|
||||||
# as the webserver will still have its quart blueprint attached
|
|
||||||
self._unload_plugin(plugin_name)
|
|
||||||
sys.path = self._initial_syspath
|
|
||||||
|
|
||||||
async def run(self):
|
|
||||||
self.shutdown_ev = asyncio.Event()
|
|
||||||
self.reload_ev = asyncio.Event()
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
user_tasks = set()
|
|
||||||
|
|
||||||
try:
|
|
||||||
# System setup
|
|
||||||
## Bridge websocket server pipe to asyncio loop
|
|
||||||
## REVIEW: This does not work on windows!!!! add_reader is not implemented
|
|
||||||
## in a way that supports pipes on either windows loop runners
|
|
||||||
ws_pipe = self.server_process.message_pipe
|
|
||||||
loop.add_reader(ws_pipe.fileno(), lambda: self._get_event_from_pipe(ws_pipe))
|
|
||||||
## Register stdin handler
|
|
||||||
reader = asyncio.StreamReader()
|
|
||||||
await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
|
|
||||||
async def discount_repl():
|
|
||||||
# REVIEW: Not a good UX at the moment (as new logs clobber the terminal entry)
|
|
||||||
async for line in reader:
|
|
||||||
line = line.strip()
|
|
||||||
if line == b'reload':
|
|
||||||
self.reload_ev.set()
|
|
||||||
elif line == b'quit':
|
|
||||||
self.shutdown_ev.set()
|
|
||||||
self.cli_task = loop.create_task(discount_repl())
|
|
||||||
## Scheduler for timed tasks
|
|
||||||
self._skehdule = TimedScheduler(max_tasks=1)
|
|
||||||
self._skehdule.start()
|
|
||||||
## UI server
|
|
||||||
serve_coro = self._setup_webserver()
|
|
||||||
self.webserver_task = loop.create_task(serve_coro)
|
|
||||||
logger.debug(f'Listening on: {":".join(self.web_conf)} (UI) and {":".join(self.bus_conf)} (event bus)')
|
|
||||||
|
|
||||||
# User (plugin / chat) mode (reloading allowed)
|
|
||||||
while True:
|
|
||||||
async with self.webserver.app_context():
|
|
||||||
await self.user_setup()
|
|
||||||
# Start plumbing tasks
|
|
||||||
user_tasks.add(loop.create_task(self.tick_plugins()))
|
|
||||||
user_tasks.add(loop.create_task(self.handle_events()))
|
|
||||||
|
|
||||||
logger.info(f'Ready to rumble! Press Ctrl+C to shut down')
|
|
||||||
reload_task = loop.create_task(self.reload_ev.wait())
|
|
||||||
done, pending = await asyncio.wait([*user_tasks, self.webserver_task, reload_task], return_when=asyncio.FIRST_COMPLETED)
|
|
||||||
|
|
||||||
if reload_task in done:
|
|
||||||
logger.warn('Reloading (some events may be missed!)')
|
|
||||||
logger.debug('Teardown...')
|
|
||||||
self.reload_ev.clear()
|
|
||||||
# Shutdown plugins / chats
|
|
||||||
await self.user_shutdown()
|
|
||||||
# Stop event plumbing
|
|
||||||
for task in user_tasks:
|
|
||||||
task.cancel()
|
|
||||||
user_tasks.clear()
|
|
||||||
# HACK: Restart webserver to workaround quart's inability to remove blueprints
|
|
||||||
# Stop
|
|
||||||
await self.webserver.shutdown()
|
|
||||||
self.shutdown_ev.set()
|
|
||||||
try:
|
|
||||||
await self.webserver_task
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
self.shutdown_ev.clear()
|
|
||||||
# Start
|
|
||||||
logger.debug('Startup...')
|
|
||||||
serve_coro = self._setup_webserver()
|
|
||||||
self.webserver_task = loop.create_task(serve_coro)
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
except kdl.errors.ParseError as e:
|
|
||||||
try:
|
|
||||||
logger.critical(f'Invalid configuration in {e.file}: line {e.line}, character {e.col} - {e.msg}')
|
|
||||||
except AttributeError:
|
|
||||||
logger.critical(f'Invalid configuration - {e.msg}')
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f'Failure in core process - {e}')
|
|
||||||
logger.debug(format_exception(e))
|
|
||||||
finally:
|
|
||||||
logger.warn('Closing up shop...')
|
|
||||||
for task in user_tasks:
|
|
||||||
task.cancel()
|
|
||||||
await self.user_shutdown()
|
|
||||||
self.webserver_task.cancel()
|
|
||||||
self.server_process.terminate()
|
|
|
@ -1,138 +0,0 @@
|
||||||
from abc import ABC, abstractmethod
|
|
||||||
import logging
|
|
||||||
import asyncio
|
|
||||||
import os.path
|
|
||||||
import sys
|
|
||||||
import copy
|
|
||||||
|
|
||||||
import kdl
|
|
||||||
import quart
|
|
||||||
|
|
||||||
from ovtk_audiencekit.core.Config import kdl_parse_config, compute_dynamic
|
|
||||||
|
|
||||||
|
|
||||||
class PluginError(Exception):
|
|
||||||
def __init__(self, source, message, fatal=True):
|
|
||||||
self.source = source
|
|
||||||
self.message = message
|
|
||||||
self.fatal = fatal
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return self.message
|
|
||||||
|
|
||||||
class OvtkBlueprint(quart.Blueprint):
|
|
||||||
def url_for(self, endpoint, *args, **kwargs):
|
|
||||||
"""url_for method that understands blueprint-relative names under non-request contexts"""
|
|
||||||
if endpoint.startswith('.'):
|
|
||||||
endpoint = self.name + endpoint
|
|
||||||
return quart.url_for(endpoint, *args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class PluginBase(ABC):
|
|
||||||
plugins = {}
|
|
||||||
|
|
||||||
def __init__(self, chat_processes, event_queue, name, global_ctx, _children=None, **kwargs):
|
|
||||||
super().__init__(**kwargs)
|
|
||||||
self.chats = chat_processes
|
|
||||||
self._event_queue = event_queue
|
|
||||||
self._name = name
|
|
||||||
self._global_ctx = global_ctx
|
|
||||||
|
|
||||||
self.plugins[name] = self
|
|
||||||
|
|
||||||
self.logger = logging.getLogger(f'plugin.{self._name}')
|
|
||||||
|
|
||||||
# HACK: This is kinda gross, and probably wont be true for frozen modules
|
|
||||||
plugin_dir = os.path.dirname(sys.modules[self.__class__.__module__].__file__)
|
|
||||||
self.blueprint = OvtkBlueprint(self._name, __name__,
|
|
||||||
url_prefix=f'/{self._name}',
|
|
||||||
static_url_path='static',
|
|
||||||
static_folder=os.path.join(plugin_dir, 'static'),
|
|
||||||
template_folder=os.path.join(plugin_dir, 'templates'))
|
|
||||||
|
|
||||||
if _children:
|
|
||||||
raise ValueError('Module does not accept children')
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
if self.plugins.get(self._name) == self:
|
|
||||||
del self.plugins[self._name]
|
|
||||||
|
|
||||||
async def _call(self, subroutine, tag, *args, **kwargs):
|
|
||||||
try:
|
|
||||||
if subroutine:
|
|
||||||
func = self
|
|
||||||
for accessor in subroutine:
|
|
||||||
func = getattr(func, accessor)
|
|
||||||
else:
|
|
||||||
func = self.run
|
|
||||||
res = func(*args, **kwargs)
|
|
||||||
if asyncio.iscoroutinefunction(func):
|
|
||||||
res = await res
|
|
||||||
return res
|
|
||||||
except Exception as e:
|
|
||||||
if isinstance(e, KeyboardInterrupt):
|
|
||||||
raise e
|
|
||||||
raise PluginError(self._name, str(e)) from e
|
|
||||||
|
|
||||||
# Base class helpers
|
|
||||||
def broadcast(self, event):
|
|
||||||
"""Send event to every active chat"""
|
|
||||||
for proc in self.chats.values():
|
|
||||||
if proc.readonly:
|
|
||||||
continue
|
|
||||||
proc.control_pipe.send(event)
|
|
||||||
|
|
||||||
async def execute_kdl(self, nodes, *py_args, _ctx={}, **py_props):
|
|
||||||
"""
|
|
||||||
Run other plugins as configured by the passed KDL nodes collection
|
|
||||||
If this was done in response to an event, pass it as 'event' in _ctx!
|
|
||||||
"""
|
|
||||||
_ctx = copy.deepcopy({**self._global_ctx, **_ctx})
|
|
||||||
for node in nodes:
|
|
||||||
try:
|
|
||||||
args, props = compute_dynamic(node, _ctx=_ctx)
|
|
||||||
target = self.plugins.get(node.name)
|
|
||||||
if target is None:
|
|
||||||
self.logger.warning(f'Could not find plugin or builtin with name {node.name}')
|
|
||||||
break
|
|
||||||
result = await target._call(node.sub, node.tag, *args, *py_args, **props, _ctx=_ctx, **py_props, _children=node.nodes)
|
|
||||||
if node.alias:
|
|
||||||
_ctx[node.alias] = result
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.warning(f'Failed to execute defered KDL: {e}')
|
|
||||||
break
|
|
||||||
|
|
||||||
|
|
||||||
def send_to_bus(self, event):
|
|
||||||
"""
|
|
||||||
Send an event to the event bus
|
|
||||||
WARNING: This will cause the event to be processed by other plugins - be careful not to cause an infinite loop!
|
|
||||||
"""
|
|
||||||
self._event_queue.put_nowait(event)
|
|
||||||
|
|
||||||
# User-defined
|
|
||||||
def close(self):
|
|
||||||
"""Called when plugin is about to be unloaded. Use this to safely close any resouces if needed"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def tick(self, dt):
|
|
||||||
"""Called at least every half second - perform time-dependent updates here!"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def on_bus_event(self, event):
|
|
||||||
"""Called for every event from the chats"""
|
|
||||||
return event
|
|
||||||
|
|
||||||
async def on_control_event(self, event):
|
|
||||||
"""
|
|
||||||
Called for events targeting this plugin name specifically.
|
|
||||||
This is normally used for other applications to communicate with this one over the websocket interface
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
async def run(self, _children=None, _ctx={}, **kwargs):
|
|
||||||
"""
|
|
||||||
Run plugin action, either due to a definition in the config, or due to another plugin
|
|
||||||
"""
|
|
||||||
pass
|
|
|
@ -1 +0,0 @@
|
||||||
from .osc import OSCPlugin as Plugin
|
|
|
@ -1,17 +0,0 @@
|
||||||
from pythonosc.udp_client import SimpleUDPClient
|
|
||||||
|
|
||||||
from ovtk_audiencekit.plugins import PluginBase
|
|
||||||
|
|
||||||
|
|
||||||
class OSCPlugin(PluginBase):
|
|
||||||
def __init__(self, *args, ip='localhost', port=None, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
if port is None:
|
|
||||||
raise RuntimeError('A unique port must be specified')
|
|
||||||
self.client = SimpleUDPClient(ip, int(port))
|
|
||||||
|
|
||||||
async def run(self, endpoint, *data, _children=None, _ctx={}, **kwargs):
|
|
||||||
if len(data) == 1:
|
|
||||||
self.client.send_message(endpoint, data[0])
|
|
||||||
else:
|
|
||||||
self.client.send_message(endpoint, data)
|
|
|
@ -1,3 +0,0 @@
|
||||||
from ovtk_audiencekit.core.PluginBase import PluginBase, PluginError
|
|
||||||
|
|
||||||
__all__ = ['PluginBase', 'PluginError']
|
|
|
@ -1,122 +0,0 @@
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
import datetime
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
import maya
|
|
||||||
import aioscheduler
|
|
||||||
|
|
||||||
from ovtk_audiencekit.plugins import PluginBase
|
|
||||||
from ovtk_audiencekit.utils import format_exception
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
TIME_SEGMENTS = ['seconds', 'minutes', 'hours', 'days']
|
|
||||||
|
|
||||||
class Cue:
|
|
||||||
def __init__(self, repeat, at=None, **kwargs):
|
|
||||||
self.repeat = repeat
|
|
||||||
self.enabled = True
|
|
||||||
|
|
||||||
if at:
|
|
||||||
self._next = maya.when(at)
|
|
||||||
else:
|
|
||||||
self._next = maya.now().add(**kwargs)
|
|
||||||
self._interval = kwargs
|
|
||||||
|
|
||||||
@property
|
|
||||||
def next(self):
|
|
||||||
return self._next.datetime(to_timezone='UTC', naive=True)
|
|
||||||
|
|
||||||
def is_obsolete(self):
|
|
||||||
if self.repeat:
|
|
||||||
return False
|
|
||||||
if self._next <= maya.now():
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def reschedule(self, fresh=False):
|
|
||||||
if fresh:
|
|
||||||
self._next = maya.now().add(**self._interval)
|
|
||||||
else:
|
|
||||||
self._next = self._next.add(**self._interval)
|
|
||||||
# HACK: Compare epochs directly, as maya comps are only second accurate
|
|
||||||
if not fresh and self._next._epoch <= maya.now()._epoch:
|
|
||||||
offset = maya.now()._epoch - self._next._epoch
|
|
||||||
logger.warn(f'Cannot keep up with configured interval - {underrun} underrun. Repetition may fail!')
|
|
||||||
self._next = maya.now().add(**self._interval)
|
|
||||||
|
|
||||||
|
|
||||||
class CuePlugin(PluginBase):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.cues = {}
|
|
||||||
self.tasks = {}
|
|
||||||
|
|
||||||
self.scheduler = aioscheduler.TimedScheduler()
|
|
||||||
self.scheduler.start()
|
|
||||||
|
|
||||||
self._cleanup_task = asyncio.create_task(self._cleanup())
|
|
||||||
|
|
||||||
def run(self, name=None, repeat=False, enabled=None, _children=[], _ctx={}, **kwargs):
|
|
||||||
if name is None:
|
|
||||||
name = str(uuid.uuid4())
|
|
||||||
|
|
||||||
first_set = self.cues.get(name) is None
|
|
||||||
|
|
||||||
if len(_children) > 0:
|
|
||||||
has_interval = any(kwargs.get(segment) is not None for segment in TIME_SEGMENTS)
|
|
||||||
if kwargs.get('at') is None and not has_interval:
|
|
||||||
raise ValueError('Provide a concrete time with `at` or a timer length with `seconds`, `hours`, etc')
|
|
||||||
if kwargs.get('at') is not None and repeat and not has_interval:
|
|
||||||
raise ValueError('`repeat` can not be used with solely a concrete time')
|
|
||||||
|
|
||||||
cue = Cue(repeat, **kwargs)
|
|
||||||
|
|
||||||
async def handler():
|
|
||||||
# Repetion management
|
|
||||||
if not cue.enabled:
|
|
||||||
return
|
|
||||||
if cue.repeat:
|
|
||||||
cue.reschedule()
|
|
||||||
self.tasks[name] = self.scheduler.schedule(handler(), cue.next)
|
|
||||||
# Run configured actions
|
|
||||||
try:
|
|
||||||
await self.execute_kdl(_children, _ctx=_ctx)
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.error(f'Failed to complete cue {name}: {e}')
|
|
||||||
self.logger.debug(format_exception(e))
|
|
||||||
|
|
||||||
self.cues[name] = (cue, handler)
|
|
||||||
self.schedule_exec(name, cue.next, handler())
|
|
||||||
|
|
||||||
if enabled is not None:
|
|
||||||
entry = self.cues.get(name)
|
|
||||||
if entry is None:
|
|
||||||
self.logger.warn(f'Cannot find cue with name "{name}"')
|
|
||||||
return
|
|
||||||
cue, handler = entry
|
|
||||||
|
|
||||||
cue.enabled = enabled
|
|
||||||
|
|
||||||
if enabled and not first_set:
|
|
||||||
cue.reschedule(fresh=True)
|
|
||||||
self.schedule_exec(name, cue.next, handler())
|
|
||||||
|
|
||||||
def schedule_exec(self, name, at, coro):
|
|
||||||
if existing_task := self.tasks.get(name):
|
|
||||||
self.scheduler.cancel(existing_task)
|
|
||||||
try:
|
|
||||||
self.tasks[name] = self.scheduler.schedule(coro, at)
|
|
||||||
except ValueError as e:
|
|
||||||
self.logger.error(f'Cannot schedule cue {name} at {at}: {e}')
|
|
||||||
|
|
||||||
async def _cleanup(self):
|
|
||||||
while True:
|
|
||||||
await asyncio.sleep(60)
|
|
||||||
for name, (cue, _) in self.cues.items():
|
|
||||||
if cue.is_obsolete():
|
|
||||||
del self.cues[name]
|
|
||||||
if task := self.tasks.get(name):
|
|
||||||
self.scheduler.cancel(task)
|
|
||||||
del self.tasks[name]
|
|
|
@ -1,14 +0,0 @@
|
||||||
from ovtk_audiencekit.plugins import PluginBase
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
level_names = ['critical', 'error', 'warning', 'info', 'debug']
|
|
||||||
|
|
||||||
class LogPlugin(PluginBase):
|
|
||||||
def run(self, msg, level="info", **kwargs):
|
|
||||||
try:
|
|
||||||
int_level = next((getattr(logging, level_name.upper()) for level_name in level_names if level_name.startswith(level.lower())))
|
|
||||||
except StopIteration:
|
|
||||||
self.logger.debug(f'Using default log level for KDL log call since user level "{level}" is not recognized')
|
|
||||||
int_level = logging.INFO
|
|
||||||
self.logger.log(int_level, msg)
|
|
|
@ -1,61 +0,0 @@
|
||||||
import asyncio
|
|
||||||
|
|
||||||
import mido
|
|
||||||
|
|
||||||
from ovtk_audiencekit.plugins import PluginBase
|
|
||||||
|
|
||||||
|
|
||||||
def matches(msg, attrs):
|
|
||||||
for attr, match_val in attrs.items():
|
|
||||||
msg_val = getattr(msg, attr)
|
|
||||||
if msg_val != match_val:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class MidiPlugin(PluginBase):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
def callback(msg):
|
|
||||||
asyncio.run_coroutine_threadsafe(self.recv_callback(msg), loop)
|
|
||||||
|
|
||||||
self.output_port = mido.open_output()
|
|
||||||
self.input_port = mido.open_input(callback=callback)
|
|
||||||
self.listeners = {
|
|
||||||
'note_off': [],
|
|
||||||
'note_on': [],
|
|
||||||
'control_change': [],
|
|
||||||
'program_change': [],
|
|
||||||
'sysex': [],
|
|
||||||
'song_select': [],
|
|
||||||
'start': [],
|
|
||||||
'continue': [],
|
|
||||||
'stop': [],
|
|
||||||
}
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
self.input_port.close()
|
|
||||||
self.output_port.close()
|
|
||||||
|
|
||||||
def run(self, type, _ctx={}, _children=None, **kwargs):
|
|
||||||
if type == 'sysex':
|
|
||||||
data = kwargs['data']
|
|
||||||
msg = mido.Message('sysex', data=bytes(data, encoding='utf-8'), time=0)
|
|
||||||
else:
|
|
||||||
msg = mido.Message(type, **kwargs, time=0)
|
|
||||||
self.output_port.send(msg)
|
|
||||||
|
|
||||||
async def recv_callback(self, msg):
|
|
||||||
if hasattr(msg, 'channel'):
|
|
||||||
msg.channel += 1 # Channels in mido are 0-15, but in spec are 1-16. Adjust to spec
|
|
||||||
self.logger.debug(f"Recv: {msg}")
|
|
||||||
for params, handler, ctx in self.listeners[msg.type]:
|
|
||||||
if matches(msg, params):
|
|
||||||
_ctx = {**ctx, 'midi': msg}
|
|
||||||
await handler(_ctx)
|
|
||||||
|
|
||||||
def listen(self, type, _ctx={}, _children=None, **kwargs):
|
|
||||||
kwargs = {k:int(v) for k, v in kwargs.items()}
|
|
||||||
handler = lambda ctx: self.execute_kdl(_children, _ctx=ctx)
|
|
||||||
self.listeners[type].append((kwargs, handler, _ctx))
|
|
|
@ -1,150 +0,0 @@
|
||||||
from dataclasses import dataclass, field
|
|
||||||
import asyncio
|
|
||||||
from typing import Callable
|
|
||||||
import quart
|
|
||||||
import json
|
|
||||||
|
|
||||||
import kdl
|
|
||||||
|
|
||||||
from ovtk_audiencekit.plugins import PluginBase
|
|
||||||
from ovtk_audiencekit.utils import format_exception
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Scene:
|
|
||||||
name: str
|
|
||||||
group: str
|
|
||||||
enter: Callable
|
|
||||||
exit: Callable
|
|
||||||
entry_context: dict = field(default_factory=dict)
|
|
||||||
tasks: list[asyncio.Task] = field(default_factory=list)
|
|
||||||
|
|
||||||
|
|
||||||
class ScenePlugin(PluginBase):
|
|
||||||
"""Allows for creating modal configurations"""
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
self.scenes = {}
|
|
||||||
self.active = {}
|
|
||||||
|
|
||||||
self._scene_state_changed = asyncio.Event()
|
|
||||||
|
|
||||||
self.blueprint.add_url_rule('/', 'ctrlpanel', self.ui_ctrlpanel)
|
|
||||||
self.blueprint.add_url_rule('/<name>/<cmd>', 'api-sceneset', self.ui_setscene)
|
|
||||||
self.blueprint.add_url_rule('/monitor', 'monitor', self.ui_monitor_ws, is_websocket=True)
|
|
||||||
|
|
||||||
async def run(self, name, _children=None, _ctx={}, active=None, group=None, immediate=True, **kwargs):
|
|
||||||
if _children is None and active is None:
|
|
||||||
raise UsageError('Either define a new scene or set `--active` to true / false')
|
|
||||||
|
|
||||||
if _children:
|
|
||||||
await self.define(name, group, _children, default_active=active, ctx=_ctx)
|
|
||||||
else:
|
|
||||||
await self.switch(name, active, is_immediate=immediate, ctx=_ctx)
|
|
||||||
|
|
||||||
async def define(self, name, group, children, default_active=False, ctx={}):
|
|
||||||
if self.scenes.get(name) is not None:
|
|
||||||
raise UsageError(f'Scene with name "{name}" already exists!')
|
|
||||||
|
|
||||||
# Categorize nodes
|
|
||||||
enter_nodes = []
|
|
||||||
exit_nodes = []
|
|
||||||
for child in children:
|
|
||||||
if child.name == 'exit':
|
|
||||||
exit_nodes.extend(child.nodes)
|
|
||||||
else:
|
|
||||||
enter_nodes.append(child)
|
|
||||||
# Make transisition functions
|
|
||||||
async def enter(ctx):
|
|
||||||
await self.execute_kdl(enter_nodes, _ctx=ctx)
|
|
||||||
scene.entry_context = ctx
|
|
||||||
async def exit(ctx):
|
|
||||||
await self.execute_kdl(exit_nodes, _ctx=ctx)
|
|
||||||
|
|
||||||
scene = Scene(name, group, enter, exit)
|
|
||||||
self.scenes[name] = scene
|
|
||||||
|
|
||||||
if default_active:
|
|
||||||
await self.switch(name, default_active, is_immediate=True, ctx=ctx)
|
|
||||||
|
|
||||||
async def switch(self, name, active, is_immediate=True, ctx={}):
|
|
||||||
scene = self.scenes.get(name)
|
|
||||||
if scene is None:
|
|
||||||
raise UsageError(f'No defined scene with name "{name}"')
|
|
||||||
|
|
||||||
if active:
|
|
||||||
if current := self.active.get(scene.group):
|
|
||||||
if current == scene:
|
|
||||||
return
|
|
||||||
await self._execute(current, 'exit', is_immediate, ctx)
|
|
||||||
self.active[scene.group] = scene
|
|
||||||
await self._execute(scene, 'enter', is_immediate, ctx)
|
|
||||||
else:
|
|
||||||
if self.active.get(scene.group) == scene:
|
|
||||||
self.active[scene.group] = None
|
|
||||||
await self._execute(scene, 'exit', is_immediate, ctx)
|
|
||||||
|
|
||||||
self._scene_state_changed.set()
|
|
||||||
self._scene_state_changed.clear()
|
|
||||||
|
|
||||||
async def _execute(self, scene, mode, immediate, ctx):
|
|
||||||
ctx = {**ctx} # HACK: Copy to avoid leakage from previous group item exit
|
|
||||||
|
|
||||||
scene_transision_fn = getattr(scene, mode)
|
|
||||||
|
|
||||||
# Wrap to handle context at exec time
|
|
||||||
async def context_wrapper(ctx):
|
|
||||||
if mode == 'exit':
|
|
||||||
ctx = {
|
|
||||||
**scene.entry_context,
|
|
||||||
'caller_ctx': ctx,
|
|
||||||
}
|
|
||||||
await scene_transision_fn(ctx)
|
|
||||||
if mode == 'enter':
|
|
||||||
scene.entry_context = ctx
|
|
||||||
coro = context_wrapper(ctx)
|
|
||||||
# Wrap to finish any other pending tasks before running this
|
|
||||||
if len(scene.tasks):
|
|
||||||
async def exec_order_wrap(other):
|
|
||||||
await asyncio.gather(*scene.tasks)
|
|
||||||
scene.tasks = []
|
|
||||||
await other
|
|
||||||
coro = exec_order_wrap(coro)
|
|
||||||
|
|
||||||
# Run (or schedule for execution)
|
|
||||||
if immediate:
|
|
||||||
try:
|
|
||||||
await coro
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.error(f'Failed to handle "{scene.name}" {mode} transistion: {e}')
|
|
||||||
self.logger.debug(format_exception(e))
|
|
||||||
else:
|
|
||||||
scene.tasks.append(asyncio.create_task(coro))
|
|
||||||
|
|
||||||
def _get_state(self):
|
|
||||||
groups = {}
|
|
||||||
for scene_name, scene in self.scenes.items():
|
|
||||||
active = self.active.get(scene.group) == scene
|
|
||||||
group = scene.group or "default group"
|
|
||||||
if groups.get(group) is None:
|
|
||||||
groups[group] = {}
|
|
||||||
groups[group][scene_name] = active
|
|
||||||
return groups
|
|
||||||
|
|
||||||
async def ui_ctrlpanel(self):
|
|
||||||
groups = self._get_state()
|
|
||||||
return await quart.render_template('index.html', init_state=json.dumps(groups))
|
|
||||||
|
|
||||||
async def ui_setscene(self, name=None, cmd=None):
|
|
||||||
active = cmd == 'activate'
|
|
||||||
await self.switch(name, active, is_immediate=True)
|
|
||||||
return quart.Response(status=200)
|
|
||||||
|
|
||||||
async def ui_monitor_ws(self):
|
|
||||||
await quart.websocket.accept()
|
|
||||||
while True:
|
|
||||||
groups = self._get_state()
|
|
||||||
await quart.websocket.send(json.dumps(groups))
|
|
||||||
await self._scene_state_changed.wait()
|
|
|
@ -1 +0,0 @@
|
||||||
from .Plugin import ScenePlugin, Scene
|
|
|
@ -1,83 +0,0 @@
|
||||||
<!DOCTYPE html>
|
|
||||||
<html lang="en" dir="ltr">
|
|
||||||
<head>
|
|
||||||
<meta charset="utf-8">
|
|
||||||
<title>Test page</title>
|
|
||||||
<script type="importmap">
|
|
||||||
{
|
|
||||||
"imports": { "vue": "https://unpkg.com/vue@3/dist/vue.esm-browser.js" }
|
|
||||||
}
|
|
||||||
</script>
|
|
||||||
|
|
||||||
<script type="module">
|
|
||||||
import { createApp, ref, onMounted } from 'vue'
|
|
||||||
|
|
||||||
createApp({
|
|
||||||
setup() {
|
|
||||||
const groups = ref(JSON.parse('<< init_state|safe >>'))
|
|
||||||
const inflight = ref([])
|
|
||||||
onMounted(() => {
|
|
||||||
const websock = new WebSocket('<<url_for(".monitor")>>');
|
|
||||||
websock.addEventListener('message', (msg) => {
|
|
||||||
groups.value = JSON.parse(msg.data)
|
|
||||||
inflight.value = []
|
|
||||||
})
|
|
||||||
})
|
|
||||||
const toggle = async (group_name, scene_name) => {
|
|
||||||
if (inflight.value.includes(scene_name)) return
|
|
||||||
inflight.value.push(scene_name)
|
|
||||||
const next_state = !groups.value[group_name][scene_name]
|
|
||||||
await fetch(`${scene_name}/${next_state ? 'activate' : 'deactivate'}`, { method: 'GET' })
|
|
||||||
}
|
|
||||||
return { groups, inflight, toggle }
|
|
||||||
},
|
|
||||||
}).mount('#app')
|
|
||||||
</script>
|
|
||||||
</head>
|
|
||||||
|
|
||||||
<body id="root">
|
|
||||||
<div id="app">
|
|
||||||
<div v-for="(group, group_name) in groups" class="group">
|
|
||||||
<h3>{{ group_name }}</h3>
|
|
||||||
<div v-for="(active, scene_name) in group" v-on:click="toggle(group_name, scene_name)"
|
|
||||||
:class="{ active, pending: inflight.includes(scene_name), scene: true }"
|
|
||||||
>
|
|
||||||
<p>{{ scene_name }}</p>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
<style type="text/css">
|
|
||||||
#app {
|
|
||||||
display: flex;
|
|
||||||
flex-direction: row;
|
|
||||||
flex-wrap: wrap;
|
|
||||||
font-family: sans-serif;
|
|
||||||
gap: 8px;
|
|
||||||
}
|
|
||||||
p {
|
|
||||||
text-align: center;
|
|
||||||
}
|
|
||||||
h3 {
|
|
||||||
margin-right: 1em;
|
|
||||||
}
|
|
||||||
.group {
|
|
||||||
display: flex;
|
|
||||||
flex-direction: column;
|
|
||||||
gap: 4px;
|
|
||||||
}
|
|
||||||
.scene {
|
|
||||||
padding: 12px 24px;
|
|
||||||
user-select: none;
|
|
||||||
background-color: lightgray;
|
|
||||||
flex: 1;
|
|
||||||
}
|
|
||||||
.scene.pending {
|
|
||||||
background-color: lightgoldenrodyellow;
|
|
||||||
}
|
|
||||||
.scene.active {
|
|
||||||
background-color: lightgreen;
|
|
||||||
}
|
|
||||||
</style>
|
|
||||||
</body>
|
|
||||||
</html>
|
|
|
@ -1,40 +0,0 @@
|
||||||
from ovtk_audiencekit.plugins import PluginBase
|
|
||||||
from ovtk_audiencekit.core.Config import compute_dynamic
|
|
||||||
|
|
||||||
|
|
||||||
class SetPlugin(PluginBase):
|
|
||||||
"""Set arbitrary data in the local context (can be fetched with the custom arg type)"""
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
def run(self, *args, _children=[], _ctx={}, **kwargs):
|
|
||||||
self.proc_node(_ctx, *args, _children=_children, _stack=[], **kwargs)
|
|
||||||
self.logger.debug(_ctx)
|
|
||||||
|
|
||||||
def proc_node(self, target, *args, _children=[], _stack=[], **props):
|
|
||||||
if len(args) > 0 and len(props) > 0:
|
|
||||||
raise ValueError("Cannot use both item/list and keyword forms at the same time")
|
|
||||||
if _children and (len(args) > 0 or len(props) > 0):
|
|
||||||
raise ValueError("Cannot define value as something and dict at the same time")
|
|
||||||
if len(args) > 0 and len(_stack) == 0:
|
|
||||||
raise ValueError("Cannot use item/list short form on top level set")
|
|
||||||
|
|
||||||
if len(props) > 0:
|
|
||||||
for key, value in props.items():
|
|
||||||
if target.get(key) is not None:
|
|
||||||
fullkey = '.'.join([n for n, t in _stack] + [key])
|
|
||||||
self.logger.debug(f'Shadowing {fullkey}')
|
|
||||||
target[key] = value
|
|
||||||
elif _children:
|
|
||||||
for child in _children:
|
|
||||||
sub = dict()
|
|
||||||
target[child.name] = sub
|
|
||||||
stack = [*_stack, (child.name, target)]
|
|
||||||
key = '.'.join(s for s, t in stack)
|
|
||||||
|
|
||||||
args, props = compute_dynamic(child, _ctx=stack[0][1])
|
|
||||||
self.proc_node(sub, *args, _children=child.nodes, _stack=stack, **props)
|
|
||||||
elif len(args) > 0:
|
|
||||||
name, target = _stack[-1]
|
|
||||||
target[name] = args[0] if len(args) == 1 else args
|
|
|
@ -1,4 +0,0 @@
|
||||||
import traceback as traceback_lib
|
|
||||||
|
|
||||||
def format_exception(e, traceback=True):
|
|
||||||
return ''.join(traceback_lib.format_exception(None, e, e.__traceback__ if traceback else None))
|
|
Loading…
Reference in New Issue