stream-chat/chats/YoutubeLive.py
2021-04-09 21:48:39 -07:00

202 lines
7.7 KiB
Python

import json
import os
import webbrowser
from enum import Enum, auto
from urllib.parse import urlencode

import requests

from . import AUTHOR_TYPES, Message
from .ChatProcess import ChatProcess
class STATES(Enum):
    """States of the YouTube Live chat process.

    Values are assigned with ``auto()``; only member identity is compared
    in this file, never the underlying integers.
    """

    # Looking for an active broadcast to attach to.
    WAITING_FOR_BROADCAST = auto()
    # Fetching chat messages for the broadcast that was found.
    POLLING = auto()
    # Exchanging the stored refresh token for a new access token.
    REFRESH = auto()
    # No refresh token on disk; the user has to grant OAuth consent.
    UNAUTHORIZED = auto()
    # Declared with a no-op handler; nothing in this file transitions to it.
    WAITING_FOR_CONSENT_CODE = auto()
    # The YouTube API returned a bad status; polling is stopped.
    FAILURE = auto()
class CONTROL_MESSAGES(Enum):
    """Control message types exchanged with the YouTube Live chat process."""

    # Sent by the process over its pipe once it has opened the consent page.
    NEEDS_OAUTH_CONSENT_TOKEN = auto()
    # Delivered to the process; its args carry the consent code under 'token'.
    OAUTH_CONSENT_TOKEN = auto()
    # Bound to the 'r' key; restarts the process unless it is polling.
    RESTART = auto()
class Process(ChatProcess):
    """Chat process that relays YouTube Live chat messages.

    The process authenticates against Google OAuth with an "installed
    application" client secret, looks for the single active broadcast of
    the authorized account and then polls that broadcast's live chat,
    putting normalized ``Message`` objects on the message queue.

    State handlers (``on_<state>``) return one of: a ``STATES`` member, a
    ``(state, next_state)`` pair, a number, or ``None``. The numbers are
    delays in seconds (see ``on_polling``); how the base class interprets
    each of these is defined in ``ChatProcess`` - presumably a number
    means "sleep, then run the same state again" and ``None`` means
    "stay put"; confirm against ``ChatProcess``.
    """

    CHAT_NAME = 'YouTube Live'
    GOOGLE_OAUTH_TOKEN_URL = 'https://accounts.google.com/o/oauth2/token'
    YOUTUBE_API_URL = 'https://www.googleapis.com/youtube/v3'
    # File, relative to the working directory, that caches the refresh token.
    REFRESH_TOKEN_PATH = 'refresh_token.secret'
    # Redirect URI of the manual copy/paste ("out-of-band") consent flow.
    # NOTE(review): Google has announced the deprecation of the OOB flow for
    # installed apps - confirm it still works for the client in use.
    OAUTH_REDIRECT_URI = 'urn:ietf:wg:oauth:2.0:oob'
    OAUTH_SCOPE = 'https://www.googleapis.com/auth/youtube'
    # Delay, in seconds, before looking for an active broadcast again.
    BROADCAST_RETRY_SECONDS = 30
    # Lower bound, in milliseconds, applied to the API's polling interval.
    MIN_POLL_INTERVAL_MILLIS = 5000

    def __init__(self, *args, client_secrets_path=None):
        """Load the OAuth client secrets and set up the state machine.

        Args:
            *args: Forwarded unchanged to ``ChatProcess.__init__``.
            client_secrets_path: Path to a Google client secrets JSON file
                that contains an ``installed`` section.

        Raises:
            ValueError: If the path is missing, does not exist, or the file
                has no ``installed`` section.
        """
        if client_secrets_path is None or not os.path.exists(client_secrets_path):
            raise ValueError('Missing client secrets')
        with open(client_secrets_path, 'r') as f:
            client_secrets = json.load(f).get('installed')
        if client_secrets is None:
            raise ValueError('Malformed client secrets file - missing installed section')

        super().__init__(*args)

        self._client_secrets = client_secrets
        self._state_machine = self.bind_to_states(STATES)
        self._consent_code = None
        self._refresh_token = None
        self._access_token = None
        self._stream_title = None
        self._live_chat_id = None
        self._page_token = None

    @property
    def keybinds(self):
        """Map key codes to the control message each key triggers."""
        return {
            ord('r'): {'type': CONTROL_MESSAGES.RESTART},
        }

    def loop(self, next_state):
        """Run one step of the process.

        With no current state the process bootstraps itself: a cached
        refresh token leads to ``REFRESH``, otherwise to ``UNAUTHORIZED``;
        both continue with ``WAITING_FOR_BROADCAST``. In every other case
        the step is delegated to the bound state machine.
        """
        if self.state is None:
            cached_token = self._load_refresh_token()
            if cached_token:
                self._refresh_token = cached_token
                return STATES.REFRESH, STATES.WAITING_FOR_BROADCAST
            return STATES.UNAUTHORIZED, STATES.WAITING_FOR_BROADCAST
        return self._state_machine(self.state, next_state)

    def on_unauthorized(self, next_state):
        """Open the consent page and announce that a consent code is needed."""
        self.request_oauth_consent()
        self._pipe.send(CONTROL_MESSAGES.NEEDS_OAUTH_CONSENT_TOKEN)
        return None

    def on_waiting_for_consent_code(self, next_state):
        """Do nothing; no visible code transitions into this state."""
        return None

    def on_failure(self, next_state):
        """Do nothing; a RESTART control message is what leaves this state."""
        return None

    def on_refresh(self, next_state):
        """Fetch a new access token, then continue with ``next_state``."""
        self.get_fresh_access_token()
        return next_state

    def on_waiting_for_broadcast(self, next_state):
        """Look for the active broadcast and remember its chat id and title.

        Returns:
            ``(STATES.REFRESH, self.state)`` on HTTP 401, ``STATES.FAILURE``
            on any other non-OK status, ``STATES.POLLING`` once exactly one
            active broadcast was found, otherwise the retry delay in seconds.
        """
        response = requests.get(
            f'{Process.YOUTUBE_API_URL}/liveBroadcasts',
            params={'part': 'snippet', 'broadcastStatus': 'active'},
            headers=self._auth_headers(),
        )
        if response.status_code == 401:
            return STATES.REFRESH, self.state
        if response.status_code != requests.codes.ok:
            # Error payloads carry no 'items' key; fail the same way
            # on_polling does instead of raising KeyError below.
            self._report_api_failure(response)
            return STATES.FAILURE

        items = response.json()['items']
        # Zero or several active broadcasts both fall through to the retry.
        if len(items) == 1:
            snippet = items[0]['snippet']
            live_chat_id = snippet['liveChatId']
            if live_chat_id != self._live_chat_id:
                # A page token is only meaningful for the chat it came from.
                self._page_token = None
            self._live_chat_id = live_chat_id
            self._stream_title = snippet['title']
            return STATES.POLLING
        return Process.BROADCAST_RETRY_SECONDS

    def on_polling(self, next_state):
        """Fetch new chat messages and queue them in normalized form.

        Returns:
            ``(STATES.REFRESH, self.state)`` on HTTP 401, the delay until
            the next poll in seconds on success, ``STATES.FAILURE`` on any
            other status.
        """
        response = requests.get(
            f'{Process.YOUTUBE_API_URL}/liveChat/messages',
            params={
                'liveChatId': self._live_chat_id,
                'part': 'snippet,authorDetails',
                'hl': 'en_US',
                # None until the first successful poll; requests leaves
                # None-valued parameters out of the query string.
                'pageToken': self._page_token,
            },
            headers=self._auth_headers(),
        )
        if response.status_code == 401:
            return STATES.REFRESH, self.state
        if response.status_code != requests.codes.ok:
            self._report_api_failure(response)
            return STATES.FAILURE

        data = response.json()
        self._page_token = data['nextPageToken']
        for raw_message in data['items']:
            self._message_queue.put(Process.normalize_message(raw_message))
        # Never poll faster than the floor, even if the API would allow it.
        sleep_millis = max(data['pollingIntervalMillis'], Process.MIN_POLL_INTERVAL_MILLIS)
        return sleep_millis / 1000

    def on_state_enter(self, new_state):
        """Queue a system message describing ``new_state``, if it has one."""
        status_messages = {
            STATES.WAITING_FOR_BROADCAST: 'Waiting for active broadcast...',
            STATES.POLLING: f'Tuning into "{self._stream_title}" - ready to rumble ~\n',
            STATES.UNAUTHORIZED: 'Unauthorized - see terminal',
            STATES.FAILURE: 'YouTube API returned a bad status, polling stopped - see terminal',
        }
        text = status_messages.get(new_state)
        if text is not None:
            self._put_system_message(text)

    def process_messages(self, message_type, args, next_state):
        """React to a control message.

        Args:
            message_type: A ``CONTROL_MESSAGES`` member.
            args: Message payload; for ``OAUTH_CONSENT_TOKEN`` it must hold
                the consent code under ``'token'``.
            next_state: State to continue with after a consent exchange.

        Returns:
            ``next_state`` after a consent exchange, ``(None, None)`` when a
            restart is carried out, ``None`` otherwise.
        """
        if message_type == CONTROL_MESSAGES.OAUTH_CONSENT_TOKEN:
            self.setup_oauth_consent(args['token'])
            return next_state
        if message_type == CONTROL_MESSAGES.RESTART:
            if self.state == STATES.POLLING:
                self._put_system_message(
                    'Restart requested, but service is in healthy state! Ignoring...')
                return None
            self._put_system_message('Restarting service...')
            # Clearing the state makes the next loop() call bootstrap again.
            return None, None
        return None

    @classmethod
    def normalize_message(cls, message):
        """Convert a live chat message resource into a ``Message``.

        The author type is the first match of owner, moderator, sponsor
        (mapped to ``PATRON``); anyone else is a plain ``USER``.
        """
        author = message['authorDetails']
        if author['isChatOwner']:
            author_type = AUTHOR_TYPES.OWNER
        elif author['isChatModerator']:
            author_type = AUTHOR_TYPES.MODERATOR
        elif author['isChatSponsor']:
            author_type = AUTHOR_TYPES.PATRON
        else:
            author_type = AUTHOR_TYPES.USER
        text = message['snippet']['displayMessage']
        return Message(text, author['displayName'], author['channelId'], author_type)

    def request_oauth_consent(self):
        """Open the OAuth consent page in the user's web browser."""
        # urlencode percent-escapes the ':' and '/' characters contained in
        # the redirect URI and the scope.
        query = urlencode({
            'client_id': self._client_secrets['client_id'],
            'redirect_uri': Process.OAUTH_REDIRECT_URI,
            'scope': Process.OAUTH_SCOPE,
            'response_type': 'code',
        })
        webbrowser.open(f"{self._client_secrets['auth_uri']}?{query}")

    def setup_oauth_consent(self, consent_code):
        """Exchange a consent code for tokens and cache the refresh token.

        Raises:
            requests.HTTPError: If the token endpoint rejects the request.
        """
        response = requests.post(Process.GOOGLE_OAUTH_TOKEN_URL, data={
            'code': consent_code,
            'client_id': self._client_secrets['client_id'],
            'client_secret': self._client_secrets['client_secret'],
            'redirect_uri': Process.OAUTH_REDIRECT_URI,
            'grant_type': 'authorization_code',
        })
        response.raise_for_status()
        auth = response.json()
        with open(Process.REFRESH_TOKEN_PATH, 'w') as f:
            f.write(auth['refresh_token'])
        self._access_token = auth['access_token']
        self._refresh_token = auth['refresh_token']

    def get_fresh_access_token(self):
        """Use the refresh token to obtain a new access token.

        Raises:
            requests.HTTPError: If the token endpoint rejects the request.
        """
        response = requests.post(Process.GOOGLE_OAUTH_TOKEN_URL, data={
            'client_id': self._client_secrets['client_id'],
            'client_secret': self._client_secrets['client_secret'],
            'refresh_token': self._refresh_token,
            'grant_type': 'refresh_token',
        })
        response.raise_for_status()
        self._access_token = response.json()['access_token']

    def _load_refresh_token(self):
        """Return the cached refresh token, or None if there is none."""
        if not os.path.exists(Process.REFRESH_TOKEN_PATH):
            return None
        with open(Process.REFRESH_TOKEN_PATH, 'r') as f:
            # Surrounding whitespace (e.g. a trailing newline from a manual
            # edit) is not part of the token; an empty file counts as none.
            return f.read().strip() or None

    def _auth_headers(self):
        """Return the bearer-token header used for YouTube API calls."""
        return {'Authorization': f'Bearer {self._access_token}'}

    def _put_system_message(self, text):
        """Queue ``text`` as a system message attributed to this chat."""
        self._message_queue.put(
            Message(text, Process.CHAT_NAME, self.__class__.__name__, AUTHOR_TYPES.SYSTEM))

    @staticmethod
    def _report_api_failure(response):
        """Print the body of a failed API response to the terminal."""
        try:
            print(response.json())
        except ValueError:
            # Not every error body is JSON; show the raw text instead.
            print(response.text)