Compare commits
6 Commits
5c152c75f6
...
0e014fbca2
Author | SHA1 | Date |
---|---|---|
Derek | 0e014fbca2 | |
Derek | 0c000866ca | |
Derek | ebb0c0bbbf | |
Derek | 6405ead2f8 | |
Derek | 64d8e0408f | |
Derek | c734bd86c9 |
127
main.py
127
main.py
|
@ -1,6 +1,7 @@
|
|||
from flask import Flask, request, abort, url_for
|
||||
import telebot
|
||||
import click
|
||||
from telebot.apihelper import ApiException
|
||||
import json
|
||||
import time
|
||||
|
||||
|
@ -27,21 +28,38 @@ def oops(message, e):
|
|||
bot.send_message(message.chat.id, "Looks like something went wrong, send @skehmatics the following but note it may contain confidential information:")
|
||||
bot.send_message(message.chat.id, str(e))
|
||||
|
||||
def is_admin(chat, user):
|
||||
return bot.get_chat(chat).type not in ['group', 'supergroup'] or bot.get_chat_member(chat, user).status in ['creator', 'administrator']
|
||||
|
||||
|
||||
@bot.channel_post_handler(func=lambda msg: channel_commands(msg, ['start']))
|
||||
@bot.message_handler(commands=['start'])
|
||||
def start(message):
|
||||
bot.reply_to(message, "Heyo, itsa me, the notify bot!")
|
||||
bot.reply_to(message, "Heyo, itsa me, the notify bot!\nUse /register to get started")
|
||||
|
||||
@bot.channel_post_handler(func=lambda msg: channel_commands(msg, ['register']))
|
||||
@bot.message_handler(commands=['register'])
|
||||
def register(message):
|
||||
try:
|
||||
old_chat = Chat.query.filter_by(chat_id=message.chat.id).first()
|
||||
old_chat = Chat.query.filter_by(chat_id=message.chat.id, user_id=message.from_user.id).first()
|
||||
if old_chat:
|
||||
bot.send_message(message.chat.id, "It looks like you already have this bot activated with token `{token}`. If you want to revoke that token, use /revoke instead.".format(token=old_chat.token), parse_mode='Markdown')
|
||||
bot.send_message(message.chat.id, "It looks like you already have this bot activated. If you want to revoke the token, use /revoke instead.", parse_mode='Markdown')
|
||||
return
|
||||
chat = Chat(message.chat.id, message.chat.username)
|
||||
if not is_admin(message.chat.id, message.from_user.id):
|
||||
bot.send_message(message.chat.id, "It looks like you're not an admin here. Go bug one of them instead.", parse_mode='Markdown')
|
||||
return
|
||||
chat = Chat(message.chat.id, message.from_user.id)
|
||||
db.session.add(chat)
|
||||
db.session.commit()
|
||||
bot.send_message(chat.chat_id, "Here's your shiny new token: `{token}`".format(token=chat.token), parse_mode='Markdown')
|
||||
help(message, chat=chat)
|
||||
try:
|
||||
bot.send_message(chat.user_id, "Here's your shiny new token for {name}: `{token}`".format(token=chat.token, name=chat.get_friendly_name(bot)), parse_mode='Markdown')
|
||||
bot.send_message(message.chat.id, "Badabing, badaboom. Token PM'ed to you!", parse_mode='Markdown')
|
||||
except ApiException as e:
|
||||
if e.result.status_code == 403:
|
||||
bot.send_message(message.chat.id, "Badabing, bada...bish. We tried to PM a token to you, but were blocked. Send `/manage` in a PM to get things sorted!", parse_mode='Markdown')
|
||||
else:
|
||||
raise e
|
||||
help(message)
|
||||
except Exception as e:
|
||||
oops(message, e)
|
||||
|
||||
|
@ -49,36 +67,102 @@ def start(message):
|
|||
@bot.message_handler(commands=['revoke'])
|
||||
def revoke(message, chat=None):
|
||||
if chat is None:
|
||||
chat = Chat.query.filter_by(chat_id=message.chat.id).first()
|
||||
chat = Chat.query.filter_by(chat_id=message.chat.id, user_id=message.from_user.id).first()
|
||||
if chat is None:
|
||||
bot.send_message(message.chat.id, "Can't seem to find any tokens you own for this chat.")
|
||||
return
|
||||
try:
|
||||
chat_id = chat.chat_id
|
||||
db.session.delete(chat)
|
||||
db.session.commit()
|
||||
bot.send_message(chat_id, "Poof! Token for this chat is gone. Use /start to make a new one if you want")
|
||||
if message is not None:
|
||||
bot.send_message(chat_id, "Poof! Your token for this chat is gone. Use /register to make a new one if you want")
|
||||
except Exception as e:
|
||||
if message is None:
|
||||
raise e
|
||||
oops(message, e)
|
||||
|
||||
@bot.channel_post_handler(func=lambda msg: channel_commands(msg, ['ping']))
|
||||
@bot.message_handler(commands=['ping'])
|
||||
def ping(message):
|
||||
bot.reply_to(message, "Pong!")
|
||||
|
||||
@bot.channel_post_handler(func=lambda msg: channel_commands(msg, ['help']))
|
||||
@bot.message_handler(commands=['help'])
|
||||
def help(message, chat=None):
|
||||
if chat is None:
|
||||
chat = Chat.query.filter_by(chat_id=message.chat.id).first()
|
||||
if chat is None:
|
||||
bot.send_message(message.chat.id, "You're not currently registered! Use /start to get all setup")
|
||||
else:
|
||||
bot.send_message(message.chat.id, """You can send messages to this chat via a post request like so\
|
||||
def help(message):
|
||||
bot.send_message(message.chat.id, """You can send messages to this chat via a post request like so\
|
||||
\
|
||||
```bash
|
||||
curl -X POST -H "Content-Type: application/json" {url} --data '{{"text": "My cool message!"}}'
|
||||
```\
|
||||
If your token becomes compromised, or you don't want to use this bot anymore, use /revoke
|
||||
If you're using this bot in a Channel, you'll have to append all commands with the bot username
|
||||
See [the wiki](https://git.skehsucks.xyz/skeh/py_notify_bot/wiki) for more info""".format(url=url_for('send', token=chat.token, _external=True, _scheme='https')), parse_mode='Markdown')
|
||||
See [the wiki](https://git.skehsucks.xyz/skeh/py_notify_bot/wiki) for more info""".format(url=url_for('send', token='YOURTOKENHERE', _external=True, _scheme='https')), parse_mode='Markdown')
|
||||
|
||||
@bot.channel_post_handler(func=lambda msg: channel_commands(msg, ['ping']))
|
||||
@bot.message_handler(commands=['ping'])
|
||||
def ping(message):
|
||||
bot.reply_to(message, "Pong!")
|
||||
|
||||
|
||||
@bot.channel_post_handler(func=lambda msg: channel_commands(msg, ['manage']))
|
||||
@bot.message_handler(commands=['manage'])
|
||||
def manage_start(message, callback=None):
|
||||
if callback:
|
||||
chats = Chat.query.filter_by(user_id=callback.from_user.id)
|
||||
else:
|
||||
chats = Chat.query.filter_by(user_id=message.from_user.id)
|
||||
|
||||
markup = telebot.types.InlineKeyboardMarkup(row_width=1)
|
||||
for chat in chats:
|
||||
button = telebot.types.InlineKeyboardButton(chat.get_friendly_name(bot), callback_data='manage:{id}'.format(id=chat.chat_id))
|
||||
markup.add(button)
|
||||
|
||||
text = "Select a chat" if chats.count() > 0 else "No registered chats to manage! Use /register",
|
||||
if callback:
|
||||
bot.edit_message_text(text, reply_markup=markup, chat_id=callback.message.chat.id, message_id=callback.message.message_id)
|
||||
else:
|
||||
bot.send_message(message.chat.id, text, reply_markup=markup)
|
||||
|
||||
@bot.callback_query_handler(func=lambda call: call.data.startswith('managestart:'))
|
||||
def manage_back_handler(callback):
|
||||
manage_start(None, callback=callback)
|
||||
|
||||
@bot.callback_query_handler(func=lambda call: call.data.startswith('manage:'))
|
||||
def manage_show_settings(callback):
|
||||
chat_id = callback.data[callback.data.index(':')+1:]
|
||||
chat = Chat.query.filter_by(chat_id=chat_id).first()
|
||||
|
||||
markup = telebot.types.InlineKeyboardMarkup(row_width=2)
|
||||
show_button = telebot.types.InlineKeyboardButton("Show token", callback_data='manageshow:{id}'.format(id=chat.chat_id))
|
||||
revoke_button = telebot.types.InlineKeyboardButton("Revoke / un-register", callback_data='managerevoke:{id}:ask'.format(id=chat.chat_id))
|
||||
back_button = telebot.types.InlineKeyboardButton("\u00ab Back", callback_data='managestart:')
|
||||
markup.add(show_button, revoke_button, back_button)
|
||||
|
||||
bot.edit_message_text("Chat \"{name}\"\n token: `<hidden>`".format(name=chat.get_friendly_name(bot)), reply_markup=markup, parse_mode='Markdown', chat_id=callback.message.chat.id, message_id=callback.message.message_id)
|
||||
|
||||
@bot.callback_query_handler(func=lambda call: call.data.startswith('manageshow:'))
|
||||
def manage_show_token(callback):
|
||||
chat_id = callback.data[callback.data.index(':')+1:]
|
||||
chat = Chat.query.filter_by(chat_id=chat_id).first()
|
||||
|
||||
markup = telebot.types.InlineKeyboardMarkup(row_width=2)
|
||||
show_button = telebot.types.InlineKeyboardButton("Hide token", callback_data='manage:{id}'.format(id=chat.chat_id))
|
||||
revoke_button = telebot.types.InlineKeyboardButton("Revoke / un-register", callback_data='managerevoke:{id}:ask'.format(id=chat.chat_id))
|
||||
back_button = telebot.types.InlineKeyboardButton("\u00ab Back", callback_data='managestart:')
|
||||
markup.add(show_button, revoke_button, back_button)
|
||||
|
||||
bot.edit_message_text("Chat \"{name}\"\n token: `{token}`".format(name=chat.get_friendly_name(bot), token=chat.token), reply_markup=markup, parse_mode='Markdown', chat_id=callback.message.chat.id, message_id=callback.message.message_id)
|
||||
|
||||
@bot.callback_query_handler(func=lambda call: call.data.startswith('managerevoke:'))
|
||||
def manage_revoke_token(callback):
|
||||
chat_id, mode = callback.data.split(':')[1:]
|
||||
chat = Chat.query.filter_by(chat_id=chat_id).first()
|
||||
|
||||
if mode == 'ask':
|
||||
markup = telebot.types.InlineKeyboardMarkup(row_width=2)
|
||||
yes_button = telebot.types.InlineKeyboardButton("Yes", callback_data='managerevoke:{id}:yes'.format(id=chat.chat_id))
|
||||
no_button = telebot.types.InlineKeyboardButton("No", callback_data='manage:{id}'.format(id=chat.chat_id))
|
||||
markup.add(no_button, yes_button)
|
||||
bot.edit_message_text("Are you sure you want to revoke token for \"{name}\"?".format(name=chat.get_friendly_name(bot)), reply_markup=markup, chat_id=callback.message.chat.id, message_id=callback.message.message_id)
|
||||
else:
|
||||
revoke(None, chat=chat)
|
||||
manage_back_handler(callback)
|
||||
|
||||
|
||||
@app.route('/')
|
||||
|
@ -99,7 +183,6 @@ def send(token):
|
|||
else:
|
||||
abort(415)
|
||||
|
||||
|
||||
@app.route('/webhook/<key>/', methods=['POST'])
|
||||
def webhook(key):
|
||||
if key == app.config['bot']['token']:
|
||||
|
|
12
models.py
12
models.py
|
@ -5,11 +5,15 @@ db = SQLAlchemy()
|
|||
|
||||
class Chat(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
chat_id = db.Column(db.Integer, unique=True, nullable=False)
|
||||
username = db.Column(db.String(), unique=True, nullable=True)
|
||||
chat_id = db.Column(db.Integer, nullable=False)
|
||||
user_id = db.Column(db.Integer, index=True, nullable=False)
|
||||
token = db.Column(db.String(43), index=True, nullable=False)
|
||||
|
||||
def __init__(self, chat_id, username=None):
|
||||
def __init__(self, chat_id, user_id):
|
||||
self.chat_id = chat_id
|
||||
self.username = username
|
||||
self.user_id = user_id
|
||||
self.token = secrets.token_urlsafe()
|
||||
|
||||
def get_friendly_name(self, bot):
|
||||
chat = bot.get_chat(self.chat_id)
|
||||
return chat.title or chat.username or chat.id
|
||||
|
|
Loading…
Reference in New Issue