[plugins/Cue] [Breaking!] Simplified syntax, better accuracy, and cancellation, oh my!

Simplified syntax: `cue` no longer requires sub-nodes - simply specify `at` or your
interval options on the cue node itself, and set `repeat` to a boolean (previously done via using the `every` subnode vs `once`)

Better accuracy: scheduler now runs outside of tick, allowing sub-second accuracy in
ideal environments. This is still a long way from a proper realtime solution -
queuing is merely asyncio's best-effort, accuracy may vary wildly depending on load!

Cancelation: passing a string as the first argument to cue will give the event a name,
which can be used by a later call to set the event as enabled or disabled.
Additionally, toggling this flag / redefining a named cue will reschedule it if it is
an interval, which you can use to create timeout behaviors!
This commit is contained in:
Derek 2023-09-01 13:57:39 -04:00
parent 2ab370d4a0
commit e7d16f44f4

View file

@ -1,51 +1,123 @@
import asyncio
import logging
import datetime
import uuid
import maya
import aioscheduler
from ovtk_audiencekit.plugins import PluginBase
from ovtk_audiencekit.utils import format_exception
logger = logging.getLogger(__name__)
class CueEvent:
def __init__(self, oneshot, at=None, **kwargs):
self.oneshot = oneshot
TIME_SEGMENTS = ['seconds', 'minutes', 'hours', 'days']
class Cue:
def __init__(self, repeat, at=None, **kwargs):
self.repeat = repeat
self.enabled = True
if at:
self._next_run = maya.parse(at)
self._interval = None
self._next = maya.when(at)
else:
self._next_run = maya.now().add(**kwargs)
self._interval = kwargs
self._next = maya.now().add(**kwargs)
self._interval = kwargs
def check(self):
if self._next_run <= maya.now():
if self._interval:
self._next_run = maya.now().add(**self._interval)
return True
return False
@property
def next(self):
return self._next.datetime(to_timezone='UTC', naive=True)
def is_obsolete(self):
if self.repeat:
return False
if self._next <= maya.now():
return False
return True
def reschedule(self, fresh=False):
if fresh:
self._next = maya.now().add(**self._interval)
else:
self._next = self._next.add(**self._interval)
# HACK: Compare epochs directly, as maya comps are only second accurate
if not fresh and self._next._epoch <= maya.now()._epoch:
offset = maya.now()._epoch - self._next._epoch
logger.warn(f'Cannot keep up with configured interval - {underrun} underrun. Repetition may fail!')
self._next = maya.now().add(**self._interval)
class CuePlugin(PluginBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cue_events = {}
self.cues = {}
self.tasks = {}
def run(self, name=None, _children=None, **kwargs):
if not _children:
raise ValueError('Cue defined without any events')
self.scheduler = aioscheduler.TimedScheduler()
self.scheduler.start()
self._cleanup_task = asyncio.create_task(self._cleanup())
def run(self, name=None, repeat=False, enabled=None, _children=[], _ctx={}, **kwargs):
if name is None:
name = f"cue-{len(self.cue_events.keys())}"
name = str(uuid.uuid4())
for eventnode in _children:
at = eventnode.args[0] if len(eventnode.args) == 1 else None
oneshot = eventnode.name in ['once', 'after']
cue_event = CueEvent(oneshot, at=at, **eventnode.props)
first_set = self.cues.get(name) is None
actions = [lambda node=node: self.execute_kdl(node) for node in eventnode.nodes];
self.cue_events[name] = (cue_event, actions)
if len(_children) > 0:
has_interval = any(kwargs.get(segment) is not None for segment in TIME_SEGMENTS)
if kwargs.get('at') is None and not has_interval:
raise ValueError('Provide a concrete time with `at` or a timer length with `seconds`, `hours`, etc')
if kwargs.get('at') is not None and repeat and not has_interval:
raise ValueError('`repeat` can not be used with solely a concrete time')
async def tick(self, dt):
for key, (event, actions) in list(self.cue_events.items()):
if event.check():
for action in actions:
await action()
if event.oneshot:
del self.cue_events[key]
cue = Cue(repeat, **kwargs)
async def handler():
# Repetion management
if not cue.enabled:
return
if cue.repeat:
cue.reschedule()
self.tasks[name] = self.scheduler.schedule(handler(), cue.next)
# Run configured actions
try:
for node in _children:
await self.execute_kdl(node, _ctx=_ctx)
except Exception as e:
self.logger.error(f'Failed to complete cue {name}: {e}')
self.logger.debug(format_exception(e))
self.cues[name] = (cue, handler)
self.schedule_exec(name, cue.next, handler())
if enabled is not None:
entry = self.cues.get(name)
if entry is None:
self.logger.warn(f'Cannot find cue with name "{name}"')
return
cue, handler = entry
cue.enabled = enabled
if enabled and not first_set:
cue.reschedule(fresh=True)
self.schedule_exec(name, cue.next, handler())
def schedule_exec(self, name, at, coro):
if existing_task := self.tasks.get(name):
self.scheduler.cancel(existing_task)
try:
self.tasks[name] = self.scheduler.schedule(coro, at)
except ValueError as e:
self.logger.error(f'Cannot schedule cue {name} at {at}: {e}')
async def _cleanup(self):
while True:
await asyncio.sleep(60)
for name, (cue, _) in self.cues.items():
if cue.is_obsolete():
del self.cues[name]
if task := self.tasks.get(name):
self.scheduler.cancel(task)
del self.tasks[name]