Basic plugins support #5
|
@ -2,15 +2,20 @@ from abc import ABC, abstractmethod
|
|||
from multiprocessing import Process, Pipe, Queue
|
||||
from aenum import Enum, auto
|
||||
from traceback import format_exception
|
||||
import time
|
||||
|
||||
|
||||
class GracefulShutdownException(Exception):
|
||||
pass
|
||||
|
||||
class LIFECYCLE_MESSAGES(Enum):
|
||||
SHUTDOWN = auto()
|
||||
|
||||
|
||||
class ChatProcess(Process, ABC):
|
||||
def __init__(self):
|
||||
def __init__(self, event_poll_frequency):
|
||||
super().__init__()
|
||||
self._event_poll_frequency = event_poll_frequency
|
||||
|
||||
self._message_queue = Queue()
|
||||
self._pipe, self._caller_pipe = Pipe()
|
||||
|
||||
|
@ -83,22 +88,38 @@ class ChatProcess(Process, ABC):
|
|||
else:
|
||||
self.state = response
|
||||
|
||||
if self._pipe.poll(timeout):
|
||||
incoming_message = self._pipe.recv()
|
||||
if incoming_message['type'] == LIFECYCLE_MESSAGES.SHUTDOWN:
|
||||
break
|
||||
message_type = incoming_message['type']
|
||||
args = {k: v for k, v in incoming_message.items() if k != 'type'}
|
||||
response = self.process_messages(message_type, args, self._next_state)
|
||||
if response is None:
|
||||
pass
|
||||
elif type(response) is tuple:
|
||||
self.state, self._next_state = response
|
||||
while timeout is None or timeout > 0:
|
||||
if timeout is None:
|
||||
period = self._event_poll_frequency
|
||||
else:
|
||||
self.state = response
|
||||
period = min(self._event_poll_frequency, timeout)
|
||||
if self._pipe.poll():
|
||||
incoming_message = self._pipe.recv()
|
||||
if incoming_message['type'] == LIFECYCLE_MESSAGES.SHUTDOWN:
|
||||
raise GracefulShutdownException()
|
||||
message_type = incoming_message['type']
|
||||
args = {k: v for k, v in incoming_message.items() if k != 'type'}
|
||||
response = self.process_messages(message_type, args, self._next_state)
|
||||
if response is None:
|
||||
continue
|
||||
elif type(response) is tuple:
|
||||
self.state, self._next_state = response
|
||||
else:
|
||||
self.state = response
|
||||
|
||||
if timeout is None:
|
||||
timeout = 0
|
||||
time.sleep(period)
|
||||
if timeout is not None:
|
||||
timeout -= period
|
||||
|
||||
if timeout is not None and timeout > 0:
|
||||
time.sleep(timeout)
|
||||
|
||||
|
||||
except GracefulShutdownException:
|
||||
return 0
|
||||
except Exception as e:
|
||||
print(f'Failure in {self.__class__.__name__}: {e}')
|
||||
print(''.join(format_exception(None, e, e.__traceback__)))
|
||||
return -1
|
||||
return 0
|
||||
|
|
|
@ -12,24 +12,32 @@ class CONTROL_MESSAGES(Enum):
|
|||
START_STOP = auto()
|
||||
|
||||
|
||||
class STATES(Enum):
|
||||
PAUSED = auto()
|
||||
RUNNING = auto()
|
||||
|
||||
|
||||
class FakeChat(ChatProcess):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
def __init__(self, *args):
|
||||
super().__init__(*args)
|
||||
self._max_messages_per_chunk = 1
|
||||
self._max_delay = 10
|
||||
self._running = False
|
||||
|
||||
def process_messages(self, message_type, args, next_state):
|
||||
if message_type == CONTROL_MESSAGES.START_STOP:
|
||||
self._running = not self._running
|
||||
text = 'Fake chat activated!' if self._running else 'Disabled fake chat'
|
||||
running = not self.state == STATES.RUNNING
|
||||
text = 'Fake chat activated!' if running else 'Disabled fake chat'
|
||||
self._message_queue.put({
|
||||
'text': text,
|
||||
'author_name': 'Fake Chat', 'author_id': self.__class__.__name__, 'author_type': AUTHOR_TYPES.SYSTEM
|
||||
})
|
||||
|
||||
return STATES.RUNNING if running else STATES.PAUSED
|
||||
|
||||
def loop(self, next_state):
|
||||
if not self._running:
|
||||
if self.state is None:
|
||||
return STATES.PAUSED
|
||||
elif self.state == STATES.PAUSED:
|
||||
return None
|
||||
|
||||
while range(int(random.random() * (self._max_messages_per_chunk + 1))):
|
||||
|
|
|
@ -27,8 +27,8 @@ class YoutubeLiveProcess(ChatProcess):
|
|||
GOOGLE_OAUTH_TOKEN_URL = 'https://accounts.google.com/o/oauth2/token'
|
||||
YOUTUBE_API_URL = 'https://www.googleapis.com/youtube/v3'
|
||||
|
||||
def __init__(self, client_secrets):
|
||||
super().__init__()
|
||||
def __init__(self, client_secrets, *args):
|
||||
super().__init__(*args)
|
||||
self._client_secrets = client_secrets
|
||||
self._state_machine = self.bind_to_states(STATES)
|
||||
|
||||
|
|
9
main.py
9
main.py
|
@ -1,21 +1,21 @@
|
|||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
from chats import YoutubeLive
|
||||
from chats.ChatProcess import LIFECYCLE_MESSAGES
|
||||
from chats.FakeChat import FakeChat
|
||||
from ui.App import App
|
||||
|
||||
|
||||
EVENT_POLL_FREQ = 0.1
|
||||
GOOGLE_API_SECRETS_PATH = 'googleapi.secret.json'
|
||||
|
||||
if __name__ == '__main__':
|
||||
chat_processes = [FakeChat()]
|
||||
chat_processes = [FakeChat(EVENT_POLL_FREQ)]
|
||||
if os.path.exists(GOOGLE_API_SECRETS_PATH):
|
||||
with open(GOOGLE_API_SECRETS_PATH, 'r') as f:
|
||||
client_secrets = json.load(f)['installed']
|
||||
|
||||
chat_processes.append(YoutubeLive.YoutubeLiveProcess(client_secrets))
|
||||
chat_processes.append(YoutubeLive.YoutubeLiveProcess(client_secrets, EVENT_POLL_FREQ))
|
||||
else:
|
||||
print('No client secrets - disabling youtube chat client. Hit "f" to enable a testing client')
|
||||
|
||||
|
@ -37,6 +37,7 @@ if __name__ == '__main__':
|
|||
'type': YoutubeLive.CONTROL_MESSAGES.OAUTH_CONSENT_TOKEN,
|
||||
'token': code
|
||||
})
|
||||
time.sleep(EVENT_POLL_FREQ)
|
||||
|
||||
for process in chat_processes:
|
||||
process.control_pipe.send({'type': LIFECYCLE_MESSAGES.SHUTDOWN})
|
||||
|
|
Loading…
Reference in New Issue