212 lines
7.7 KiB
Python
212 lines
7.7 KiB
Python
"""Contains functions which are needed to run the runner script, but nowhere else."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from asyncio import get_event_loop, new_event_loop, set_event_loop
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from logging import getLogger
|
|
from os import getenv
|
|
from pathlib import Path
|
|
from sqlite3 import PARSE_COLNAMES, PARSE_DECLTYPES, connect
|
|
from traceback import format_exc
|
|
from typing import TYPE_CHECKING, cast
|
|
|
|
from telegram import __version__ as TG_VER
|
|
|
|
try:
|
|
from telegram import __version_info__
|
|
except ImportError:
|
|
__version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
|
|
|
|
if __version_info__ < (20, 0, 0, "alpha", 1):
|
|
raise RuntimeError(
|
|
f"This example is not compatible with your current PTB version {TG_VER}. To view the "
|
|
f"{TG_VER} version of this example, "
|
|
f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
|
|
)
|
|
|
|
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
|
|
from telegram.ext import Application, CallbackQueryHandler
|
|
|
|
from src import market, require_env
|
|
from src.consts import EnvironmentVariable, MarketStatus, Response
|
|
|
|
if TYPE_CHECKING: # pragma: no cover
|
|
from sqlite3 import Connection
|
|
|
|
from telegram import Update
|
|
from telegram.ext import ContextTypes
|
|
|
|
from src import Market
|
|
|
|
logger = getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class State:
|
|
"""Keeps track of global state for while the Telegram Bot is running."""
|
|
|
|
application: Application = None # type: ignore
|
|
last_response: Response = Response.NO_ACTION
|
|
last_text: str = ""
|
|
|
|
|
|
state = State()
|
|
keyboard1 = [
|
|
[
|
|
InlineKeyboardButton("Do Nothing", callback_data=Response.NO_ACTION),
|
|
InlineKeyboardButton("Resolve to Default", callback_data=Response.USE_DEFAULT),
|
|
],
|
|
[InlineKeyboardButton("Cancel Market", callback_data=Response.CANCEL)],
|
|
]
|
|
keyboard2 = [
|
|
[
|
|
InlineKeyboardButton("Yes", callback_data="YES"),
|
|
InlineKeyboardButton("No", callback_data="NO"),
|
|
],
|
|
]
|
|
|
|
|
|
@require_env(EnvironmentVariable.DBName)
|
|
def register_db() -> Connection:
|
|
"""Get a connection to the appropriate database for this bot."""
|
|
name = getenv("DBName")
|
|
if name is None:
|
|
raise EnvironmentError()
|
|
do_initialize = not Path(name).exists()
|
|
conn = connect(name, detect_types=PARSE_COLNAMES | PARSE_DECLTYPES)
|
|
if do_initialize:
|
|
conn.execute("CREATE TABLE markets "
|
|
"(id INTEGER, market Market, check_rate REAL, last_checked TIMESTAMP);")
|
|
conn.commit()
|
|
logger.info("Database up and initialized.")
|
|
return conn
|
|
|
|
|
|
async def buttons(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Parse the CallbackQuery and update the message text."""
|
|
logger.info("Got into the buttons handler")
|
|
query = update.callback_query
|
|
if query is None or query.data is None:
|
|
raise ValueError()
|
|
|
|
# CallbackQueries need to be answered, even if no notification to the user is needed
|
|
# Some clients may have trouble otherwise. See https://core.telegram.org/bots/api#callbackquery
|
|
await query.answer()
|
|
logger.info("Got a response from Telegram (%r)", query.data)
|
|
if query.data in ("YES", "NO"):
|
|
state.last_text += "\n" + query.data
|
|
await query.edit_message_text(text=state.last_text)
|
|
if query.data != "YES":
|
|
logger.info("Was not told yes. Backing out to ask again")
|
|
reply_markup = InlineKeyboardMarkup(keyboard1)
|
|
await query.edit_message_reply_markup(reply_markup=reply_markup)
|
|
else:
|
|
logger.info("Confirmation received, shutting dowm Telegram subsystem")
|
|
get_event_loop().stop() # lets telegram bot know it can stop
|
|
else:
|
|
state.last_response = Response(int(query.data))
|
|
logger.info("This corresponds to %r", state.last_response)
|
|
reply_markup = InlineKeyboardMarkup(keyboard2)
|
|
state.last_text += f"\nSelected option: {state.last_response.name}. Are you sure?"
|
|
await query.edit_message_text(text=state.last_text)
|
|
await query.edit_message_reply_markup(reply_markup=reply_markup)
|
|
|
|
|
|
@require_env(EnvironmentVariable.TelegramAPIKey, EnvironmentVariable.TelegramChatID)
|
|
def tg_main(text: str) -> Response:
|
|
"""Run the bot."""
|
|
async def post_init(self): # type: ignore
|
|
reply_markup = InlineKeyboardMarkup(keyboard1)
|
|
chat_id = getenv("TelegramChatID")
|
|
if chat_id is None:
|
|
raise EnvironmentError()
|
|
await self.bot.send_message(text=text, reply_markup=reply_markup, chat_id=int(chat_id))
|
|
|
|
application = Application.builder().token(cast(str, getenv("TelegramAPIKey"))).post_init(post_init).build()
|
|
application.add_handler(CallbackQueryHandler(buttons))
|
|
|
|
state.application = application
|
|
state.last_text = text
|
|
|
|
set_event_loop(new_event_loop())
|
|
application.run_polling()
|
|
return state.last_response
|
|
|
|
|
|
def watch_reply(conn: Connection, id_: int, mkt: Market, console_only: bool = False) -> None:
|
|
"""Watch for a reply from the bot manager in order to check the bot's work."""
|
|
text = (f"Hey, we need to resolve {id_} to {mkt.resolve_to()}. It currently has a value of {mkt.current_answer()}."
|
|
f"This market is called: {mkt.market.question}\n\n")
|
|
text += mkt.explain_abstract()
|
|
try:
|
|
text += "\n\n" + mkt.explain_specific()
|
|
except Exception:
|
|
print(format_exc())
|
|
logger.exception("Unable to explain a market's resolution automatically")
|
|
if not console_only:
|
|
response = tg_main(text)
|
|
else:
|
|
if input(text + " Use this default value? (y/N) ").lower().startswith("y"):
|
|
response = Response.USE_DEFAULT
|
|
elif input("Cancel the market? (y/N) ").lower().startswith("y"):
|
|
response = Response.CANCEL
|
|
else:
|
|
response = Response.NO_ACTION
|
|
|
|
if response == Response.NO_ACTION:
|
|
return
|
|
elif response == Response.USE_DEFAULT:
|
|
resp = mkt.resolve()
|
|
elif response == Response.CANCEL:
|
|
resp = mkt.cancel()
|
|
if mkt.status != MarketStatus.RESOLVED:
|
|
raise RuntimeError(resp)
|
|
conn.execute(
|
|
"DELETE FROM markets WHERE id = ?;",
|
|
(id_, )
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
@require_env(EnvironmentVariable.ManifoldAPIKey, EnvironmentVariable.DBName)
|
|
def main(refresh: bool = False, console_only: bool = False) -> None:
|
|
"""Go through watched markets and act on rules (resolve, trade, etc)."""
|
|
conn = register_db()
|
|
mkt: market.Market
|
|
for id_, mkt, check_rate, last_checked in conn.execute("SELECT * FROM markets"):
|
|
msg = f"Currently checking ID {id_}: {mkt.market.question}"
|
|
print(msg)
|
|
logger.info(msg)
|
|
# print(mkt.explain_abstract())
|
|
# print("\n\n" + mkt.explain_specific() + "\n\n")
|
|
check = (refresh or not last_checked or (datetime.now() > last_checked + timedelta(hours=check_rate)))
|
|
msg = f' - [{"x" if check else " "}] Should I check?'
|
|
print(msg)
|
|
logger.info(msg)
|
|
if check:
|
|
check = mkt.should_resolve()
|
|
msg = f' - [{"x" if check else " "}] Is elligible to resolve (to {mkt.resolve_to()})?'
|
|
print(msg)
|
|
logger.info(msg)
|
|
if check:
|
|
watch_reply(conn, id_, mkt, console_only)
|
|
|
|
if mkt.market.isResolved:
|
|
msg = " - [x] Market resolved, removing from db"
|
|
print(msg)
|
|
logger.info(msg)
|
|
conn.execute(
|
|
"DELETE FROM markets WHERE id = ?;",
|
|
(id_, )
|
|
)
|
|
conn.commit()
|
|
|
|
conn.execute(
|
|
"UPDATE markets SET last_checked = ?, market = ? WHERE id = ?;",
|
|
(datetime.now(), mkt, id_)
|
|
)
|
|
conn.commit()
|
|
conn.close()
|