Compare commits

...

17 commits

Author SHA1 Message Date
6f2128beb4 Fix TTS logging spam 2025-03-09 15:56:21 -07:00
fd128948ca Fix kdl encoding detection 2025-03-09 15:56:21 -07:00
ba82f2e422 Use selector asyncio loop
Testing shows that it is more stable than the proactor loop. Dunno why
2025-03-09 15:56:21 -07:00
533286c7ca Fix full-async stdin reader 2025-03-09 15:56:21 -07:00
23f4190506 Remove aioprocessing, update all other dependencies
This removes the ability for audio clips to be played sync, but no 
internal uses do that anyways, so its fine probably
2025-03-09 15:56:21 -07:00
3a4e65b683 Fix module defined event ingest 2025-03-09 15:56:21 -07:00
ffd48dc1f2 Fix some plugin requests -> httpx imports 2025-03-09 15:56:21 -07:00
d9e14d49dd Fix missing await 2025-03-09 15:56:21 -07:00
be6ec19762 Move websocket bus to asyncio operation 2025-03-09 15:56:21 -07:00
ded19ef261 Move chats to asyncio operation 2025-03-09 15:56:21 -07:00
54226e7940 Remove peertube chat module
This feature was rejected upstream aaaages ago, and i dont maintain the 
fork with it in it anymore
2025-03-09 15:56:21 -07:00
2d908e60a5 Add logging to tts gen 2025-03-09 15:55:25 -07:00
d53508c158 Fix audio stretching behavior attempt 2 2025-03-09 15:55:06 -07:00
11b4c92fe9 [plugins/tts] Do text filtering for external users as well 2025-03-02 17:21:05 -05:00
32fc1660ec [builtins/scene] Fix title
oop
2025-03-02 17:11:29 -05:00
4b5dd0cf43 Fix non-pitchsynced playback speed 2025-03-02 17:10:28 -05:00
ba0b8c1068 [plugins] Work around blueprint templates (web UI) sharing a namespace 2025-03-02 17:08:33 -05:00
27 changed files with 1954 additions and 877 deletions

2076
pdm.lock generated

File diff suppressed because it is too large Load diff

View file

@ -8,21 +8,20 @@ authors = [
dependencies = [ dependencies = [
"click", "click",
"kdl-py", "kdl-py",
"quart==0.18.*", "quart",
"werkzeug==2.3.7", "werkzeug",
"hypercorn", "hypercorn",
"requests", "websockets",
"websockets==11.0.3",
"aioprocessing",
"aioscheduler", "aioscheduler",
"pyaudio==0.2.*", "pyaudio",
"librosa==0.8.*", "librosa",
"pytsmod", "pytsmod",
"numpy", "numpy",
"multipledispatch", "multipledispatch",
"blessed", "blessed",
"appdirs", "appdirs",
"maya", "maya",
"httpx",
] ]
requires-python = ">=3.10,<3.11" requires-python = ">=3.10,<3.11"
readme = "README.md" readme = "README.md"
@ -30,18 +29,17 @@ license = {text = "GPLv2"}
[project.optional-dependencies] [project.optional-dependencies]
tts = [ tts = [
"TTS==0.9.*", "coqui-tts",
"torch==1.13.*",
] ]
phrasecounter = ["num2words"] phrasecounter = ["num2words"]
jail = ["owoify-py==2.*"] jail = ["owoify-py"]
twitch = ["miniirc"] twitch = ["miniirc"]
midi = [ midi = [
"mido", "mido",
"python-rtmidi", "python-rtmidi",
] ]
obs = ["simpleobsws"] obs = ["simpleobsws"]
osc = ["python-osc>=1.9.0"] osc = ["python-osc"]
yt-dlp = ["yt-dlp"] yt-dlp = ["yt-dlp"]
[build-system] [build-system]

View file

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from multiprocessing import Process, Pipe, Manager import asyncio
import sys import sys
import os import os
import json import json
@ -16,22 +16,30 @@ class GracefulShutdownException(Exception):
class ShutdownRequest(Event): class ShutdownRequest(Event):
_hidden = True _hidden = True
class ManagerMock(dict):
__getattr__ = dict.get
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
class ChatProcess(Process, ABC):
def __init__(self, stdin_lock, name, readonly=False): class ChatProcess(ABC):
def __init__(self, name, event_queue, readonly=False):
super().__init__() super().__init__()
self._stdin_lock = stdin_lock
self._name = name self._name = name
self._event_queue = event_queue
self._control_queue = asyncio.Queue()
self.readonly = readonly self.readonly = readonly
self.event_pipe, self._event_pipe = Pipe(duplex=False)
self._control_pipe, self.control_pipe = Pipe(duplex=False)
self.logger = logging.getLogger(f'chat.{self._name}') self.logger = logging.getLogger(f'chat.{self._name}')
self._state = None self._state = None
self._next_state = None self._next_state = None
self.shared = Manager().Namespace() self.shared = ManagerMock({})
@abstractmethod
def setup(self, *args, **kwargs):
pass
@abstractmethod @abstractmethod
def loop(self, next_state): def loop(self, next_state):
@ -41,7 +49,7 @@ class ChatProcess(Process, ABC):
pass pass
def send(self, event): def send(self, event):
self.control_pipe.send(event) self._control_queue.put_nowait(event)
@property @property
def state(self): def state(self):
@ -82,24 +90,20 @@ class ChatProcess(Process, ABC):
print(f'WARNING: Unprocessed control message in {self._name} - {event}') print(f'WARNING: Unprocessed control message in {self._name} - {event}')
def safe_input(self, prompt): def safe_input(self, prompt):
self._stdin_lock.aquire()
with os.fdopen(0) as real_stdin:
old_stdin = sys.stdin
sys.stdin = real_stdin
response = input(prompt) response = input(prompt)
sys.stdin = old_stdin
self._stdin_lock.release()
return response return response
def publish(self, event): def publish(self, event):
self._event_pipe.send(event) self._event_queue.put_nowait(event)
def run(self): async def run(self):
while True: while True:
try: try:
timeout = 0 timeout = 0
# Run the code loop # Run the code loop
response = self.loop(self._next_state) response = self.loop(self._next_state)
if asyncio.iscoroutine(response):
response = await response
# Check response type and use the appropriate interface # Check response type and use the appropriate interface
# # integer / float: timeout value to use for the next loop # # integer / float: timeout value to use for the next loop
# # two states (tuple): current state and next state # # two states (tuple): current state and next state
@ -113,14 +117,19 @@ class ChatProcess(Process, ABC):
self.state = response self.state = response
# Wait for inconming events for at most timeout seconds - handle if any come in # Wait for inconming events for at most timeout seconds - handle if any come in
if self._control_pipe.poll(timeout): try:
incoming_event = self._control_pipe.recv() incoming_event = await asyncio.wait_for(self._control_queue.get(), timeout)
except asyncio.exceptions.TimeoutError:
continue
# Handle messages implimented by this base class # Handle messages implimented by this base class
if isinstance(incoming_event, ShutdownRequest): if isinstance(incoming_event, ShutdownRequest):
raise GracefulShutdownException() raise GracefulShutdownException()
# Pass the rest on # Pass the rest on
elif not self.readonly: elif not self.readonly:
response = self.on_control_event(incoming_event, self._next_state) response = self.on_control_event(incoming_event, self._next_state)
if asyncio.iscoroutine(response):
response = await response
# Similar to above, handle response states # Similar to above, handle response states
if response is None: if response is None:
continue continue
@ -128,7 +137,6 @@ class ChatProcess(Process, ABC):
self.state, self._next_state = response self.state, self._next_state = response
else: else:
self.state = response self.state = response
except (GracefulShutdownException, KeyboardInterrupt): except (GracefulShutdownException, KeyboardInterrupt):
return 0 return 0
except Exception as e: except Exception as e:

View file

@ -1,4 +1,5 @@
import random import random
import asyncio
from enum import Enum, auto from enum import Enum, auto
from ovtk_audiencekit.chats import ChatProcess from ovtk_audiencekit.chats import ChatProcess
@ -16,8 +17,7 @@ class StartStop(Event):
class FakeChat(ChatProcess): class FakeChat(ChatProcess):
def __init__(self, *args, max_delay=10, max_messages_per_chunk=1, start_paused=True, max_monitization=None, **kwargs): def setup(self, max_delay=10, max_messages_per_chunk=1, start_paused=True, max_monitization=None):
super().__init__(*args, **kwargs)
self._max_delay = max_delay self._max_delay = max_delay
self._max_messages_per_chunk = max_messages_per_chunk self._max_messages_per_chunk = max_messages_per_chunk
self._max_monitization = max_monitization self._max_monitization = max_monitization

View file

@ -3,9 +3,11 @@ import random
import logging import logging
from enum import Enum, auto from enum import Enum, auto
from itertools import chain from itertools import chain
import asyncio
import websockets
from ovtk_audiencekit.chats import ChatProcess from ovtk_audiencekit.chats import ChatProcess
from ovtk_audiencekit.utils import NonBlockingWebsocket
from ovtk_audiencekit.events.Message import Message, USER_TYPE from ovtk_audiencekit.events.Message import Message, USER_TYPE
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -17,8 +19,7 @@ class STATES(Enum):
class MisskeyProcess(ChatProcess): class MisskeyProcess(ChatProcess):
def __init__(self, *args, instance=None, channel=None, token=None, **kwargs): def setup(self, instance=None, channel=None, token=None):
super().__init__(*args, **kwargs)
self._url = f'wss://{instance}/streaming' self._url = f'wss://{instance}/streaming'
if token: if token:
self._url += f'?token={token}' self._url += f'?token={token}'
@ -42,9 +43,8 @@ class MisskeyProcess(ChatProcess):
return msg return msg
return None return None
def on_connecting(self, next_state): async def on_connecting(self, next_state):
self._ws = NonBlockingWebsocket(self._url) self._ws = await websockets.connect(self._url)
self._ws.start()
payload = { payload = {
'type': 'connect', 'type': 'connect',
'body': { 'body': {
@ -55,12 +55,15 @@ class MisskeyProcess(ChatProcess):
} }
} }
} }
self._ws.send(json.dumps(payload)) logger.debug("Subscribed to channel bus")
await self._ws.send(json.dumps(payload))
return STATES.READING return STATES.READING
def on_reading(self, next_state, timeout=0.1): async def on_reading(self, next_state, timeout=0.1):
if self._ws.poll(timeout): try:
note_event = self._ws.recv() note_event = await asyncio.wait_for(self._ws.recv(), timeout)
except asyncio.exceptions.TimeoutError:
return 0
try: try:
misskey_event = json.loads(note_event) misskey_event = json.loads(note_event)
if misskey_event['body']['id'] == self._subid and misskey_event['body']['type'] == 'note': if misskey_event['body']['id'] == self._subid and misskey_event['body']['type'] == 'note':

View file

@ -1 +0,0 @@
from .peertube import PtChatProcess as Process

View file

@ -1,70 +0,0 @@
import json
import random
import logging
from enum import Enum, auto
from itertools import chain
import socketio
import requests
from ovtk_audiencekit.chats import ChatProcess
from ovtk_audiencekit.events.Message import Message, USER_TYPE
logger = logging.getLogger(__name__)
class STATES(Enum):
CONNECTING = auto()
READING = auto()
class PtChatProcess(ChatProcess):
def __init__(self, *args, instance_url=None, channel=None, token=None, **kwargs):
super().__init__(*args, **kwargs)
self.instance_url = instance_url
self.channel = channel
self._state_machine = self.bind_to_states(STATES)
self.state = STATES.CONNECTING
def normalize_event(self, event):
message = event['message']
user_name = message['account']['displayName'] or message['account']['name']
user_id = message['account']['id']
text = message.get('text', '')
if text:
msg = Message(self._name, text,
user_name, user_id, USER_TYPE.USER,
id=message['id'])
return msg
return None
def on_connecting(self, next_state):
self._session = requests.Session()
self._sio = socketio.Client()
api_channel = self._session.get(f'{self.instance_url}/api/v1/video-channels/{self.channel}')
api_channel.raise_for_status()
room_id = api_channel.json().get('roomId')
if room_id is None:
raise ValueError('No chatroom returned from channel api!')
self._sio.connect(self.instance_url, namespaces=['/live-chat'], wait=True)
self._sio.emit('subscribe', data={'roomId': room_id}, namespace='/live-chat')
self._sio.on('new-message', self.handle_message, namespace='/live-chat')
return STATES.READING
def handle_message(self, data):
try:
norm = self.normalize_event(data)
self.publish(norm)
except Exception:
logger.error(f'Failed to process message: {data}')
def on_reading(self, next_state, timeout=0.5):
self._sio.sleep(timeout)
return 0
def loop(self, next_state):
return self._state_machine(self.state, next_state)

View file

@ -1,4 +1,4 @@
import time import asyncio
from enum import Enum, auto from enum import Enum, auto
from itertools import chain from itertools import chain
@ -20,7 +20,7 @@ valid_emote_resolutions = [1.0, 2.0, 3.0, 4.0]
class TwitchProcess(ChatProcess): class TwitchProcess(ChatProcess):
def __init__(self, *args, async def setup(self,
# Shared # Shared
channel_name=None, client_id=None, access_token=None, channel_name=None, client_id=None, access_token=None,
# IRC options # IRC options
@ -28,11 +28,7 @@ class TwitchProcess(ChatProcess):
# EventSub options # EventSub options
eventsub=True, eventsub_host='wss://ovtk.skeh.site/twitch', eventsub=True, eventsub_host='wss://ovtk.skeh.site/twitch',
# BTTV integration # BTTV integration
bttv=False, bttv=False):
# Inheritance boilerplate
**kwargs):
super().__init__(*args, **kwargs)
if channel_name is None or client_id is None or access_token is None: if channel_name is None or client_id is None or access_token is None:
raise ValueError('Twitch chat is missing config requirements') raise ValueError('Twitch chat is missing config requirements')
if emote_res not in valid_emote_resolutions: if emote_res not in valid_emote_resolutions:
@ -50,8 +46,8 @@ class TwitchProcess(ChatProcess):
self.api = TwitchAPI(self._client_id, self._token) self.api = TwitchAPI(self._client_id, self._token)
self.shared.api = self.api self.shared.api = self.api
cheermotes = self.api.get_cheermotes(self._channel_name) cheermotes = await self.api.get_cheermotes(self._channel_name)
target_data = self.api.get_user_details(self._channel_name) target_data = await self.api.get_user_details(self._channel_name)
self.shared.target_data = target_data self.shared.target_data = target_data
self.shared.users = [] self.shared.users = []
@ -64,12 +60,12 @@ class TwitchProcess(ChatProcess):
self._sources.append(self.eventsub) self._sources.append(self.eventsub)
self.bttv = BTTV(target_data['user']['id']) if bttv else None self.bttv = BTTV(target_data['user']['id']) if bttv else None
await self.bttv.setup()
def loop(self, next_state): def loop(self, next_state):
return self._state_machine(self.state, next_state) return self._state_machine(self.state, next_state)
def on_control_event(self, event, next_state): async def on_control_event(self, event, next_state):
if isinstance(event, Message): if isinstance(event, Message):
# Twitch.... why... why no newlines.... # Twitch.... why... why no newlines....
for line in event.text.split('\n'): for line in event.text.split('\n'):
@ -77,7 +73,7 @@ class TwitchProcess(ChatProcess):
continue continue
if event.replies_to: if event.replies_to:
line = f"@{event.replies_to.user_name} {line}" line = f"@{event.replies_to.user_name} {line}"
self.irc.send(line) await self.irc.send(line)
def on_state_enter(self, new_state): def on_state_enter(self, new_state):
status_messages = { status_messages = {
@ -89,15 +85,16 @@ class TwitchProcess(ChatProcess):
sys_msg = SysMessage(self._name, message) sys_msg = SysMessage(self._name, message)
self.publish(sys_msg) self.publish(sys_msg)
def on_connecting(self, next_state): async def on_connecting(self, next_state):
self.irc.connect() await self.irc.connect()
if self.__dict__.get('eventsub'): if self.__dict__.get('eventsub'):
self.eventsub.subscribe(self._channel_name) await self.eventsub.subscribe(self._channel_name)
return STATES.READING return STATES.READING
def on_reading(self, next_state): async def on_reading(self, next_state):
try: try:
for event in chain(*(source.read(0.1) for source in self._sources)): events = [event for source in self._sources async for event in source.read(0.1)]
for event in events:
# Retarget event # Retarget event
event.via = self._name event.via = self._name
if self.bttv and isinstance(event, Message): if self.bttv and isinstance(event, Message):
@ -107,8 +104,8 @@ class TwitchProcess(ChatProcess):
except TimeoutError: except TimeoutError:
return STATES.TIMEOUT return STATES.TIMEOUT
def on_timeout(self, next_state): async def on_timeout(self, next_state):
time.sleep(3) await asyncio.sleep(3)
return STATES.CONNECTING return STATES.CONNECTING
def on_failure(self, next_state): def on_failure(self, next_state):

View file

@ -1,7 +1,7 @@
from itertools import chain from itertools import chain
import re import re
import requests import httpx
API_URL = 'https://api.betterttv.net/3/cached' API_URL = 'https://api.betterttv.net/3/cached'
CDN_URL = 'https://cdn.betterttv.net/emote' CDN_URL = 'https://cdn.betterttv.net/emote'
@ -11,9 +11,10 @@ word_regex = re.compile('\b([A-z0-9]+)\b')
class BTTV: class BTTV:
def __init__(self, user_id): def __init__(self, user_id):
self.user_id = user_id self.user_id = user_id
self._session = requests.Session() self._session = httpx.AsyncClient()
self.emotes = {emote['code']: emote for emote in chain(self._get_global(), self._get_channel())} async def setup(self):
self.emotes = {emote['code']: emote for emote in chain(await self._get_global(), await self._get_channel())}
def hydrate(self, event): def hydrate(self, event):
text, emotes = self.parse_emotes(event.text) text, emotes = self.parse_emotes(event.text)
@ -37,13 +38,13 @@ class BTTV:
return output, used_emotes return output, used_emotes
def _get_global(self): async def _get_global(self):
response = self._session.get(f'{API_URL}/emotes/global') response = await self._session.get(f'{API_URL}/emotes/global')
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
def _get_channel(self): async def _get_channel(self):
response = self._session.get(f'{API_URL}/users/twitch/{self.user_id}') response = await self._session.get(f'{API_URL}/users/twitch/{self.user_id}')
response.raise_for_status() response.raise_for_status()
data = response.json() data = response.json()
return chain(data['channelEmotes'], data['sharedEmotes']) return chain(data['channelEmotes'], data['sharedEmotes'])

View file

@ -1,4 +1,4 @@
import requests import httpx
API_URL = 'https://api.twitch.tv/helix' API_URL = 'https://api.twitch.tv/helix'
@ -6,33 +6,33 @@ class TwitchAPI:
def __init__(self, client_id, token): def __init__(self, client_id, token):
self._client_id = client_id self._client_id = client_id
self._token = token self._token = token
self._session = requests.Session() self._session = httpx.AsyncClient()
self._session.headers.update({'Authorization': f'Bearer {self._token}', 'Client-ID': client_id, 'Accept': 'application/vnd.twitchtv.v5+json'}) self._session.headers.update({'Authorization': f'Bearer {self._token}', 'Client-ID': client_id, 'Accept': 'application/vnd.twitchtv.v5+json'})
def get_cheermotes(self, channel): async def get_cheermotes(self, channel):
response = self._session.get(f'{API_URL}/bits/cheermotes', params={'channel_id': channel}) response = await self._session.get(f'{API_URL}/bits/cheermotes', params={'channel_id': channel})
response.raise_for_status() response.raise_for_status()
return response.json()['data'] return response.json()['data']
def get_user_details(self, username): async def get_user_details(self, username):
data = dict(link=f'https://twitch.tv/{username}') data = dict(link=f'https://twitch.tv/{username}')
try: try:
user_response = self._session.get(f'{API_URL}/users', params={'login': username}) user_response = await self._session.get(f'{API_URL}/users', params={'login': username})
user_response.raise_for_status() user_response.raise_for_status()
data['user'] = user_response.json()['data'][0] data['user'] = user_response.json()['data'][0]
channel_response = self._session.get(f'{API_URL}/channels', params={'broadcaster_id': data['user']['id']}) channel_response = await self._session.get(f'{API_URL}/channels', params={'broadcaster_id': data['user']['id']})
channel_response.raise_for_status() channel_response.raise_for_status()
data['channel'] = channel_response.json()['data'][0] data['channel'] = channel_response.json()['data'][0]
except (KeyError, IndexError): except (KeyError, IndexError):
return None return None
return data return data
def get_clips(self, username): async def get_clips(self, username):
user_response = self._session.get(f'{API_URL}/users', params={'login': username}) user_response = await self._session.get(f'{API_URL}/users', params={'login': username})
user_response.raise_for_status() user_response.raise_for_status()
channel_id = user_response.json()['data'][0]['id'] channel_id = user_response.json()['data'][0]['id']
clips_response = self._session.get(f'{API_URL}/clips', params={'broadcaster_id': broadcaster_id}) clips_response = await self._session.get(f'{API_URL}/clips', params={'broadcaster_id': broadcaster_id})
clips_response.raise_for_status() clips_response.raise_for_status()
return clips_response.json()['data'] return clips_response.json()['data']

View file

@ -1,7 +1,8 @@
import json import json
import logging import logging
import asyncio
from ovtk_audiencekit.utils import NonBlockingWebsocket import websockets
from ovtk_audiencekit.core.Data import ovtk_user_id from ovtk_audiencekit.core.Data import ovtk_user_id
from ovtk_audiencekit.events import Follow from ovtk_audiencekit.events import Follow
@ -15,14 +16,13 @@ class TwitchEventSub:
self.api = api self.api = api
self.mirror_socket = f'{eventsub_host}/ws/{ovtk_user_id}' self.mirror_socket = f'{eventsub_host}/ws/{ovtk_user_id}'
def subscribe(self, username): async def subscribe(self, username):
self._ws = NonBlockingWebsocket(self.mirror_socket) self._ws = await websockets.connect(self.mirror_socket)
self._ws.start()
greet = self._ws.recv() greet = await self._ws.recv()
greet = json.loads(greet) greet = json.loads(greet)
if len(greet['subs']) == 0: if len(greet['subs']) == 0:
target_user = self.api.get_user_details(username) target_user = await self.api.get_user_details(username)
target_user_id = target_user['user']['id'] target_user_id = target_user['user']['id']
supported_eventsub_types = [ supported_eventsub_types = [
@ -37,8 +37,8 @@ class TwitchEventSub:
'condition': condition, 'condition': condition,
'version': '1', 'version': '1',
} }
self._ws.send(json.dumps(payload)) await self._ws.send(json.dumps(payload))
status = self._ws.recv() status = await self._ws.recv()
status = json.loads(status) status = json.loads(status)
if status['type'] != 'subsuccess': if status['type'] != 'subsuccess':
logger.warning(f'Failed to subscribe to {sub_type} EventSub topics - these events will not come through!!') logger.warning(f'Failed to subscribe to {sub_type} EventSub topics - these events will not come through!!')
@ -63,12 +63,15 @@ class TwitchEventSub:
user_name, user_id) user_name, user_id)
return norm_event return norm_event
def read(self, timeout): async def read(self, timeout):
if self._ws.poll(timeout): try:
messages = self._ws.recv() messages = await asyncio.wait_for(self._ws.recv(), timeout)
except asyncio.exceptions.TimeoutError:
return
for message in messages.splitlines(): for message in messages.splitlines():
if message == 'PING': if message == 'PING':
self._ws.send('PONG') await self._ws.send('PONG')
continue continue
try: try:
data = json.loads(message) data = json.loads(message)

View file

@ -4,13 +4,13 @@ import re
from itertools import chain, islice from itertools import chain, islice
import logging import logging
from collections import OrderedDict from collections import OrderedDict
import asyncio
import websockets.exceptions import websockets
from miniirc import ircv3_message_parser from miniirc import ircv3_message_parser
from ovtk_audiencekit.events.Message import Message, USER_TYPE from ovtk_audiencekit.events.Message import Message, USER_TYPE
from ovtk_audiencekit.events import Subscription from ovtk_audiencekit.events import Subscription
from ovtk_audiencekit.utils import NonBlockingWebsocket
from ..Events import Raid from ..Events import Raid
@ -43,24 +43,25 @@ class TwitchIRC:
self._reply_buffer = OrderedDict() self._reply_buffer = OrderedDict()
self._group_gifts = {} self._group_gifts = {}
def connect(self): async def connect(self):
self._ws = NonBlockingWebsocket(WEBSOCKET_ADDRESS) self._ws = await websockets.connect(WEBSOCKET_ADDRESS)
self._ws.start() await self._ws.send(f'PASS oauth:{self._token}')
self._ws.send(f'PASS oauth:{self._token}') await self._ws.send(f'NICK {self._username}')
self._ws.send(f'NICK {self._username}') response = await self._ws.recv()
response = self._ws.recv()
if any('Welcome, GLHF!' in msg for msg in response.splitlines()): if any('Welcome, GLHF!' in msg for msg in response.splitlines()):
self._ws.send(f'JOIN #{self._channel_name}') await self._ws.send(f'JOIN #{self._channel_name}')
self._ws.send('CAP REQ :twitch.tv/tags') await self._ws.send('CAP REQ :twitch.tv/tags')
self._ws.send('CAP REQ :twitch.tv/commands') await self._ws.send('CAP REQ :twitch.tv/commands')
self._ws.send('CAP REQ :twitch.tv/membership') await self._ws.send('CAP REQ :twitch.tv/membership')
else: else:
raise TwitchIRCException(f'Got bad response during auth: {response}') raise TwitchIRCException(f'Got bad response during auth: {response}')
def read(self, timeout): async def read(self, timeout):
try:
messages = await asyncio.wait_for(self._ws.recv(), timeout)
except asyncio.exceptions.TimeoutError:
return
try: try:
if self._ws.poll(timeout):
messages = self._ws.recv()
for message in messages.splitlines(): for message in messages.splitlines():
normalized = None normalized = None
cmd, hostmask, tags, args = ircv3_message_parser(message) cmd, hostmask, tags, args = ircv3_message_parser(message)
@ -70,7 +71,7 @@ class TwitchIRC:
elif cmd == 'USERNOTICE': elif cmd == 'USERNOTICE':
normalized = self.normalize_event(hostmask, tags, args) normalized = self.normalize_event(hostmask, tags, args)
elif cmd == 'PING': elif cmd == 'PING':
self._ws.send(f"PONG {' '.join(args)}") await self._ws.send(f"PONG {' '.join(args)}")
elif cmd == 'RECONNECT': elif cmd == 'RECONNECT':
raise TimeoutError('Twitch API requested timeout') raise TimeoutError('Twitch API requested timeout')
elif cmd == 'JOIN': elif cmd == 'JOIN':
@ -87,12 +88,12 @@ class TwitchIRC:
self._reply_buffer.popitem() self._reply_buffer.popitem()
yield normalized yield normalized
except websockets.exceptions.ConnectionClosedError: except websockets.exceptions.ConnectionClosedError:
self.logger.info('Twitch websocket disconnected - trying reconnet') self.logger.warning('Twitch websocket disconnected - trying reconnet')
self.connect() await self.connect()
def send(self, message): async def send(self, message):
irc_msg = f'PRIVMSG #{self._username} :{message}' irc_msg = f'PRIVMSG #{self._username} :{message}'
self._ws.send(irc_msg) await self._ws.send(irc_msg)
def parse_badges(self, tags): def parse_badges(self, tags):
if isinstance(tags.get('badges'), str): if isinstance(tags.get('badges'), str):

View file

@ -1,9 +1,8 @@
import os
import json import json
import webbrowser import webbrowser
from enum import Enum, auto from enum import Enum, auto
import requests import httpx
from ovtk_audiencekit.chats import ChatProcess from ovtk_audiencekit.chats import ChatProcess
from ovtk_audiencekit.events.Message import Message, SysMessage, USER_TYPE from ovtk_audiencekit.events.Message import Message, SysMessage, USER_TYPE
@ -24,7 +23,7 @@ class YoutubeLivePollProcess(ChatProcess):
GOOGLE_OAUTH_TOKEN_URL = 'https://accounts.google.com/o/oauth2/token' GOOGLE_OAUTH_TOKEN_URL = 'https://accounts.google.com/o/oauth2/token'
YOUTUBE_API_URL = 'https://www.googleapis.com/youtube/v3' YOUTUBE_API_URL = 'https://www.googleapis.com/youtube/v3'
def __init__(self, *args, client_secrets_path=None): async def setup(self, client_secrets_path=None):
if client_secrets_path is None or not os.path.exists(client_secrets_path): if client_secrets_path is None or not os.path.exists(client_secrets_path):
raise ValueError('Missing client secrets') raise ValueError('Missing client secrets')
@ -33,7 +32,6 @@ class YoutubeLivePollProcess(ChatProcess):
if client_secrets is None: if client_secrets is None:
raise ValueError('Malformed client secrets file - missing installed section') raise ValueError('Malformed client secrets file - missing installed section')
super().__init__(*args)
self._client_secrets = client_secrets self._client_secrets = client_secrets
self._state_machine = self.bind_to_states(STATES) self._state_machine = self.bind_to_states(STATES)
@ -44,14 +42,14 @@ class YoutubeLivePollProcess(ChatProcess):
self._live_chat_id = None self._live_chat_id = None
self._page_token = None self._page_token = None
def loop(self, next_state): async def loop(self, next_state):
if self.state is None: if self.state is None:
if os.path.exists('refresh_token.secret'): if os.path.exists('refresh_token.secret'):
with open('refresh_token.secret', 'r') as f: with open('refresh_token.secret', 'r') as f:
self._refresh_token = f.read() self._refresh_token = f.read()
return STATES.REFRESH, STATES.WAITING_FOR_BROADCAST return STATES.REFRESH, STATES.WAITING_FOR_BROADCAST
return STATES.UNAUTHORIZED, STATES.WAITING_FOR_BROADCAST return STATES.UNAUTHORIZED, STATES.WAITING_FOR_BROADCAST
return self._state_machine(self.state, next_state) return await self._state_machine(self.state, next_state)
def on_unauthorized(self, next_state): def on_unauthorized(self, next_state):
self.request_oauth_consent() self.request_oauth_consent()
@ -66,8 +64,8 @@ class YoutubeLivePollProcess(ChatProcess):
self.get_fresh_access_token() self.get_fresh_access_token()
return next_state return next_state
def on_waiting_for_broadcast(self, next_state): async def on_waiting_for_broadcast(self, next_state):
response = requests.get(f'{self.__class__.YOUTUBE_API_URL}/liveBroadcasts', response = await httpx.get(f'{self.__class__.YOUTUBE_API_URL}/liveBroadcasts',
params={'part': 'snippet', 'broadcastStatus': 'active'}, params={'part': 'snippet', 'broadcastStatus': 'active'},
headers={'Authorization': f'Bearer {self._access_token}'}) headers={'Authorization': f'Bearer {self._access_token}'})
@ -83,8 +81,8 @@ class YoutubeLivePollProcess(ChatProcess):
return 30 return 30
def on_polling(self, next_state): async def on_polling(self, next_state):
response = requests.get(f'{self.__class__.YOUTUBE_API_URL}/liveChat/messages', response = await httpx.get(f'{self.__class__.YOUTUBE_API_URL}/liveChat/messages',
params={'liveChatId': self._live_chat_id, params={'liveChatId': self._live_chat_id,
'part': 'snippet,authorDetails', 'part': 'snippet,authorDetails',
'hl': 'en_US', 'hl': 'en_US',
@ -95,7 +93,7 @@ class YoutubeLivePollProcess(ChatProcess):
if (response.status_code == 401): if (response.status_code == 401):
return STATES.REFRESH, self.state return STATES.REFRESH, self.state
if response.status_code == requests.codes.ok: if response.status_code == httpx.codes.ok:
data = response.json() data = response.json()
self._page_token = data['nextPageToken'] self._page_token = data['nextPageToken']
if len(data['items']): if len(data['items']):
@ -156,8 +154,8 @@ class YoutubeLivePollProcess(ChatProcess):
url = f"{self._client_secrets['auth_uri']}?{param_str}" url = f"{self._client_secrets['auth_uri']}?{param_str}"
webbrowser.open(url) webbrowser.open(url)
def setup_oauth_consent(self, consent_code): async def setup_oauth_consent(self, consent_code):
response = requests.post(self.__class__.GOOGLE_OAUTH_TOKEN_URL, data={ response = await httpx.post(self.__class__.GOOGLE_OAUTH_TOKEN_URL, data={
'code': consent_code, 'code': consent_code,
'client_id': self._client_secrets['client_id'], 'client_id': self._client_secrets['client_id'],
'client_secret': self._client_secrets['client_secret'], 'client_secret': self._client_secrets['client_secret'],
@ -171,8 +169,8 @@ class YoutubeLivePollProcess(ChatProcess):
self._access_token = auth['access_token'] self._access_token = auth['access_token']
self._refresh_token = auth['refresh_token'] self._refresh_token = auth['refresh_token']
def get_fresh_access_token(self): async def get_fresh_access_token(self):
response = requests.post(self.__class__.GOOGLE_OAUTH_TOKEN_URL, data={ response = await httpx.post(self.__class__.GOOGLE_OAUTH_TOKEN_URL, data={
'client_id': self._client_secrets['client_id'], 'client_id': self._client_secrets['client_id'],
'client_secret': self._client_secrets['client_secret'], 'client_secret': self._client_secrets['client_secret'],
'refresh_token': self._refresh_token, 'refresh_token': self._refresh_token,

View file

@ -59,6 +59,12 @@ def cli(loglevel, show_time=False):
logging.getLogger('numba').setLevel(logging.WARN) logging.getLogger('numba').setLevel(logging.WARN)
logging.getLogger('hypercorn.error').setLevel(logging.WARN) logging.getLogger('hypercorn.error').setLevel(logging.WARN)
logging.getLogger('hypercorn.access').setLevel(logging.WARN) logging.getLogger('hypercorn.access').setLevel(logging.WARN)
logging.getLogger('httpx').setLevel(logging.WARN)
logging.getLogger('httpcore').setLevel(logging.INFO)
logging.getLogger('torio._extension.utils').setLevel(logging.WARN)
logging.getLogger('matplotlib').setLevel(logging.INFO)
logging.getLogger('fsspec').setLevel(logging.INFO)
logging.getLogger('TTS').setLevel(logging.INFO if loglevel == logging.DEBUG else logging.WARN)
# Quiet warnings # Quiet warnings
if loglevel > logging.DEBUG: if loglevel > logging.DEBUG:
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")

View file

@ -8,7 +8,6 @@ import pyaudio as pya
import librosa import librosa
import pytsmod as tsm import pytsmod as tsm
import soundfile import soundfile
from aioprocessing import AioEvent
# HACK: Redirect stderr to /dev/null to silence portaudio boot # HACK: Redirect stderr to /dev/null to silence portaudio boot
devnull = os.open(os.devnull, os.O_WRONLY) devnull = os.open(os.devnull, os.O_WRONLY)
@ -53,9 +52,9 @@ class Clip:
def stretch(self, speed, keep_pitch=True): def stretch(self, speed, keep_pitch=True):
if keep_pitch: if keep_pitch:
stretched = tsm.wsola(self._stereo_transpose(self.raw), speed) stretched = tsm.wsola(self._stereo_transpose(self.raw), 1 / speed)
else: else:
stretched = librosa.resample(self._stereo_transpose(self.raw), self.samplerate * (1 / speed), self.samplerate, fix=False, scale=True) stretched = librosa.resample(self._stereo_transpose(self.raw), self.samplerate * speed, self.samplerate, fix=False, scale=True)
self.raw = np.ascontiguousarray(self._stereo_transpose(stretched), dtype='float32') self.raw = np.ascontiguousarray(self._stereo_transpose(stretched), dtype='float32')
def save(self, filename): def save(self, filename):
@ -67,7 +66,8 @@ class Stream:
self.clip = clip self.clip = clip
self.pos = 0 self.pos = 0
self.playing = False self.playing = False
self._end_event = AioEvent() self.loop = asyncio.get_event_loop()
self._end_event = asyncio.Event()
self._stream = pyaudio.open( self._stream = pyaudio.open(
output_device_index=output_index, output_device_index=output_index,
format=pya.paFloat32, format=pya.paFloat32,
@ -85,16 +85,11 @@ class Stream:
if not self._stream.is_active(): if not self._stream.is_active():
self._stream.start_stream() self._stream.start_stream()
def play(self): async def play(self):
self._end_event.clear()
self._play()
self._end_event.wait(timeout=self.clip.length)
async def aplay(self):
self._end_event.clear() self._end_event.clear()
self._play() self._play()
try: try:
await self._end_event.coro_wait(timeout=self.clip.length) await self._end_event.wait()
except asyncio.CancelledError: except asyncio.CancelledError:
self.playing = False self.playing = False
self._stream.stop_stream() self._stream.stop_stream()
@ -117,7 +112,7 @@ class Stream:
if self.pos >= self.clip.raw.shape[0]: if self.pos >= self.clip.raw.shape[0]:
self.playing = False self.playing = False
self._end_event.set() self.loop.call_soon_threadsafe(self._end_event.set)
return buffer, pya.paContinue return buffer, pya.paContinue

View file

@ -139,7 +139,7 @@ def parse_kdl_deep(path, relativeto=None):
if relativeto: if relativeto:
path = os.path.normpath(os.path.join(relativeto, path)) path = os.path.normpath(os.path.join(relativeto, path))
with open(path, 'r') as f: with open(path, 'r', encoding='utf-8') as f:
try: try:
config = kdl.parse(f.read(), kdl_parse_config) config = kdl.parse(f.read(), kdl_parse_config)
for node in config.nodes: for node in config.nodes:

View file

@ -1,10 +1,8 @@
import importlib import importlib
from multiprocessing import Lock
import asyncio import asyncio
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
import os import os
import os.path
import pathlib import pathlib
import sys import sys
import signal import signal
@ -61,27 +59,22 @@ class MainProcess:
self.max_concurrent = max_concurrent self.max_concurrent = max_concurrent
self.chat_processes = {} self.chat_processes = {}
self.chat_tasks = set()
self.plugins = {} self.plugins = {}
self.event_queue = asyncio.Queue() self.event_queue = asyncio.Queue()
# Init websocket server (event bus)
# HACK: Must be done here to avoid shadowing its asyncio loop
self.server_process = WebsocketServerProcess(*self.bus_conf)
self.server_process.start()
# Save sys.path since some config will clobber it # Save sys.path since some config will clobber it
self._initial_syspath = sys.path self._initial_syspath = sys.path
if os.name == 'nt':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
def _unload_plugin(self, plugin_name): def _unload_plugin(self, plugin_name):
plugin = self.plugins[plugin_name] plugin = self.plugins[plugin_name]
plugin.close() plugin.close()
del self.plugins[plugin_name] del self.plugins[plugin_name]
del plugin del plugin
def _get_event_from_pipe(self, pipe):
event = pipe.recv()
self.event_queue.put_nowait(event)
def _setup_webserver(self): def _setup_webserver(self):
self.webserver = Quart(__name__, static_folder=None, template_folder=None) self.webserver = Quart(__name__, static_folder=None, template_folder=None)
listen = ':'.join(self.web_conf) listen = ':'.join(self.web_conf)
@ -137,10 +130,10 @@ class MainProcess:
if event is None: if event is None:
break break
else: else:
self.server_process.message_pipe.send(event) await self.bus_server.send(event)
logger.debug(f'Event after plugin chain - {event}') logger.debug(f'Event after plugin chain - {event}')
elif isinstance(event, Delete): elif isinstance(event, Delete):
self.server_process.message_pipe.send(event) await self.bus_server.send(event)
else: else:
logger.error(f'Unknown data in event loop - {event}') logger.error(f'Unknown data in event loop - {event}')
@ -157,7 +150,6 @@ class MainProcess:
async def user_setup(self): async def user_setup(self):
config = kdl.Document(list(parse_kdl_deep(self.config_path))) config = kdl.Document(list(parse_kdl_deep(self.config_path)))
stdin_lock = Lock()
# Load secrets # Load secrets
secrets = {} secrets = {}
if node := config.get('secrets'): if node := config.get('secrets'):
@ -181,8 +173,11 @@ class MainProcess:
chat_module = import_or_reload_mod(module_name, chat_module = import_or_reload_mod(module_name,
default_package='ovtk_audiencekit.chats', default_package='ovtk_audiencekit.chats',
external=False) external=False)
chat_process = chat_module.Process(stdin_lock, chat_name, **node.props, **secrets_for_mod) chat_process = chat_module.Process(chat_name, self.event_queue)
self.chat_processes[chat_name] = chat_process self.chat_processes[chat_name] = chat_process
res = chat_process.setup(**node.props, **secrets_for_mod)
if asyncio.iscoroutine(res):
await res
except Exception as e: except Exception as e:
raise ValueError(f'Failed to initalize {module_name} module "{chat_name}" - {e}') raise ValueError(f'Failed to initalize {module_name} module "{chat_name}" - {e}')
@ -190,13 +185,9 @@ class MainProcess:
logger.warning('No chats configured!') logger.warning('No chats configured!')
# Start chat processes # Start chat processes
loop = asyncio.get_event_loop()
for process in self.chat_processes.values(): for process in self.chat_processes.values():
process.start() self.chat_tasks.add(loop.create_task(process.run()))
# Bridge pipe to asyncio event loop
pipe = process.event_pipe
# REVIEW: This does not work on windows!!!! add_reader is not implemented
# in a way that supports pipes on either windows loop runners
asyncio.get_event_loop().add_reader(pipe.fileno(), lambda pipe=pipe: self._get_event_from_pipe(pipe))
# Load plugins # Load plugins
global_ctx = {} global_ctx = {}
@ -228,6 +219,8 @@ class MainProcess:
except Exception as e: except Exception as e:
raise ValueError(f'Failed to initalize {module_name} plugin "{plugin_name}" - {e}') raise ValueError(f'Failed to initalize {module_name} plugin "{plugin_name}" - {e}')
self.bus_server.update_eventclasses()
# Run plugin definitions # Run plugin definitions
with progressbar(list(config.nodes), label=f"Executing {self.config_path}") as bar: with progressbar(list(config.nodes), label=f"Executing {self.config_path}") as bar:
for node in bar: for node in bar:
@ -241,13 +234,10 @@ class MainProcess:
await plugin_module._kdl_call(node, global_ctx) await plugin_module._kdl_call(node, global_ctx)
async def user_shutdown(self): async def user_shutdown(self):
for chat_task in self.chat_tasks:
chat_task.cancel()
self.chat_tasks.clear()
for process_name, process in list(reversed(self.chat_processes.items())): for process_name, process in list(reversed(self.chat_processes.items())):
pipe = process.event_pipe
process.control_pipe.send(ShutdownRequest('root'))
process.join(5)
if process.exitcode is None:
process.terminate()
asyncio.get_event_loop().remove_reader(pipe.fileno())
del self.chat_processes[process_name] del self.chat_processes[process_name]
for plugin_name in list(reversed(self.plugins.keys())): for plugin_name in list(reversed(self.plugins.keys())):
# NOTE: The plugin will likely stick around in memory for a bit after this, # NOTE: The plugin will likely stick around in memory for a bit after this,
@ -255,6 +245,18 @@ class MainProcess:
self._unload_plugin(plugin_name) self._unload_plugin(plugin_name)
sys.path = self._initial_syspath sys.path = self._initial_syspath
async def _discount_repl(self):
loop = asyncio.get_event_loop()
# REVIEW: Not a good UX at the moment (as new logs clobber the terminal entry)
while True:
line = await loop.run_in_executor(None, sys.stdin.readline)
line = line.strip()
logger.debug(f'Got terminal input: {line}')
if line == 'reload':
self.reload_ev.set()
elif line == 'quit':
self.shutdown_ev.set()
async def run(self): async def run(self):
self.shutdown_ev = asyncio.Event() self.shutdown_ev = asyncio.Event()
self.reload_ev = asyncio.Event() self.reload_ev = asyncio.Event()
@ -263,26 +265,11 @@ class MainProcess:
try: try:
# System setup # System setup
## Bridge websocket server pipe to asyncio loop ## Make stdin handler
## REVIEW: This does not work on windows!!!! add_reader is not implemented self.cli_task = loop.create_task(self._discount_repl())
## in a way that supports pipes on either windows loop runners ## Init websocket server (external end of the event bus)
ws_pipe = self.server_process.message_pipe self.bus_server = WebsocketServerProcess(self.event_queue, *self.bus_conf)
loop.add_reader(ws_pipe.fileno(), lambda: self._get_event_from_pipe(ws_pipe)) self.bus_task = loop.create_task(self.bus_server.run())
## Register stdin handler
reader = asyncio.StreamReader()
await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
async def discount_repl():
# REVIEW: Not a good UX at the moment (as new logs clobber the terminal entry)
async for line in reader:
line = line.strip()
if line == b'reload':
self.reload_ev.set()
elif line == b'quit':
self.shutdown_ev.set()
self.cli_task = loop.create_task(discount_repl())
## Scheduler for timed tasks
self._skehdule = TimedScheduler(max_tasks=1)
self._skehdule.start()
## UI server ## UI server
serve_coro = self._setup_webserver() serve_coro = self._setup_webserver()
self.webserver_task = loop.create_task(serve_coro) self.webserver_task = loop.create_task(serve_coro)
@ -299,7 +286,7 @@ class MainProcess:
logger.info(f'Ready to rumble! Press Ctrl+C to shut down') logger.info(f'Ready to rumble! Press Ctrl+C to shut down')
reload_task = loop.create_task(self.reload_ev.wait()) reload_task = loop.create_task(self.reload_ev.wait())
done, pending = await asyncio.wait([*user_tasks, self.webserver_task, reload_task], return_when=asyncio.FIRST_COMPLETED) done, pending = await asyncio.wait([*user_tasks, self.webserver_task, self.bus_task, reload_task], return_when=asyncio.FIRST_COMPLETED)
if reload_task in done: if reload_task in done:
logger.warn('Reloading (some events may be missed!)') logger.warn('Reloading (some events may be missed!)')
@ -324,6 +311,7 @@ class MainProcess:
serve_coro = self._setup_webserver() serve_coro = self._setup_webserver()
self.webserver_task = loop.create_task(serve_coro) self.webserver_task = loop.create_task(serve_coro)
else: else:
logger.debug(f'Task {done} completed - assuming something went wrong!')
break break
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
@ -341,4 +329,4 @@ class MainProcess:
task.cancel() task.cancel()
await self.user_shutdown() await self.user_shutdown()
self.webserver_task.cancel() self.webserver_task.cancel()
self.server_process.terminate() self.bus_task.cancel()

View file

@ -29,6 +29,17 @@ class OvtkBlueprint(quart.Blueprint):
endpoint = self.name + endpoint endpoint = self.name + endpoint
return quart.url_for(endpoint, *args, **kwargs) return quart.url_for(endpoint, *args, **kwargs)
def render(self, name, **kwargs):
"""render_template that prefers the plugin-specific templates"""
full = self.template_folder / name
if os.path.exists(full):
template_string = None
with open(full, 'r') as template_file:
template_string = template_file.read()
return quart.render_template_string(template_string, **kwargs)
else:
return quart.render_template(name, **kwargs)
class PluginBase(ABC): class PluginBase(ABC):
plugins = {} plugins = {}

View file

@ -1,34 +1,31 @@
import asyncio import asyncio
import json import json
from multiprocessing import Process, Pipe
import logging import logging
import websockets import websockets
from ovtk_audiencekit.events import Event from ovtk_audiencekit.events import Event
from ovtk_audiencekit.utils import get_subclasses from ovtk_audiencekit.utils import get_subclasses, format_exception
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class WebsocketServerProcess(Process): class WebsocketServerProcess:
def __init__(self, bind, port): def __init__(self, event_queue, bind, port):
super().__init__()
self._bind = bind self._bind = bind
self._port = port self._port = port
self._pipe, self._caller_pipe = Pipe() self.event_queue = event_queue
self._send_queue = asyncio.Queue()
self.clients = set() self.clients = set()
self.update_eventclasses()
def update_eventclasses(self):
self._event_classes = get_subclasses(Event) self._event_classes = get_subclasses(Event)
@property
def message_pipe(self):
return self._caller_pipe
# Data input (external application socket -> plugin/chat pipe) # Data input (external application socket -> plugin/chat pipe)
async def handle_websocket(self, ws, path): async def _handle_client(self, ws, path):
self.clients.add(ws) self.clients.add(ws)
try: try:
async for message in ws: async for message in ws:
@ -39,7 +36,7 @@ class WebsocketServerProcess(Process):
type = type[0] type = type[0]
event_class = next(cls for cls in self._event_classes if cls.__name__ == type) event_class = next(cls for cls in self._event_classes if cls.__name__ == type)
event = event_class.hydrate(**data.get('data', {})) event = event_class.hydrate(**data.get('data', {}))
self._pipe.send(event) self.event_queue.put_nowait(event)
else: else:
logger.warn('Unknown data recieved on websocket', message) logger.warn('Unknown data recieved on websocket', message)
except json.decoder.JSONDecodeError as e: except json.decoder.JSONDecodeError as e:
@ -51,47 +48,44 @@ class WebsocketServerProcess(Process):
except websockets.exceptions.ConnectionClosedError: except websockets.exceptions.ConnectionClosedError:
pass pass
except asyncio.CancelledError: except asyncio.CancelledError:
ws.close() await ws.close()
finally: finally:
self.clients.discard(ws) self.clients.discard(ws)
# Data output (plugin/chat pipe -> external application socket) # Data output (plugin/chat pipe -> external application socket)
async def handle_pipe(self, pipe_ready): async def send(self, event):
await self._send_queue.put(event)
async def _send_loop(self):
while True: while True:
# Let other co-routines process until file descriptor is readable event = await self._send_queue.get()
await pipe_ready.wait()
pipe_ready.clear()
# Check if messages exist on the pipe before attempting to recv
# to avoid accidentally blocking the event loop when file
# descriptor does stuff we don't expect
if not self._pipe.poll():
continue
event = self._pipe.recv()
# Serialize and send to registered clients # Serialize and send to registered clients
serialized = event.serialize() serialized = event.serialize()
if self.clients: if self.clients:
await asyncio.wait([self.safe_send(client, serialized) for client in self.clients]) await asyncio.wait([self._safe_send(client, serialized) for client in self.clients])
async def safe_send(self, client, serialized): async def _safe_send(self, client, serialized):
try: try:
await client.send(serialized) await client.send(serialized)
except (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK): except (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK):
self.clients.discard(client) self.clients.discard(client)
def run(self): async def run(self):
# Setup asyncio websocket server loop = asyncio.get_event_loop()
start_server = websockets.serve(self.handle_websocket, self._bind, self._port) tasks = set()
asyncio.get_event_loop().run_until_complete(start_server)
# Make an awaitable object that flips when the pipe's underlying file descriptor is readable # Setup websocket server (input)
pipe_ready = asyncio.Event() self.ws_server = await websockets.serve(self._handle_client, self._bind, self._port)
# REVIEW: This does not work on windows!!!! tasks.add(loop.create_task(self.ws_server.serve_forever()))
asyncio.get_event_loop().add_reader(self._pipe.fileno(), pipe_ready.set) # Setup sending loop (output)
# Make and start our infinite pipe listener task tasks.add(loop.create_task(self._send_loop()))
asyncio.get_event_loop().create_task(self.handle_pipe(pipe_ready))
# Keep the asyncio code running in this thread until explicitly stopped
try: try:
asyncio.get_event_loop().run_forever() await asyncio.gather(*tasks)
except KeyboardInterrupt: except Exception as e:
return 0 logger.critical(f'Failure in bus process - {e}')
logger.info(format_exception(e))
raise e
finally:
for task in tasks:
task.cancel()

View file

@ -54,9 +54,9 @@ class AudioAlert(PluginBase):
if wait: if wait:
await stream.aplay() await stream.play()
else: else:
task = asyncio.create_task(stream.aplay()) task = asyncio.create_task(stream.play())
task.add_done_callback(self.tasks.discard) task.add_done_callback(self.tasks.discard)
self.tasks.add(task) self.tasks.add(task)

View file

@ -4,7 +4,7 @@ import pickle
import os import os
import maya import maya
from requests.exceptions import HTTPError from httpx import HTTPError
from owoify.owoify import owoify, Owoness from owoify.owoify import owoify, Owoness
from ovtk_audiencekit.core import PluginBase from ovtk_audiencekit.core import PluginBase

View file

@ -1,6 +1,6 @@
from argparse import ArgumentError from argparse import ArgumentError
from requests.exceptions import HTTPError from httpx import HTTPError
from ovtk_audiencekit.core import PluginBase from ovtk_audiencekit.core import PluginBase
from ovtk_audiencekit.plugins.builtins.Command import Command, CommandTypes from ovtk_audiencekit.plugins.builtins.Command import Command, CommandTypes

View file

@ -52,8 +52,8 @@ class TextToSpeechPlugin(PluginBase):
config_path = override_conf_path config_path = override_conf_path
self.synthesizer = Synthesizer( self.synthesizer = Synthesizer(
model_path, tts_checkpoint=model_path,
config_path, tts_config_path=config_path,
vocoder_checkpoint=vocoder_path, vocoder_checkpoint=vocoder_path,
vocoder_config=vocoder_config_path, vocoder_config=vocoder_config_path,
use_cuda=self.cuda, use_cuda=self.cuda,
@ -66,22 +66,26 @@ class TextToSpeechPlugin(PluginBase):
task.cancel() task.cancel()
def make_tts_wav(self, text, filename=None): def make_tts_wav(self, text, filename=None):
# Force punctuation (keeps the models from acting unpredictably)
text = text.strip()
if not any([text.endswith(punc) for punc in '.!?:']):
text += '.'
if filename is None: if filename is None:
filename = os.path.join(self.cache_dir, f'{uuid.uuid1()}.wav') filename = os.path.join(self.cache_dir, f'{uuid.uuid1()}.wav')
self.logger.info(f'Generating TTS "{text}"...')
if self.speaker_wav: if self.speaker_wav:
wav = self.synthesizer.tts(text, None, 'en', self.speaker_wav) wav = self.synthesizer.tts(text, None, 'en', self.speaker_wav)
else: else:
wav = self.synthesizer.tts(text) wav = self.synthesizer.tts(text)
self.synthesizer.save_wav(wav, filename) self.synthesizer.save_wav(wav, filename)
self.logger.info(f'Done - saved as {filename}')
return filename return filename
async def run(self, text, *args, _ctx={}, wait=False, **kwargs): async def run(self, text, *args, _ctx={}, wait=False, **kwargs):
try: try:
# Force punctuation (keep AI from spinning off into random noises)
if not any([text.endswith(punc) for punc in '.!?:']):
text += '.'
# Do TTS processing in a thread to avoid blocking main loop # Do TTS processing in a thread to avoid blocking main loop
filename = await asyncio.get_running_loop().run_in_executor(None, self.make_tts_wav, text) filename = await asyncio.get_running_loop().run_in_executor(None, self.make_tts_wav, text)
@ -90,7 +94,7 @@ class TextToSpeechPlugin(PluginBase):
stream = Stream(clip, self.output_index) stream = Stream(clip, self.output_index)
async def play(): async def play():
try: try:
await stream.aplay() await stream.play()
finally: finally:
stream.close() stream.close()
os.remove(os.path.join(self.cache_dir, filename)) os.remove(os.path.join(self.cache_dir, filename))

View file

@ -34,7 +34,7 @@ class ScenePlugin(PluginBase):
self.blueprint.add_url_rule('/', 'ctrlpanel', self.ui_ctrlpanel) self.blueprint.add_url_rule('/', 'ctrlpanel', self.ui_ctrlpanel)
self.blueprint.add_url_rule('/<name>/<cmd>', 'api-sceneset', self.ui_setscene) self.blueprint.add_url_rule('/<name>/<cmd>', 'api-sceneset', self.ui_setscene)
self.blueprint.add_url_rule('/monitor', 'monitor', self.ui_monitor_ws, is_websocket=True) self.blueprint.add_url_rule('/monitor', 'monitor', self.ui_monitor_ws, websocket=True)
async def run(self, name, _children=None, _ctx={}, active=None, group=None, oneshot=False, **kwargs): async def run(self, name, _children=None, _ctx={}, active=None, group=None, oneshot=False, **kwargs):
if _children is None: if _children is None:
@ -138,7 +138,7 @@ class ScenePlugin(PluginBase):
async def ui_ctrlpanel(self): async def ui_ctrlpanel(self):
groups = self._get_state() groups = self._get_state()
return await quart.render_template('index.html', init_state=json.dumps(groups)) return await self.blueprint.render('index.html', init_state=json.dumps(groups))
async def ui_setscene(self, name=None, cmd=None): async def ui_setscene(self, name=None, cmd=None):
active = cmd == 'activate' active = cmd == 'activate'

View file

@ -2,7 +2,7 @@
<html lang="en" dir="ltr"> <html lang="en" dir="ltr">
<head> <head>
<meta charset="utf-8"> <meta charset="utf-8">
<title>Test page</title> <title>Scene control</title>
<script type="importmap"> <script type="importmap">
{ {
"imports": { "vue": "https://unpkg.com/vue@3/dist/vue.esm-browser.js" } "imports": { "vue": "https://unpkg.com/vue@3/dist/vue.esm-browser.js" }

View file

@ -1,60 +0,0 @@
from multiprocessing import Process, Pipe
import logging
import asyncio
import websockets
logger = logging.getLogger(__name__)
class NonBlockingWebsocket(Process):
def __init__(self, ws_uri):
super().__init__()
self.uri = ws_uri
self._pipe, self._caller_pipe = Pipe()
self.daemon = True
self._send_queue = asyncio.Queue()
async def _setup(self):
self._ws = await websockets.connect(self.uri)
def send(self, data):
self._caller_pipe.send(data)
async def _send(self, pipe_ready):
while True:
await pipe_ready.wait()
pipe_ready.clear()
if not self._pipe.poll():
continue
data = self._pipe.recv()
await self._ws.send(data)
def poll(self, *args):
return self._caller_pipe.poll(*args)
def recv(self, *args):
return self._caller_pipe.recv(*args)
async def _read(self):
while True:
data = await self._ws.recv()
self._pipe.send(data)
def run(self):
loop = asyncio.new_event_loop()
# Setup ws client
loop.run_until_complete(self._setup())
# Make an awaitable object that flips when the pipe's underlying file descriptor is readable
pipe_ready = asyncio.Event()
loop.add_reader(self._pipe.fileno(), pipe_ready.set)
# Make and start our infinite tasks
loop.create_task(self._send(pipe_ready))
loop.create_task(self._read())
# Keep the asyncio code running in this thread until explicitly stopped
try:
loop.run_forever()
except KeyboardInterrupt:
return 0

View file

@ -1,4 +1,3 @@
from .NonBlockingWebsocket import NonBlockingWebsocket
from .make_sync import make_sync from .make_sync import make_sync
from .get_subclasses import get_subclasses from .get_subclasses import get_subclasses
from .format_exception import format_exception from .format_exception import format_exception