ovtk_audiencekit/chats/Twitch/TwitchProcess.py

105 lines
3.5 KiB
Python

import time
from enum import Enum, auto
from itertools import chain
from chats import ChatProcess
from events import Message, SysMessage
from .sources.TwitchIRC import TwitchIRC, TwitchIRCException
from .sources.TwitchAPI import TwitchAPI
from .sources.TwitchEventSub import TwitchEventSub
from .sources.BTTV import BTTV
class STATES(Enum):
CONNECTING = auto()
TIMEOUT = auto()
READING = auto()
valid_emote_resolutions = [1.0, 2.0, 3.0, 4.0]
class TwitchProcess(ChatProcess):
def __init__(self, *args,
# Shared
channel_name=None, client_id=None, access_token=None,
# IRC options
botname=None, emote_res=4.0,
# EventSub options
eventsub_host='wss://ovtk.skeh.site/twitch',
# Inheritance boilerplate
**kwargs):
super().__init__(*args, **kwargs)
if channel_name is None or client_id is None or access_token is None:
raise ValueError('Twitch chat is missing config requirements')
if emote_res not in valid_emote_resolutions:
raise ValueError(f'Invalid emote res - must be one of: {", ".join(valid_emote_resolutions)}')
if botname is None:
botname = channel_name
self._state_machine = self.bind_to_states(STATES)
self._channel_name = channel_name
self._username = botname
self._client_id = client_id
self._token = access_token
self.state = STATES.CONNECTING
self.api = TwitchAPI(self._client_id, self._token)
self.shared.api = self.api
cheermotes = self.api.get_cheermotes(self._channel_name)
target_data = self.api.get_user_details(self._channel_name)
self.shared.target_data = target_data
self.shared.users = []
self.irc = TwitchIRC(self._channel_name, self._username, self._token, cheermotes, emote_res, self.shared)
self.eventsub = TwitchEventSub(self.api, eventsub_host)
self.bttv = BTTV(target_data['user']['id'])
def loop(self, next_state):
return self._state_machine(self.state, next_state)
def on_control_event(self, event, next_state):
if isinstance(event, Message):
# Twitch.... why... why no newlines....
for line in event.text.split('\n'):
if not line.strip():
continue
if event.replies_to:
line = f"@{event.replies_to.user_name} {line}"
self.irc.send(line)
def on_state_enter(self, new_state):
status_messages = {
STATES.READING: 'Connected to channel!',
}
message = status_messages.get(new_state)
if message is not None:
sys_msg = SysMessage(self._name, message)
self.publish(sys_msg)
def on_connecting(self, next_state):
self.irc.connect()
self.eventsub.subscribe(self._channel_name)
return STATES.READING
def on_reading(self, next_state):
try:
for event in chain(self.irc.read(0.1), self.eventsub.read(0.1)):
# Retarget event
event.via = self._name
if isinstance(event, Message):
event = self.bttv.hydrate(event)
self.publish(event)
return 0
except TimeoutError:
return STATES.TIMEOUT
def on_timeout(self, next_state):
time.sleep(3)
return STATES.CONNECTING
def on_failure(self, next_state):
return None