Basic plugins support #5

Merged
skeh merged 4 commits from plugins into main 2021-04-08 01:55:48 +00:00
7 changed files with 158 additions and 64 deletions

View File

@ -2,6 +2,11 @@ from abc import ABC, abstractmethod
from multiprocessing import Process, Pipe, Queue
from aenum import Enum, auto
from traceback import format_exception
import time
class GracefulShutdownException(Exception):
pass
class LIFECYCLE_MESSAGES(Enum):
@ -9,14 +14,20 @@ class LIFECYCLE_MESSAGES(Enum):
class ChatProcess(Process, ABC):
def __init__(self):
def __init__(self, event_poll_frequency):
super().__init__()
self._event_poll_frequency = event_poll_frequency
self._message_queue = Queue()
self._pipe, self._caller_pipe = Pipe()
self._state = None
self._next_state = None
@abstractmethod
def keybinds(self):
return []
@abstractmethod
def loop(self, next_state):
pass
@ -83,22 +94,38 @@ class ChatProcess(Process, ABC):
else:
self.state = response
if self._pipe.poll(timeout):
incoming_message = self._pipe.recv()
if incoming_message['type'] == LIFECYCLE_MESSAGES.SHUTDOWN:
break
message_type = incoming_message['type']
args = {k: v for k, v in incoming_message.items() if k != 'type'}
response = self.process_messages(message_type, args, self._next_state)
if response is None:
pass
elif type(response) is tuple:
self.state, self._next_state = response
while timeout is None or timeout > 0:
if timeout is None:
period = self._event_poll_frequency
else:
self.state = response
period = min(self._event_poll_frequency, timeout)
if self._pipe.poll():
incoming_message = self._pipe.recv()
if incoming_message['type'] == LIFECYCLE_MESSAGES.SHUTDOWN:
raise GracefulShutdownException()
message_type = incoming_message['type']
args = {k: v for k, v in incoming_message.items() if k != 'type'}
response = self.process_messages(message_type, args, self._next_state)
if response is None:
continue
elif type(response) is tuple:
self.state, self._next_state = response
else:
self.state = response
if timeout is None:
timeout = 0
time.sleep(period)
if timeout is not None:
timeout -= period
if timeout is not None and timeout > 0:
time.sleep(timeout)
except GracefulShutdownException:
return 0
except Exception as e:
print(f'Failure in {self.__class__.__name__}: {e}')
print(''.join(format_exception(None, e, e.__traceback__)))
return -1
return 0

View File

@ -1,4 +1,3 @@
import time
import random
from enum import Enum, auto
@ -12,24 +11,38 @@ class CONTROL_MESSAGES(Enum):
START_STOP = auto()
class STATES(Enum):
PAUSED = auto()
RUNNING = auto()
class FakeChat(ChatProcess):
def __init__(self):
super().__init__()
def __init__(self, *args):
super().__init__(*args)
self._max_messages_per_chunk = 1
self._max_delay = 10
self._running = False
@property
def keybinds(self):
return {
ord('f'): {'type': CONTROL_MESSAGES.START_STOP}
}
def process_messages(self, message_type, args, next_state):
if message_type == CONTROL_MESSAGES.START_STOP:
self._running = not self._running
text = 'Fake chat activated!' if self._running else 'Disabled fake chat'
running = not self.state == STATES.RUNNING
text = 'Fake chat activated!' if running else 'Disabled fake chat'
self._message_queue.put({
'text': text,
'author_name': 'Fake Chat', 'author_id': self.__class__.__name__, 'author_type': AUTHOR_TYPES.SYSTEM
})
return STATES.RUNNING if running else STATES.PAUSED
def loop(self, next_state):
if not self._running:
if self.state is None:
return STATES.PAUSED
elif self.state == STATES.PAUSED:
return None
while range(int(random.random() * (self._max_messages_per_chunk + 1))):

View File

@ -27,8 +27,8 @@ class YoutubeLiveProcess(ChatProcess):
GOOGLE_OAUTH_TOKEN_URL = 'https://accounts.google.com/o/oauth2/token'
YOUTUBE_API_URL = 'https://www.googleapis.com/youtube/v3'
def __init__(self, client_secrets):
super().__init__()
def __init__(self, client_secrets, *args):
super().__init__(*args)
self._client_secrets = client_secrets
self._state_machine = self.bind_to_states(STATES)
@ -39,22 +39,10 @@ class YoutubeLiveProcess(ChatProcess):
self._live_chat_id = None
self._page_token = None
@classmethod
def normalize_message(cls, message):
if message['authorDetails']['isChatOwner']:
author_type = AUTHOR_TYPES.OWNER
elif message['authorDetails']['isChatModerator']:
author_type = AUTHOR_TYPES.MODERATOR
elif message['authorDetails']['isChatSponsor']:
author_type = AUTHOR_TYPES.PATRON
else:
author_type = AUTHOR_TYPES.USER
@property
def keybinds(self):
return {
'text': message['snippet']['displayMessage'],
'author_name': message['authorDetails']['displayName'],
'author_id': message['authorDetails']['channelId'],
'author_type': author_type,
ord('r'): {'type': CONTROL_MESSAGES.RESTART}
}
def loop(self, next_state):
@ -156,6 +144,24 @@ class YoutubeLiveProcess(ChatProcess):
})
return None, None
@classmethod
def normalize_message(cls, message):
if message['authorDetails']['isChatOwner']:
author_type = AUTHOR_TYPES.OWNER
elif message['authorDetails']['isChatModerator']:
author_type = AUTHOR_TYPES.MODERATOR
elif message['authorDetails']['isChatSponsor']:
author_type = AUTHOR_TYPES.PATRON
else:
author_type = AUTHOR_TYPES.USER
return {
'text': message['snippet']['displayMessage'],
'author_name': message['authorDetails']['displayName'],
'author_id': message['authorDetails']['channelId'],
'author_type': author_type,
}
def request_oauth_consent(self):
params = {
'client_id': self._client_secrets['client_id'],

15
main.py
View File

@ -1,25 +1,29 @@
import json
import os
import time
from chats import YoutubeLive
from chats.ChatProcess import LIFECYCLE_MESSAGES
from chats.FakeChat import FakeChat
from ui.App import App
from plugins.PogCount import PogCounter
EVENT_POLL_FREQ = 0.1
GOOGLE_API_SECRETS_PATH = 'googleapi.secret.json'
if __name__ == '__main__':
chat_processes = [FakeChat()]
chat_processes = [FakeChat(EVENT_POLL_FREQ)]
if os.path.exists(GOOGLE_API_SECRETS_PATH):
with open(GOOGLE_API_SECRETS_PATH, 'r') as f:
client_secrets = json.load(f)['installed']
chat_processes.append(YoutubeLive.YoutubeLiveProcess(client_secrets))
chat_processes.append(YoutubeLive.YoutubeLiveProcess(client_secrets, EVENT_POLL_FREQ))
else:
print('No client secrets - disabling youtube chat client. Hit "f" to enable a testing client')
app = App(chat_processes)
plugins = [PogCounter('../live-status.txt', prefix='Pog count: ')]
app = App(chat_processes, plugins)
for process in chat_processes:
process.start()
app.start()
@ -37,6 +41,7 @@ if __name__ == '__main__':
'type': YoutubeLive.CONTROL_MESSAGES.OAUTH_CONSENT_TOKEN,
'token': code
})
time.sleep(EVENT_POLL_FREQ)
for process in chat_processes:
process.control_pipe.send({'type': LIFECYCLE_MESSAGES.SHUTDOWN})

36
plugins/PogCount.py Normal file
View File

@ -0,0 +1,36 @@
class PogCounter:
def __init__(self, out, prefix=None):
self.out = out
self.prefix = prefix
self._count = 0
self._timer = 0
self._dirty = True
def normalize(self, text):
def remove_dups(text):
last_char = None
for char in list(text):
if last_char == char:
continue
last_char = char
yield char
return ''.join(remove_dups(text.lower()))
def tick(self, dt):
self._timer += dt
if self._timer > 1 and self._dirty:
with open(self.out, 'w') as f:
f.write(f"{self.prefix}{self._count}")
self._timer = 0
self._dirty = False
def handle_event(self, event):
pass
def on_message(self, message):
normalized = self.normalize(message['text'])
if any(phrase in normalized for phrase in ['pog', 'p o g']):
self._count += 1
self._dirty = True
return message

1
plugins/__init__.py Normal file
View File

@ -0,0 +1 @@
from chats import AUTHOR_TYPES

View File

@ -11,19 +11,20 @@ from chats import FakeChat, YoutubeLive
class App(Process):
def __init__(self, chat_processes):
def __init__(self, chat_processes, plugins):
super().__init__()
self._running = False
self._display_surf = None
self._title = 'Chat monitor'
self._chat_processes = chat_processes
self._plugins = plugins
self._chat_processes = {proc.__class__: proc for proc in chat_processes}
self._log_scroll = 0
self.size = self.width, self.height = 500, 1200
self._title = 'Chat monitor'
self._scroll_speed = 20
self._max_fps = 60
self.chat_log = deque(maxlen=100)
self.size = self.width, self.height = 500, 1200
self._log_scroll = 0
self._running = False
self._display_surf = None
@property
def deamon(self):
@ -38,25 +39,30 @@ class App(Process):
def on_event(self, event):
if event.type == pygame.QUIT:
self._running = False
elif event.type == pygame.KEYDOWN and event.key == ord('r'):
self._chat_processes[YoutubeLive.YoutubeLiveProcess].control_pipe.send(
{'type': YoutubeLive.CONTROL_MESSAGES.RESTART}
)
elif event.type == pygame.KEYDOWN and event.key == ord('f'):
self._chat_processes[FakeChat.FakeChat].control_pipe.send(
{'type': FakeChat.CONTROL_MESSAGES.START_STOP}
)
elif event.type == pygame.KEYDOWN:
for process in self._chat_processes:
bound_action = process.keybinds.get(event.key)
if bound_action is not None:
process.control_pipe.send(bound_action)
def tick(self, dt):
for process in self._chat_processes:
if not process.message_queue.empty():
message = process.message_queue.get()
for plugin in self._plugins:
message = plugin.on_message(message)
if message is None:
break
else:
message_view = MessageView(message, self.size)
self.chat_log.append(message_view)
self._log_scroll += message_view.rect.height
for message in self.chat_log:
message.tick(dt)
for name, process in self._chat_processes.items():
if not process.message_queue.empty():
new_message = process.message_queue.get()
message = MessageView(new_message, self.size)
self.chat_log.append(message)
self._log_scroll += message.rect.height
for plugin in self._plugins:
plugin.tick(dt)
if (self._log_scroll > 0):
self._log_scroll -= min(max((dt / 1000) * (self._log_scroll) * self._scroll_speed, 0.25), self.height / 4)