Compare commits

...

6 commits

Author SHA1 Message Date
1a1dfc7d2a Fix module defined event ingest 2025-03-07 02:09:34 -05:00
65d527bdfa Fix some plugin requests -> httpx imports 2025-03-07 01:39:07 -05:00
e67b31daf8 Fix missing await 2025-03-07 01:38:19 -05:00
7ebf0b48a4 Move websocket bus to asyncio operation 2025-02-17 23:25:50 -05:00
7685170714 Move chats to asyncio operation 2025-02-17 22:18:57 -05:00
1bc693a4eb Remove peertube chat module
This feature was rejected upstream aaaages ago, and i dont maintain the 
fork with it in it anymore
2025-02-17 18:36:35 -05:00
20 changed files with 320 additions and 404 deletions

58
pdm.lock generated
View file

@ -5,7 +5,7 @@
groups = ["default", "jail", "midi", "obs", "osc", "phrasecounter", "tts", "twitch", "yt-dlp"]
strategy = []
lock_version = "4.5.0"
content_hash = "sha256:081c38562637b061c3ff2507217f03a37cbddd6ecfea258c3347c2ad31423589"
content_hash = "sha256:625125acedb4bcd7e7a9e99ef777be25ed5655c4271e212b5f1134444a27178e"
[[metadata.targets]]
requires_python = ">=3.10,<3.11"
@ -69,6 +69,22 @@ files = [
{file = "anyascii-0.3.2.tar.gz", hash = "sha256:9d5d32ef844fe225b8bc7cba7f950534fae4da27a9bf3a6bea2cb0ea46ce4730"},
]
[[package]]
name = "anyio"
version = "4.8.0"
requires_python = ">=3.9"
summary = "High level compatibility layer for multiple asynchronous event loop implementations"
dependencies = [
"exceptiongroup>=1.0.2; python_version < \"3.11\"",
"idna>=2.8",
"sniffio>=1.1",
"typing-extensions>=4.5; python_version < \"3.13\"",
]
files = [
{file = "anyio-4.8.0-py3-none-any.whl", hash = "sha256:b5011f270ab5eb0abf13385f851315585cc37ef330dd88e27ec3d34d651fd47a"},
{file = "anyio-4.8.0.tar.gz", hash = "sha256:1d9fe889df5212298c0c0723fa20479d1b94883a2df44bd3897aa91083316f7a"},
]
[[package]]
name = "appdirs"
version = "1.4.4"
@ -491,6 +507,36 @@ files = [
{file = "hpack-4.1.0.tar.gz", hash = "sha256:ec5eca154f7056aa06f196a557655c5b009b382873ac8d1e66e79e87535f1dca"},
]
[[package]]
name = "httpcore"
version = "1.0.7"
requires_python = ">=3.8"
summary = "A minimal low-level HTTP client."
dependencies = [
"certifi",
"h11<0.15,>=0.13",
]
files = [
{file = "httpcore-1.0.7-py3-none-any.whl", hash = "sha256:a3fff8f43dc260d5bd363d9f9cf1830fa3a458b332856f34282de498ed420edd"},
{file = "httpcore-1.0.7.tar.gz", hash = "sha256:8551cb62a169ec7162ac7be8d4817d561f60e08eaa485234898414bb5a8a0b4c"},
]
[[package]]
name = "httpx"
version = "0.28.1"
requires_python = ">=3.8"
summary = "The next generation HTTP client."
dependencies = [
"anyio",
"certifi",
"httpcore==1.*",
"idna",
]
files = [
{file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"},
{file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"},
]
[[package]]
name = "humanize"
version = "4.12.0"
@ -1471,6 +1517,16 @@ files = [
{file = "snaptime-0.2.4.tar.gz", hash = "sha256:e3f1eb89043d58d30721ab98cb65023f1a4c2740e3b197704298b163c92d508b"},
]
[[package]]
name = "sniffio"
version = "1.3.1"
requires_python = ">=3.7"
summary = "Sniff out which async library your code is running under"
files = [
{file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"},
{file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"},
]
[[package]]
name = "soundfile"
version = "0.13.1"

View file

@ -11,7 +11,6 @@ dependencies = [
"quart==0.18.*",
"werkzeug==2.3.7",
"hypercorn",
"requests",
"websockets==11.0.3",
"aioprocessing",
"aioscheduler",
@ -23,6 +22,7 @@ dependencies = [
"blessed",
"appdirs",
"maya",
"httpx>=0.28.1",
]
requires-python = ">=3.10,<3.11"
readme = "README.md"

View file

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from multiprocessing import Process, Pipe, Manager
import asyncio
import sys
import os
import json
@ -16,22 +16,30 @@ class GracefulShutdownException(Exception):
class ShutdownRequest(Event):
_hidden = True
class ManagerMock(dict):
__getattr__ = dict.get
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
class ChatProcess(Process, ABC):
def __init__(self, stdin_lock, name, readonly=False):
class ChatProcess(ABC):
def __init__(self, name, event_queue, readonly=False):
super().__init__()
self._stdin_lock = stdin_lock
self._name = name
self._event_queue = event_queue
self._control_queue = asyncio.Queue()
self.readonly = readonly
self.event_pipe, self._event_pipe = Pipe(duplex=False)
self._control_pipe, self.control_pipe = Pipe(duplex=False)
self.logger = logging.getLogger(f'chat.{self._name}')
self._state = None
self._next_state = None
self.shared = Manager().Namespace()
self.shared = ManagerMock({})
@abstractmethod
def setup(self, *args, **kwargs):
pass
@abstractmethod
def loop(self, next_state):
@ -41,7 +49,7 @@ class ChatProcess(Process, ABC):
pass
def send(self, event):
self.control_pipe.send(event)
self._control_queue.put_nowait(event)
@property
def state(self):
@ -82,24 +90,20 @@ class ChatProcess(Process, ABC):
print(f'WARNING: Unprocessed control message in {self._name} - {event}')
def safe_input(self, prompt):
self._stdin_lock.aquire()
with os.fdopen(0) as real_stdin:
old_stdin = sys.stdin
sys.stdin = real_stdin
response = input(prompt)
sys.stdin = old_stdin
self._stdin_lock.release()
return response
def publish(self, event):
self._event_pipe.send(event)
self._event_queue.put_nowait(event)
def run(self):
async def run(self):
while True:
try:
timeout = 0
# Run the code loop
response = self.loop(self._next_state)
if asyncio.iscoroutine(response):
response = await response
# Check response type and use the appropriate interface
# # integer / float: timeout value to use for the next loop
# # two states (tuple): current state and next state
@ -113,14 +117,19 @@ class ChatProcess(Process, ABC):
self.state = response
# Wait for inconming events for at most timeout seconds - handle if any come in
if self._control_pipe.poll(timeout):
incoming_event = self._control_pipe.recv()
try:
incoming_event = await asyncio.wait_for(self._control_queue.get(), timeout)
except asyncio.exceptions.TimeoutError:
continue
# Handle messages implimented by this base class
if isinstance(incoming_event, ShutdownRequest):
raise GracefulShutdownException()
# Pass the rest on
elif not self.readonly:
response = self.on_control_event(incoming_event, self._next_state)
if asyncio.iscoroutine(response):
response = await response
# Similar to above, handle response states
if response is None:
continue
@ -128,7 +137,6 @@ class ChatProcess(Process, ABC):
self.state, self._next_state = response
else:
self.state = response
except (GracefulShutdownException, KeyboardInterrupt):
return 0
except Exception as e:

View file

@ -1,4 +1,5 @@
import random
import asyncio
from enum import Enum, auto
from ovtk_audiencekit.chats import ChatProcess
@ -16,8 +17,7 @@ class StartStop(Event):
class FakeChat(ChatProcess):
def __init__(self, *args, max_delay=10, max_messages_per_chunk=1, start_paused=True, max_monitization=None, **kwargs):
super().__init__(*args, **kwargs)
def setup(self, max_delay=10, max_messages_per_chunk=1, start_paused=True, max_monitization=None):
self._max_delay = max_delay
self._max_messages_per_chunk = max_messages_per_chunk
self._max_monitization = max_monitization

View file

@ -3,9 +3,11 @@ import random
import logging
from enum import Enum, auto
from itertools import chain
import asyncio
import websockets
from ovtk_audiencekit.chats import ChatProcess
from ovtk_audiencekit.utils import NonBlockingWebsocket
from ovtk_audiencekit.events.Message import Message, USER_TYPE
logger = logging.getLogger(__name__)
@ -17,8 +19,7 @@ class STATES(Enum):
class MisskeyProcess(ChatProcess):
def __init__(self, *args, instance=None, channel=None, token=None, **kwargs):
super().__init__(*args, **kwargs)
def setup(self, instance=None, channel=None, token=None):
self._url = f'wss://{instance}/streaming'
if token:
self._url += f'?token={token}'
@ -42,9 +43,8 @@ class MisskeyProcess(ChatProcess):
return msg
return None
def on_connecting(self, next_state):
self._ws = NonBlockingWebsocket(self._url)
self._ws.start()
async def on_connecting(self, next_state):
self._ws = await websockets.connect(self._url)
payload = {
'type': 'connect',
'body': {
@ -55,12 +55,15 @@ class MisskeyProcess(ChatProcess):
}
}
}
self._ws.send(json.dumps(payload))
logger.debug("Subscribed to channel bus")
await self._ws.send(json.dumps(payload))
return STATES.READING
def on_reading(self, next_state, timeout=0.1):
if self._ws.poll(timeout):
note_event = self._ws.recv()
async def on_reading(self, next_state, timeout=0.1):
try:
note_event = await asyncio.wait_for(self._ws.recv(), timeout)
except asyncio.exceptions.TimeoutError:
return 0
try:
misskey_event = json.loads(note_event)
if misskey_event['body']['id'] == self._subid and misskey_event['body']['type'] == 'note':

View file

@ -1 +0,0 @@
from .peertube import PtChatProcess as Process

View file

@ -1,70 +0,0 @@
import json
import random
import logging
from enum import Enum, auto
from itertools import chain
import socketio
import requests
from ovtk_audiencekit.chats import ChatProcess
from ovtk_audiencekit.events.Message import Message, USER_TYPE
logger = logging.getLogger(__name__)
class STATES(Enum):
CONNECTING = auto()
READING = auto()
class PtChatProcess(ChatProcess):
def __init__(self, *args, instance_url=None, channel=None, token=None, **kwargs):
super().__init__(*args, **kwargs)
self.instance_url = instance_url
self.channel = channel
self._state_machine = self.bind_to_states(STATES)
self.state = STATES.CONNECTING
def normalize_event(self, event):
message = event['message']
user_name = message['account']['displayName'] or message['account']['name']
user_id = message['account']['id']
text = message.get('text', '')
if text:
msg = Message(self._name, text,
user_name, user_id, USER_TYPE.USER,
id=message['id'])
return msg
return None
def on_connecting(self, next_state):
self._session = requests.Session()
self._sio = socketio.Client()
api_channel = self._session.get(f'{self.instance_url}/api/v1/video-channels/{self.channel}')
api_channel.raise_for_status()
room_id = api_channel.json().get('roomId')
if room_id is None:
raise ValueError('No chatroom returned from channel api!')
self._sio.connect(self.instance_url, namespaces=['/live-chat'], wait=True)
self._sio.emit('subscribe', data={'roomId': room_id}, namespace='/live-chat')
self._sio.on('new-message', self.handle_message, namespace='/live-chat')
return STATES.READING
def handle_message(self, data):
try:
norm = self.normalize_event(data)
self.publish(norm)
except Exception:
logger.error(f'Failed to process message: {data}')
def on_reading(self, next_state, timeout=0.5):
self._sio.sleep(timeout)
return 0
def loop(self, next_state):
return self._state_machine(self.state, next_state)

View file

@ -1,4 +1,4 @@
import time
import asyncio
from enum import Enum, auto
from itertools import chain
@ -20,7 +20,7 @@ valid_emote_resolutions = [1.0, 2.0, 3.0, 4.0]
class TwitchProcess(ChatProcess):
def __init__(self, *args,
async def setup(self,
# Shared
channel_name=None, client_id=None, access_token=None,
# IRC options
@ -28,11 +28,7 @@ class TwitchProcess(ChatProcess):
# EventSub options
eventsub=True, eventsub_host='wss://ovtk.skeh.site/twitch',
# BTTV integration
bttv=False,
# Inheritance boilerplate
**kwargs):
super().__init__(*args, **kwargs)
bttv=False):
if channel_name is None or client_id is None or access_token is None:
raise ValueError('Twitch chat is missing config requirements')
if emote_res not in valid_emote_resolutions:
@ -50,8 +46,8 @@ class TwitchProcess(ChatProcess):
self.api = TwitchAPI(self._client_id, self._token)
self.shared.api = self.api
cheermotes = self.api.get_cheermotes(self._channel_name)
target_data = self.api.get_user_details(self._channel_name)
cheermotes = await self.api.get_cheermotes(self._channel_name)
target_data = await self.api.get_user_details(self._channel_name)
self.shared.target_data = target_data
self.shared.users = []
@ -64,12 +60,12 @@ class TwitchProcess(ChatProcess):
self._sources.append(self.eventsub)
self.bttv = BTTV(target_data['user']['id']) if bttv else None
await self.bttv.setup()
def loop(self, next_state):
return self._state_machine(self.state, next_state)
def on_control_event(self, event, next_state):
async def on_control_event(self, event, next_state):
if isinstance(event, Message):
# Twitch.... why... why no newlines....
for line in event.text.split('\n'):
@ -77,7 +73,7 @@ class TwitchProcess(ChatProcess):
continue
if event.replies_to:
line = f"@{event.replies_to.user_name} {line}"
self.irc.send(line)
await self.irc.send(line)
def on_state_enter(self, new_state):
status_messages = {
@ -89,15 +85,16 @@ class TwitchProcess(ChatProcess):
sys_msg = SysMessage(self._name, message)
self.publish(sys_msg)
def on_connecting(self, next_state):
self.irc.connect()
async def on_connecting(self, next_state):
await self.irc.connect()
if self.__dict__.get('eventsub'):
self.eventsub.subscribe(self._channel_name)
await self.eventsub.subscribe(self._channel_name)
return STATES.READING
def on_reading(self, next_state):
async def on_reading(self, next_state):
try:
for event in chain(*(source.read(0.1) for source in self._sources)):
events = [event for source in self._sources async for event in source.read(0.1)]
for event in events:
# Retarget event
event.via = self._name
if self.bttv and isinstance(event, Message):
@ -107,8 +104,8 @@ class TwitchProcess(ChatProcess):
except TimeoutError:
return STATES.TIMEOUT
def on_timeout(self, next_state):
time.sleep(3)
async def on_timeout(self, next_state):
await asyncio.sleep(3)
return STATES.CONNECTING
def on_failure(self, next_state):

View file

@ -1,7 +1,7 @@
from itertools import chain
import re
import requests
import httpx
API_URL = 'https://api.betterttv.net/3/cached'
CDN_URL = 'https://cdn.betterttv.net/emote'
@ -11,9 +11,10 @@ word_regex = re.compile('\b([A-z0-9]+)\b')
class BTTV:
def __init__(self, user_id):
self.user_id = user_id
self._session = requests.Session()
self._session = httpx.AsyncClient()
self.emotes = {emote['code']: emote for emote in chain(self._get_global(), self._get_channel())}
async def setup(self):
self.emotes = {emote['code']: emote for emote in chain(await self._get_global(), await self._get_channel())}
def hydrate(self, event):
text, emotes = self.parse_emotes(event.text)
@ -37,13 +38,13 @@ class BTTV:
return output, used_emotes
def _get_global(self):
response = self._session.get(f'{API_URL}/emotes/global')
async def _get_global(self):
response = await self._session.get(f'{API_URL}/emotes/global')
response.raise_for_status()
return response.json()
def _get_channel(self):
response = self._session.get(f'{API_URL}/users/twitch/{self.user_id}')
async def _get_channel(self):
response = await self._session.get(f'{API_URL}/users/twitch/{self.user_id}')
response.raise_for_status()
data = response.json()
return chain(data['channelEmotes'], data['sharedEmotes'])

View file

@ -1,4 +1,4 @@
import requests
import httpx
API_URL = 'https://api.twitch.tv/helix'
@ -6,33 +6,33 @@ class TwitchAPI:
def __init__(self, client_id, token):
self._client_id = client_id
self._token = token
self._session = requests.Session()
self._session = httpx.AsyncClient()
self._session.headers.update({'Authorization': f'Bearer {self._token}', 'Client-ID': client_id, 'Accept': 'application/vnd.twitchtv.v5+json'})
def get_cheermotes(self, channel):
response = self._session.get(f'{API_URL}/bits/cheermotes', params={'channel_id': channel})
async def get_cheermotes(self, channel):
response = await self._session.get(f'{API_URL}/bits/cheermotes', params={'channel_id': channel})
response.raise_for_status()
return response.json()['data']
def get_user_details(self, username):
async def get_user_details(self, username):
data = dict(link=f'https://twitch.tv/{username}')
try:
user_response = self._session.get(f'{API_URL}/users', params={'login': username})
user_response = await self._session.get(f'{API_URL}/users', params={'login': username})
user_response.raise_for_status()
data['user'] = user_response.json()['data'][0]
channel_response = self._session.get(f'{API_URL}/channels', params={'broadcaster_id': data['user']['id']})
channel_response = await self._session.get(f'{API_URL}/channels', params={'broadcaster_id': data['user']['id']})
channel_response.raise_for_status()
data['channel'] = channel_response.json()['data'][0]
except (KeyError, IndexError):
return None
return data
def get_clips(self, username):
user_response = self._session.get(f'{API_URL}/users', params={'login': username})
async def get_clips(self, username):
user_response = await self._session.get(f'{API_URL}/users', params={'login': username})
user_response.raise_for_status()
channel_id = user_response.json()['data'][0]['id']
clips_response = self._session.get(f'{API_URL}/clips', params={'broadcaster_id': broadcaster_id})
clips_response = await self._session.get(f'{API_URL}/clips', params={'broadcaster_id': broadcaster_id})
clips_response.raise_for_status()
return clips_response.json()['data']

View file

@ -1,7 +1,8 @@
import json
import logging
import asyncio
from ovtk_audiencekit.utils import NonBlockingWebsocket
import websockets
from ovtk_audiencekit.core.Data import ovtk_user_id
from ovtk_audiencekit.events import Follow
@ -15,14 +16,13 @@ class TwitchEventSub:
self.api = api
self.mirror_socket = f'{eventsub_host}/ws/{ovtk_user_id}'
def subscribe(self, username):
self._ws = NonBlockingWebsocket(self.mirror_socket)
self._ws.start()
async def subscribe(self, username):
self._ws = await websockets.connect(self.mirror_socket)
greet = self._ws.recv()
greet = await self._ws.recv()
greet = json.loads(greet)
if len(greet['subs']) == 0:
target_user = self.api.get_user_details(username)
target_user = await self.api.get_user_details(username)
target_user_id = target_user['user']['id']
supported_eventsub_types = [
@ -37,8 +37,8 @@ class TwitchEventSub:
'condition': condition,
'version': '1',
}
self._ws.send(json.dumps(payload))
status = self._ws.recv()
await self._ws.send(json.dumps(payload))
status = await self._ws.recv()
status = json.loads(status)
if status['type'] != 'subsuccess':
logger.warning(f'Failed to subscribe to {sub_type} EventSub topics - these events will not come through!!')
@ -63,12 +63,15 @@ class TwitchEventSub:
user_name, user_id)
return norm_event
def read(self, timeout):
if self._ws.poll(timeout):
messages = self._ws.recv()
async def read(self, timeout):
try:
messages = await asyncio.wait_for(self._ws.recv(), timeout)
except asyncio.exceptions.TimeoutError:
return
for message in messages.splitlines():
if message == 'PING':
self._ws.send('PONG')
await self._ws.send('PONG')
continue
try:
data = json.loads(message)

View file

@ -4,13 +4,13 @@ import re
from itertools import chain, islice
import logging
from collections import OrderedDict
import asyncio
import websockets.exceptions
import websockets
from miniirc import ircv3_message_parser
from ovtk_audiencekit.events.Message import Message, USER_TYPE
from ovtk_audiencekit.events import Subscription
from ovtk_audiencekit.utils import NonBlockingWebsocket
from ..Events import Raid
@ -43,24 +43,25 @@ class TwitchIRC:
self._reply_buffer = OrderedDict()
self._group_gifts = {}
def connect(self):
self._ws = NonBlockingWebsocket(WEBSOCKET_ADDRESS)
self._ws.start()
self._ws.send(f'PASS oauth:{self._token}')
self._ws.send(f'NICK {self._username}')
response = self._ws.recv()
async def connect(self):
self._ws = await websockets.connect(WEBSOCKET_ADDRESS)
await self._ws.send(f'PASS oauth:{self._token}')
await self._ws.send(f'NICK {self._username}')
response = await self._ws.recv()
if any('Welcome, GLHF!' in msg for msg in response.splitlines()):
self._ws.send(f'JOIN #{self._channel_name}')
self._ws.send('CAP REQ :twitch.tv/tags')
self._ws.send('CAP REQ :twitch.tv/commands')
self._ws.send('CAP REQ :twitch.tv/membership')
await self._ws.send(f'JOIN #{self._channel_name}')
await self._ws.send('CAP REQ :twitch.tv/tags')
await self._ws.send('CAP REQ :twitch.tv/commands')
await self._ws.send('CAP REQ :twitch.tv/membership')
else:
raise TwitchIRCException(f'Got bad response during auth: {response}')
def read(self, timeout):
async def read(self, timeout):
try:
messages = await asyncio.wait_for(self._ws.recv(), timeout)
except asyncio.exceptions.TimeoutError:
return
try:
if self._ws.poll(timeout):
messages = self._ws.recv()
for message in messages.splitlines():
normalized = None
cmd, hostmask, tags, args = ircv3_message_parser(message)
@ -70,7 +71,7 @@ class TwitchIRC:
elif cmd == 'USERNOTICE':
normalized = self.normalize_event(hostmask, tags, args)
elif cmd == 'PING':
self._ws.send(f"PONG {' '.join(args)}")
await self._ws.send(f"PONG {' '.join(args)}")
elif cmd == 'RECONNECT':
raise TimeoutError('Twitch API requested timeout')
elif cmd == 'JOIN':
@ -87,12 +88,12 @@ class TwitchIRC:
self._reply_buffer.popitem()
yield normalized
except websockets.exceptions.ConnectionClosedError:
self.logger.info('Twitch websocket disconnected - trying reconnet')
self.connect()
self.logger.warning('Twitch websocket disconnected - trying reconnet')
await self.connect()
def send(self, message):
async def send(self, message):
irc_msg = f'PRIVMSG #{self._username} :{message}'
self._ws.send(irc_msg)
await self._ws.send(irc_msg)
def parse_badges(self, tags):
if isinstance(tags.get('badges'), str):

View file

@ -1,9 +1,8 @@
import os
import json
import webbrowser
from enum import Enum, auto
import requests
import httpx
from ovtk_audiencekit.chats import ChatProcess
from ovtk_audiencekit.events.Message import Message, SysMessage, USER_TYPE
@ -24,7 +23,7 @@ class YoutubeLivePollProcess(ChatProcess):
GOOGLE_OAUTH_TOKEN_URL = 'https://accounts.google.com/o/oauth2/token'
YOUTUBE_API_URL = 'https://www.googleapis.com/youtube/v3'
def __init__(self, *args, client_secrets_path=None):
async def setup(self, client_secrets_path=None):
if client_secrets_path is None or not os.path.exists(client_secrets_path):
raise ValueError('Missing client secrets')
@ -33,7 +32,6 @@ class YoutubeLivePollProcess(ChatProcess):
if client_secrets is None:
raise ValueError('Malformed client secrets file - missing installed section')
super().__init__(*args)
self._client_secrets = client_secrets
self._state_machine = self.bind_to_states(STATES)
@ -44,14 +42,14 @@ class YoutubeLivePollProcess(ChatProcess):
self._live_chat_id = None
self._page_token = None
def loop(self, next_state):
async def loop(self, next_state):
if self.state is None:
if os.path.exists('refresh_token.secret'):
with open('refresh_token.secret', 'r') as f:
self._refresh_token = f.read()
return STATES.REFRESH, STATES.WAITING_FOR_BROADCAST
return STATES.UNAUTHORIZED, STATES.WAITING_FOR_BROADCAST
return self._state_machine(self.state, next_state)
return await self._state_machine(self.state, next_state)
def on_unauthorized(self, next_state):
self.request_oauth_consent()
@ -66,8 +64,8 @@ class YoutubeLivePollProcess(ChatProcess):
self.get_fresh_access_token()
return next_state
def on_waiting_for_broadcast(self, next_state):
response = requests.get(f'{self.__class__.YOUTUBE_API_URL}/liveBroadcasts',
async def on_waiting_for_broadcast(self, next_state):
response = await httpx.get(f'{self.__class__.YOUTUBE_API_URL}/liveBroadcasts',
params={'part': 'snippet', 'broadcastStatus': 'active'},
headers={'Authorization': f'Bearer {self._access_token}'})
@ -83,8 +81,8 @@ class YoutubeLivePollProcess(ChatProcess):
return 30
def on_polling(self, next_state):
response = requests.get(f'{self.__class__.YOUTUBE_API_URL}/liveChat/messages',
async def on_polling(self, next_state):
response = await httpx.get(f'{self.__class__.YOUTUBE_API_URL}/liveChat/messages',
params={'liveChatId': self._live_chat_id,
'part': 'snippet,authorDetails',
'hl': 'en_US',
@ -95,7 +93,7 @@ class YoutubeLivePollProcess(ChatProcess):
if (response.status_code == 401):
return STATES.REFRESH, self.state
if response.status_code == requests.codes.ok:
if response.status_code == httpx.codes.ok:
data = response.json()
self._page_token = data['nextPageToken']
if len(data['items']):
@ -156,8 +154,8 @@ class YoutubeLivePollProcess(ChatProcess):
url = f"{self._client_secrets['auth_uri']}?{param_str}"
webbrowser.open(url)
def setup_oauth_consent(self, consent_code):
response = requests.post(self.__class__.GOOGLE_OAUTH_TOKEN_URL, data={
async def setup_oauth_consent(self, consent_code):
response = await httpx.post(self.__class__.GOOGLE_OAUTH_TOKEN_URL, data={
'code': consent_code,
'client_id': self._client_secrets['client_id'],
'client_secret': self._client_secrets['client_secret'],
@ -171,8 +169,8 @@ class YoutubeLivePollProcess(ChatProcess):
self._access_token = auth['access_token']
self._refresh_token = auth['refresh_token']
def get_fresh_access_token(self):
response = requests.post(self.__class__.GOOGLE_OAUTH_TOKEN_URL, data={
async def get_fresh_access_token(self):
response = await httpx.post(self.__class__.GOOGLE_OAUTH_TOKEN_URL, data={
'client_id': self._client_secrets['client_id'],
'client_secret': self._client_secrets['client_secret'],
'refresh_token': self._refresh_token,

View file

@ -59,6 +59,8 @@ def cli(loglevel, show_time=False):
logging.getLogger('numba').setLevel(logging.WARN)
logging.getLogger('hypercorn.error').setLevel(logging.WARN)
logging.getLogger('hypercorn.access').setLevel(logging.WARN)
logging.getLogger('httpx').setLevel(logging.WARN)
logging.getLogger('httpcore').setLevel(logging.INFO)
# Quiet warnings
if loglevel > logging.DEBUG:
warnings.filterwarnings("ignore")

View file

@ -1,5 +1,4 @@
import importlib
from multiprocessing import Lock
import asyncio
from datetime import datetime, timedelta
import logging
@ -61,14 +60,10 @@ class MainProcess:
self.max_concurrent = max_concurrent
self.chat_processes = {}
self.chat_tasks = set()
self.plugins = {}
self.event_queue = asyncio.Queue()
# Init websocket server (event bus)
# HACK: Must be done here to avoid shadowing its asyncio loop
self.server_process = WebsocketServerProcess(*self.bus_conf)
self.server_process.start()
# Save sys.path since some config will clobber it
self._initial_syspath = sys.path
@ -78,10 +73,6 @@ class MainProcess:
del self.plugins[plugin_name]
del plugin
def _get_event_from_pipe(self, pipe):
event = pipe.recv()
self.event_queue.put_nowait(event)
def _setup_webserver(self):
self.webserver = Quart(__name__, static_folder=None, template_folder=None)
listen = ':'.join(self.web_conf)
@ -137,10 +128,10 @@ class MainProcess:
if event is None:
break
else:
self.server_process.message_pipe.send(event)
await self.bus_server.send(event)
logger.debug(f'Event after plugin chain - {event}')
elif isinstance(event, Delete):
self.server_process.message_pipe.send(event)
await self.bus_server.send(event)
else:
logger.error(f'Unknown data in event loop - {event}')
@ -157,7 +148,6 @@ class MainProcess:
async def user_setup(self):
config = kdl.Document(list(parse_kdl_deep(self.config_path)))
stdin_lock = Lock()
# Load secrets
secrets = {}
if node := config.get('secrets'):
@ -181,8 +171,11 @@ class MainProcess:
chat_module = import_or_reload_mod(module_name,
default_package='ovtk_audiencekit.chats',
external=False)
chat_process = chat_module.Process(stdin_lock, chat_name, **node.props, **secrets_for_mod)
chat_process = chat_module.Process(chat_name, self.event_queue)
self.chat_processes[chat_name] = chat_process
res = chat_process.setup(**node.props, **secrets_for_mod)
if asyncio.iscoroutine(res):
await res
except Exception as e:
raise ValueError(f'Failed to initalize {module_name} module "{chat_name}" - {e}')
@ -190,13 +183,9 @@ class MainProcess:
logger.warning('No chats configured!')
# Start chat processes
loop = asyncio.get_event_loop()
for process in self.chat_processes.values():
process.start()
# Bridge pipe to asyncio event loop
pipe = process.event_pipe
# REVIEW: This does not work on windows!!!! add_reader is not implemented
# in a way that supports pipes on either windows loop runners
asyncio.get_event_loop().add_reader(pipe.fileno(), lambda pipe=pipe: self._get_event_from_pipe(pipe))
self.chat_tasks.add(loop.create_task(process.run()))
# Load plugins
global_ctx = {}
@ -228,6 +217,8 @@ class MainProcess:
except Exception as e:
raise ValueError(f'Failed to initalize {module_name} plugin "{plugin_name}" - {e}')
self.bus_server.update_eventclasses()
# Run plugin definitions
with progressbar(list(config.nodes), label=f"Executing {self.config_path}") as bar:
for node in bar:
@ -241,13 +232,10 @@ class MainProcess:
await plugin_module._kdl_call(node, global_ctx)
async def user_shutdown(self):
for chat_task in self.chat_tasks:
chat_task.cancel()
self.chat_tasks.clear()
for process_name, process in list(reversed(self.chat_processes.items())):
pipe = process.event_pipe
process.control_pipe.send(ShutdownRequest('root'))
process.join(5)
if process.exitcode is None:
process.terminate()
asyncio.get_event_loop().remove_reader(pipe.fileno())
del self.chat_processes[process_name]
for plugin_name in list(reversed(self.plugins.keys())):
# NOTE: The plugin will likely stick around in memory for a bit after this,
@ -255,6 +243,15 @@ class MainProcess:
self._unload_plugin(plugin_name)
sys.path = self._initial_syspath
async def _discount_repl(self):
# REVIEW: Not a good UX at the moment (as new logs clobber the terminal entry)
async for line in reader:
line = line.strip()
if line == b'reload':
self.reload_ev.set()
elif line == b'quit':
self.shutdown_ev.set()
async def run(self):
self.shutdown_ev = asyncio.Event()
self.reload_ev = asyncio.Event()
@ -263,26 +260,13 @@ class MainProcess:
try:
# System setup
## Bridge websocket server pipe to asyncio loop
## REVIEW: This does not work on windows!!!! add_reader is not implemented
## in a way that supports pipes on either windows loop runners
ws_pipe = self.server_process.message_pipe
loop.add_reader(ws_pipe.fileno(), lambda: self._get_event_from_pipe(ws_pipe))
## Register stdin handler
## Make stdin handler
reader = asyncio.StreamReader()
await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
async def discount_repl():
# REVIEW: Not a good UX at the moment (as new logs clobber the terminal entry)
async for line in reader:
line = line.strip()
if line == b'reload':
self.reload_ev.set()
elif line == b'quit':
self.shutdown_ev.set()
self.cli_task = loop.create_task(discount_repl())
## Scheduler for timed tasks
self._skehdule = TimedScheduler(max_tasks=1)
self._skehdule.start()
self.cli_task = loop.create_task(self._discount_repl())
## Init websocket server (external end of the event bus)
self.bus_server = WebsocketServerProcess(self.event_queue, *self.bus_conf)
self.bus_task = loop.create_task(self.bus_server.run())
## UI server
serve_coro = self._setup_webserver()
self.webserver_task = loop.create_task(serve_coro)
@ -299,7 +283,7 @@ class MainProcess:
logger.info(f'Ready to rumble! Press Ctrl+C to shut down')
reload_task = loop.create_task(self.reload_ev.wait())
done, pending = await asyncio.wait([*user_tasks, self.webserver_task, reload_task], return_when=asyncio.FIRST_COMPLETED)
done, pending = await asyncio.wait([*user_tasks, self.webserver_task, self.bus_task, reload_task], return_when=asyncio.FIRST_COMPLETED)
if reload_task in done:
logger.warn('Reloading (some events may be missed!)')
@ -324,6 +308,7 @@ class MainProcess:
serve_coro = self._setup_webserver()
self.webserver_task = loop.create_task(serve_coro)
else:
logger.debug(f'Task {done} completed - assuming something went wrong!')
break
except KeyboardInterrupt:
pass
@ -341,4 +326,4 @@ class MainProcess:
task.cancel()
await self.user_shutdown()
self.webserver_task.cancel()
self.server_process.terminate()
self.bus_task.cancel()

View file

@ -1,34 +1,31 @@
import asyncio
import json
from multiprocessing import Process, Pipe
import logging
import websockets
from ovtk_audiencekit.events import Event
from ovtk_audiencekit.utils import get_subclasses
from ovtk_audiencekit.utils import get_subclasses, format_exception
logger = logging.getLogger(__name__)
class WebsocketServerProcess(Process):
def __init__(self, bind, port):
super().__init__()
class WebsocketServerProcess:
def __init__(self, event_queue, bind, port):
self._bind = bind
self._port = port
self._pipe, self._caller_pipe = Pipe()
self.event_queue = event_queue
self._send_queue = asyncio.Queue()
self.clients = set()
self.update_eventclasses()
def update_eventclasses(self):
self._event_classes = get_subclasses(Event)
@property
def message_pipe(self):
return self._caller_pipe
# Data input (external application socket -> plugin/chat pipe)
async def handle_websocket(self, ws, path):
async def _handle_client(self, ws, path):
self.clients.add(ws)
try:
async for message in ws:
@ -39,7 +36,7 @@ class WebsocketServerProcess(Process):
type = type[0]
event_class = next(cls for cls in self._event_classes if cls.__name__ == type)
event = event_class.hydrate(**data.get('data', {}))
self._pipe.send(event)
self.event_queue.put_nowait(event)
else:
logger.warn('Unknown data recieved on websocket', message)
except json.decoder.JSONDecodeError as e:
@ -51,47 +48,44 @@ class WebsocketServerProcess(Process):
except websockets.exceptions.ConnectionClosedError:
pass
except asyncio.CancelledError:
ws.close()
await ws.close()
finally:
self.clients.discard(ws)
# Data output (plugin/chat pipe -> external application socket)
async def handle_pipe(self, pipe_ready):
async def send(self, event):
await self._send_queue.put(event)
async def _send_loop(self):
while True:
# Let other co-routines process until file descriptor is readable
await pipe_ready.wait()
pipe_ready.clear()
# Check if messages exist on the pipe before attempting to recv
# to avoid accidentally blocking the event loop when file
# descriptor does stuff we don't expect
if not self._pipe.poll():
continue
event = self._pipe.recv()
event = await self._send_queue.get()
# Serialize and send to registered clients
serialized = event.serialize()
if self.clients:
await asyncio.wait([self.safe_send(client, serialized) for client in self.clients])
await asyncio.wait([self._safe_send(client, serialized) for client in self.clients])
async def safe_send(self, client, serialized):
async def _safe_send(self, client, serialized):
try:
await client.send(serialized)
except (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK):
self.clients.discard(client)
def run(self):
# Setup asyncio websocket server
start_server = websockets.serve(self.handle_websocket, self._bind, self._port)
asyncio.get_event_loop().run_until_complete(start_server)
async def run(self):
loop = asyncio.get_event_loop()
tasks = set()
# Make an awaitable object that flips when the pipe's underlying file descriptor is readable
pipe_ready = asyncio.Event()
# REVIEW: This does not work on windows!!!!
asyncio.get_event_loop().add_reader(self._pipe.fileno(), pipe_ready.set)
# Make and start our infinite pipe listener task
asyncio.get_event_loop().create_task(self.handle_pipe(pipe_ready))
# Setup websocket server (input)
self.ws_server = await websockets.serve(self._handle_client, self._bind, self._port)
tasks.add(loop.create_task(self.ws_server.serve_forever()))
# Setup sending loop (output)
tasks.add(loop.create_task(self._send_loop()))
# Keep the asyncio code running in this thread until explicitly stopped
try:
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
return 0
await asyncio.gather(*tasks)
except Exception as e:
logger.critical(f'Failure in bus process - {e}')
logger.info(format_exception(e))
raise e
finally:
for task in tasks:
task.cancel()

View file

@ -4,7 +4,7 @@ import pickle
import os
import maya
from requests.exceptions import HTTPError
from httpx import HTTPError
from owoify.owoify import owoify, Owoness
from ovtk_audiencekit.core import PluginBase

View file

@ -1,6 +1,6 @@
from argparse import ArgumentError
from requests.exceptions import HTTPError
from httpx import HTTPError
from ovtk_audiencekit.core import PluginBase
from ovtk_audiencekit.plugins.builtins.Command import Command, CommandTypes

View file

@ -1,60 +0,0 @@
from multiprocessing import Process, Pipe
import logging
import asyncio
import websockets
logger = logging.getLogger(__name__)
class NonBlockingWebsocket(Process):
def __init__(self, ws_uri):
super().__init__()
self.uri = ws_uri
self._pipe, self._caller_pipe = Pipe()
self.daemon = True
self._send_queue = asyncio.Queue()
async def _setup(self):
self._ws = await websockets.connect(self.uri)
def send(self, data):
self._caller_pipe.send(data)
async def _send(self, pipe_ready):
while True:
await pipe_ready.wait()
pipe_ready.clear()
if not self._pipe.poll():
continue
data = self._pipe.recv()
await self._ws.send(data)
def poll(self, *args):
return self._caller_pipe.poll(*args)
def recv(self, *args):
return self._caller_pipe.recv(*args)
async def _read(self):
while True:
data = await self._ws.recv()
self._pipe.send(data)
def run(self):
loop = asyncio.new_event_loop()
# Setup ws client
loop.run_until_complete(self._setup())
# Make an awaitable object that flips when the pipe's underlying file descriptor is readable
pipe_ready = asyncio.Event()
loop.add_reader(self._pipe.fileno(), pipe_ready.set)
# Make and start our infinite tasks
loop.create_task(self._send(pipe_ready))
loop.create_task(self._read())
# Keep the asyncio code running in this thread until explicitly stopped
try:
loop.run_forever()
except KeyboardInterrupt:
return 0

View file

@ -1,4 +1,3 @@
from .NonBlockingWebsocket import NonBlockingWebsocket
from .make_sync import make_sync
from .get_subclasses import get_subclasses
from .format_exception import format_exception