diff --git a/chats/ChatProcess.py b/chats/ChatProcess.py index a7d8221..5901f83 100644 --- a/chats/ChatProcess.py +++ b/chats/ChatProcess.py @@ -2,6 +2,11 @@ from abc import ABC, abstractmethod from multiprocessing import Process, Pipe, Queue from aenum import Enum, auto from traceback import format_exception +import time + + +class GracefulShutdownException(Exception): + pass class LIFECYCLE_MESSAGES(Enum): @@ -9,14 +14,20 @@ class LIFECYCLE_MESSAGES(Enum): class ChatProcess(Process, ABC): - def __init__(self): + def __init__(self, event_poll_frequency): super().__init__() + self._event_poll_frequency = event_poll_frequency + self._message_queue = Queue() self._pipe, self._caller_pipe = Pipe() self._state = None self._next_state = None + @abstractmethod + def keybinds(self): + return [] + @abstractmethod def loop(self, next_state): pass @@ -83,22 +94,38 @@ class ChatProcess(Process, ABC): else: self.state = response - if self._pipe.poll(timeout): - incoming_message = self._pipe.recv() - if incoming_message['type'] == LIFECYCLE_MESSAGES.SHUTDOWN: - break - message_type = incoming_message['type'] - args = {k: v for k, v in incoming_message.items() if k != 'type'} - response = self.process_messages(message_type, args, self._next_state) - if response is None: - pass - elif type(response) is tuple: - self.state, self._next_state = response + while timeout is None or timeout > 0: + if timeout is None: + period = self._event_poll_frequency else: - self.state = response + period = min(self._event_poll_frequency, timeout) + if self._pipe.poll(): + incoming_message = self._pipe.recv() + if incoming_message['type'] == LIFECYCLE_MESSAGES.SHUTDOWN: + raise GracefulShutdownException() + message_type = incoming_message['type'] + args = {k: v for k, v in incoming_message.items() if k != 'type'} + response = self.process_messages(message_type, args, self._next_state) + if response is None: + continue + elif type(response) is tuple: + self.state, self._next_state = response + else: + self.state = response + if timeout is None: + timeout = 0 + time.sleep(period) + if timeout is not None: + timeout -= period + + if timeout is not None and timeout > 0: + time.sleep(timeout) + + + except GracefulShutdownException: + return 0 except Exception as e: print(f'Failure in {self.__class__.__name__}: {e}') print(''.join(format_exception(None, e, e.__traceback__))) return -1 - return 0 diff --git a/chats/FakeChat.py b/chats/FakeChat.py index 397084e..083030b 100644 --- a/chats/FakeChat.py +++ b/chats/FakeChat.py @@ -1,4 +1,3 @@ -import time import random from enum import Enum, auto @@ -12,24 +11,38 @@ class CONTROL_MESSAGES(Enum): START_STOP = auto() +class STATES(Enum): + PAUSED = auto() + RUNNING = auto() + + class FakeChat(ChatProcess): - def __init__(self): - super().__init__() + def __init__(self, *args): + super().__init__(*args) self._max_messages_per_chunk = 1 self._max_delay = 10 - self._running = False + + @property + def keybinds(self): + return { + ord('f'): {'type': CONTROL_MESSAGES.START_STOP} + } def process_messages(self, message_type, args, next_state): if message_type == CONTROL_MESSAGES.START_STOP: - self._running = not self._running - text = 'Fake chat activated!' if self._running else 'Disabled fake chat' + running = not self.state == STATES.RUNNING + text = 'Fake chat activated!' if running else 'Disabled fake chat' self._message_queue.put({ 'text': text, 'author_name': 'Fake Chat', 'author_id': self.__class__.__name__, 'author_type': AUTHOR_TYPES.SYSTEM }) + return STATES.RUNNING if running else STATES.PAUSED + def loop(self, next_state): - if not self._running: + if self.state is None: + return STATES.PAUSED + elif self.state == STATES.PAUSED: return None while range(int(random.random() * (self._max_messages_per_chunk + 1))): diff --git a/chats/YoutubeLive.py b/chats/YoutubeLive.py index 25bbc5b..e9c4bae 100644 --- a/chats/YoutubeLive.py +++ b/chats/YoutubeLive.py @@ -27,8 +27,8 @@ class YoutubeLiveProcess(ChatProcess): GOOGLE_OAUTH_TOKEN_URL = 'https://accounts.google.com/o/oauth2/token' YOUTUBE_API_URL = 'https://www.googleapis.com/youtube/v3' - def __init__(self, client_secrets): - super().__init__() + def __init__(self, client_secrets, *args): + super().__init__(*args) self._client_secrets = client_secrets self._state_machine = self.bind_to_states(STATES) @@ -39,22 +39,10 @@ class YoutubeLiveProcess(ChatProcess): self._live_chat_id = None self._page_token = None - @classmethod - def normalize_message(cls, message): - if message['authorDetails']['isChatOwner']: - author_type = AUTHOR_TYPES.OWNER - elif message['authorDetails']['isChatModerator']: - author_type = AUTHOR_TYPES.MODERATOR - elif message['authorDetails']['isChatSponsor']: - author_type = AUTHOR_TYPES.PATRON - else: - author_type = AUTHOR_TYPES.USER - + @property + def keybinds(self): return { - 'text': message['snippet']['displayMessage'], - 'author_name': message['authorDetails']['displayName'], - 'author_id': message['authorDetails']['channelId'], - 'author_type': author_type, + ord('r'): {'type': CONTROL_MESSAGES.RESTART} } def loop(self, next_state): @@ -156,6 +144,24 @@ class YoutubeLiveProcess(ChatProcess): }) return None, None + @classmethod + def normalize_message(cls, message): + if message['authorDetails']['isChatOwner']: + author_type = AUTHOR_TYPES.OWNER + elif message['authorDetails']['isChatModerator']: + author_type = AUTHOR_TYPES.MODERATOR + elif message['authorDetails']['isChatSponsor']: + author_type = AUTHOR_TYPES.PATRON + else: + author_type = AUTHOR_TYPES.USER + + return { + 'text': message['snippet']['displayMessage'], + 'author_name': message['authorDetails']['displayName'], + 'author_id': message['authorDetails']['channelId'], + 'author_type': author_type, + } + def request_oauth_consent(self): params = { 'client_id': self._client_secrets['client_id'], diff --git a/main.py b/main.py index 7fee0dd..05c7002 100644 --- a/main.py +++ b/main.py @@ -1,25 +1,29 @@ import json import os +import time from chats import YoutubeLive from chats.ChatProcess import LIFECYCLE_MESSAGES from chats.FakeChat import FakeChat from ui.App import App +from plugins.PogCount import PogCounter - +EVENT_POLL_FREQ = 0.1 GOOGLE_API_SECRETS_PATH = 'googleapi.secret.json' if __name__ == '__main__': - chat_processes = [FakeChat()] + chat_processes = [FakeChat(EVENT_POLL_FREQ)] if os.path.exists(GOOGLE_API_SECRETS_PATH): with open(GOOGLE_API_SECRETS_PATH, 'r') as f: client_secrets = json.load(f)['installed'] - - chat_processes.append(YoutubeLive.YoutubeLiveProcess(client_secrets)) + chat_processes.append(YoutubeLive.YoutubeLiveProcess(client_secrets, EVENT_POLL_FREQ)) else: print('No client secrets - disabling youtube chat client. Hit "f" to enable a testing client') - app = App(chat_processes) + plugins = [PogCounter('../live-status.txt', prefix='Pog count: ')] + + app = App(chat_processes, plugins) + for process in chat_processes: process.start() app.start() @@ -37,6 +41,7 @@ if __name__ == '__main__': 'type': YoutubeLive.CONTROL_MESSAGES.OAUTH_CONSENT_TOKEN, 'token': code }) + time.sleep(EVENT_POLL_FREQ) for process in chat_processes: process.control_pipe.send({'type': LIFECYCLE_MESSAGES.SHUTDOWN}) diff --git a/plugins/PogCount.py b/plugins/PogCount.py new file mode 100644 index 0000000..ff66e5b --- /dev/null +++ b/plugins/PogCount.py @@ -0,0 +1,36 @@ +class PogCounter: + def __init__(self, out, prefix=None): + self.out = out + self.prefix = prefix + + self._count = 0 + self._timer = 0 + self._dirty = True + + def normalize(self, text): + def remove_dups(text): + last_char = None + for char in list(text): + if last_char == char: + continue + last_char = char + yield char + return ''.join(remove_dups(text.lower())) + + def tick(self, dt): + self._timer += dt + if self._timer > 1 and self._dirty: + with open(self.out, 'w') as f: + f.write(f"{self.prefix}{self._count}") + self._timer = 0 + self._dirty = False + + def handle_event(self, event): + pass + + def on_message(self, message): + normalized = self.normalize(message['text']) + if any(phrase in normalized for phrase in ['pog', 'p o g']): + self._count += 1 + self._dirty = True + return message diff --git a/plugins/__init__.py b/plugins/__init__.py new file mode 100644 index 0000000..64ec7ab --- /dev/null +++ b/plugins/__init__.py @@ -0,0 +1 @@ +from chats import AUTHOR_TYPES diff --git a/ui/App.py b/ui/App.py index ca9e15e..2876d92 100644 --- a/ui/App.py +++ b/ui/App.py @@ -11,19 +11,20 @@ from chats import FakeChat, YoutubeLive class App(Process): - def __init__(self, chat_processes): + def __init__(self, chat_processes, plugins): super().__init__() - self._running = False - self._display_surf = None - self._title = 'Chat monitor' + self._chat_processes = chat_processes + self._plugins = plugins - self._chat_processes = {proc.__class__: proc for proc in chat_processes} - self._log_scroll = 0 + self.size = self.width, self.height = 500, 1200 + self._title = 'Chat monitor' self._scroll_speed = 20 self._max_fps = 60 self.chat_log = deque(maxlen=100) - self.size = self.width, self.height = 500, 1200 + self._log_scroll = 0 + self._running = False + self._display_surf = None @property def deamon(self): @@ -38,25 +39,30 @@ class App(Process): def on_event(self, event): if event.type == pygame.QUIT: self._running = False - elif event.type == pygame.KEYDOWN and event.key == ord('r'): - self._chat_processes[YoutubeLive.YoutubeLiveProcess].control_pipe.send( - {'type': YoutubeLive.CONTROL_MESSAGES.RESTART} - ) - elif event.type == pygame.KEYDOWN and event.key == ord('f'): - self._chat_processes[FakeChat.FakeChat].control_pipe.send( - {'type': FakeChat.CONTROL_MESSAGES.START_STOP} - ) + elif event.type == pygame.KEYDOWN: + for process in self._chat_processes: + bound_action = process.keybinds.get(event.key) + if bound_action is not None: + process.control_pipe.send(bound_action) def tick(self, dt): + for process in self._chat_processes: + if not process.message_queue.empty(): + message = process.message_queue.get() + for plugin in self._plugins: + message = plugin.on_message(message) + if message is None: + break + else: + message_view = MessageView(message, self.size) + self.chat_log.append(message_view) + self._log_scroll += message_view.rect.height + for message in self.chat_log: message.tick(dt) - for name, process in self._chat_processes.items(): - if not process.message_queue.empty(): - new_message = process.message_queue.get() - message = MessageView(new_message, self.size) - self.chat_log.append(message) - self._log_scroll += message.rect.height + for plugin in self._plugins: + plugin.tick(dt) if (self._log_scroll > 0): self._log_scroll -= min(max((dt / 1000) * (self._log_scroll) * self._scroll_speed, 0.25), self.height / 4)