ManifoldMarketManager/src/application.py

212 lines
7.7 KiB
Python

"""Contains functions which are needed to run the runner script, but nowhere else."""
from __future__ import annotations
from asyncio import get_event_loop, new_event_loop, set_event_loop
from dataclasses import dataclass
from datetime import datetime, timedelta
from logging import getLogger
from os import getenv
from pathlib import Path
from sqlite3 import PARSE_COLNAMES, PARSE_DECLTYPES, connect
from traceback import format_exc
from typing import TYPE_CHECKING, cast
from telegram import __version__ as TG_VER
# Installations of ``telegram`` that cannot provide ``__version_info__`` raise ImportError here;
# they are given an all-zero tuple so that the version comparison below rejects them.
try:
    from telegram import __version_info__
except ImportError:
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]
# Fail at import time when the installed library is older than 20.0.0 alpha 1.
if __version_info__ < (20, 0, 0, "alpha", 1):
    raise RuntimeError(
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CallbackQueryHandler
from src import market, require_env
from src.consts import EnvironmentVariable, MarketStatus, Response
if TYPE_CHECKING: # pragma: no cover
from sqlite3 import Connection
from telegram import Update
from telegram.ext import ContextTypes
from src import Market
# Module-level logger, named after this module via ``__name__``.
logger = getLogger(__name__)
@dataclass
class State:
    """Keeps track of global state for while the Telegram Bot is running.

    One module-level instance is shared between ``tg_main``, which stores the
    application and the prompt text and finally returns ``last_response``, and
    the ``buttons`` handler, which updates ``last_text`` and ``last_response``.
    """

    # The Application built by ``tg_main``; stays ``None`` until ``tg_main`` assigns it.
    application: Application = None  # type: ignore
    # The option most recently picked on the inline keyboard.
    last_response: Response = Response.NO_ACTION
    # Text of the bot's message so far; the handler appends to it before editing the message.
    last_text: str = ""
# Shared state read and written by ``tg_main`` and the ``buttons`` handler.
state = State()
# First-stage keyboard: pick what to do with the market. The callback data are ``Response``
# members; ``buttons`` converts the received data back with ``Response(int(...))``.
keyboard1 = [
    [
        InlineKeyboardButton("Do Nothing", callback_data=Response.NO_ACTION),
        InlineKeyboardButton("Resolve to Default", callback_data=Response.USE_DEFAULT),
    ],
    [InlineKeyboardButton("Cancel Market", callback_data=Response.CANCEL)],
]
# Second-stage keyboard: confirm ("YES") or reject ("NO") the option picked on ``keyboard1``.
keyboard2 = [
    [
        InlineKeyboardButton("Yes", callback_data="YES"),
        InlineKeyboardButton("No", callback_data="NO"),
    ],
]
@require_env(EnvironmentVariable.DBName)
def register_db() -> Connection:
    """Get a connection to the appropriate database for this bot.

    The database path is read from the ``DBName`` environment variable. The
    ``markets`` table is created whenever it is missing, so a brand-new file and
    a pre-existing file that has no schema yet are both usable.

    Returns:
        An open connection using ``PARSE_COLNAMES | PARSE_DECLTYPES`` type detection.

    Raises:
        EnvironmentError: If ``DBName`` is not set.
    """
    name = getenv("DBName")
    if name is None:
        raise EnvironmentError("The DBName environment variable is not set")
    conn = connect(name, detect_types=PARSE_COLNAMES | PARSE_DECLTYPES)
    # Key initialization on the schema rather than on whether the file exists: a file that
    # exists but has no ``markets`` table would otherwise never be initialized.
    conn.execute("CREATE TABLE IF NOT EXISTS markets "
                 "(id INTEGER, market Market, check_rate REAL, last_checked TIMESTAMP);")
    conn.commit()
    logger.info("Database up and initialized.")
    return conn
async def buttons(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Parse the CallbackQuery and update the message text.

    Two kinds of callback data are handled:

    * ``"YES"`` / ``"NO"`` answer the confirmation prompt. ``"YES"`` stops the event loop so the
      bot can shut down; ``"NO"`` discards the pending choice and shows the first keyboard again.
    * Anything else is parsed as an integer ``Response`` value, stored in
      ``state.last_response``, and followed by a confirmation prompt.

    Raises:
        ValueError: If the update has no callback query or the query carries no data.
    """
    logger.info("Got into the buttons handler")
    query = update.callback_query
    if query is None or query.data is None:
        raise ValueError()
    # CallbackQueries need to be answered, even if no notification to the user is needed
    # Some clients may have trouble otherwise. See https://core.telegram.org/bots/api#callbackquery
    await query.answer()
    logger.info("Got a response from Telegram (%r)", query.data)

    if query.data not in ("YES", "NO"):
        # First stage: remember the selection and ask for confirmation.
        state.last_response = Response(int(query.data))
        logger.info("This corresponds to %r", state.last_response)
        reply_markup = InlineKeyboardMarkup(keyboard2)
        state.last_text += f"\nSelected option: {state.last_response.name}. Are you sure?"
        await query.edit_message_text(text=state.last_text)
        await query.edit_message_reply_markup(reply_markup=reply_markup)
        return

    # Second stage: the user answered the confirmation prompt.
    state.last_text += "\n" + query.data
    await query.edit_message_text(text=state.last_text)
    if query.data == "YES":
        logger.info("Confirmation received, shutting dowm Telegram subsystem")
        get_event_loop().stop()  # lets telegram bot know it can stop
        return

    logger.info("Was not told yes. Backing out to ask again")
    # The choice was explicitly rejected, so it must not linger as the recorded response in
    # case polling ends before a new choice is confirmed.
    state.last_response = Response.NO_ACTION
    reply_markup = InlineKeyboardMarkup(keyboard1)
    await query.edit_message_reply_markup(reply_markup=reply_markup)
@require_env(EnvironmentVariable.TelegramAPIKey, EnvironmentVariable.TelegramChatID)
def tg_main(text: str) -> Response:
    """Run the bot.

    Builds an application with the token from ``TelegramAPIKey``, sends ``text`` with the
    first keyboard to the chat named by ``TelegramChatID``, and polls on a fresh event loop
    until polling ends.

    Args:
        text: The message to send; it is also stored in ``state.last_text``.

    Returns:
        The response recorded in ``state.last_response`` when polling ends. This is
        ``Response.NO_ACTION`` if no option was selected during this call.

    Raises:
        EnvironmentError: Raised by the post-init hook if ``TelegramChatID`` is not set.
    """
    async def post_init(self):  # type: ignore
        # Runs once the application is initialized: send the prompt with the first keyboard.
        reply_markup = InlineKeyboardMarkup(keyboard1)
        chat_id = getenv("TelegramChatID")
        if chat_id is None:
            raise EnvironmentError()
        await self.bot.send_message(text=text, reply_markup=reply_markup, chat_id=int(chat_id))

    application = Application.builder().token(cast(str, getenv("TelegramAPIKey"))).post_init(post_init).build()
    application.add_handler(CallbackQueryHandler(buttons))
    state.application = application
    state.last_text = text
    # ``state`` is module-global and outlives each call. Without this reset, a polling session
    # that ends without any selection would return the answer given for a previous prompt.
    state.last_response = Response.NO_ACTION
    set_event_loop(new_event_loop())
    application.run_polling()
    return state.last_response
def watch_reply(conn: Connection, id_: int, mkt: Market, console_only: bool = False) -> None:
    """Watch for a reply from the bot manager in order to check the bot's work.

    Builds a description of the proposed resolution, asks the manager what to do (through
    Telegram, or on the console when ``console_only`` is true), carries out the answer, and
    removes the market from the database once it has been resolved.

    Args:
        conn: Database connection holding the ``markets`` table.
        id_: The ``id`` value of this market's row in the ``markets`` table.
        mkt: The market that is ready to resolve.
        console_only: Ask through ``input()`` instead of Telegram.

    Raises:
        RuntimeError: If the market's status is not ``RESOLVED`` after acting on the reply.
        ValueError: If the reply is not one of the three handled ``Response`` values.
    """
    # Note the trailing space after the first sentence pair: the two fragments are joined
    # directly, and without it the text reads "...value of X.This market is called".
    text = (f"Hey, we need to resolve {id_} to {mkt.resolve_to()}. It currently has a value of {mkt.current_answer()}. "
            f"This market is called: {mkt.market.question}\n\n")
    text += mkt.explain_abstract()
    try:
        text += "\n\n" + mkt.explain_specific()
    except Exception:
        # Best effort: the specific explanation is optional, so report the failure and carry on.
        print(format_exc())
        logger.exception("Unable to explain a market's resolution automatically")

    if not console_only:
        response = tg_main(text)
    elif input(text + " Use this default value? (y/N) ").lower().startswith("y"):
        response = Response.USE_DEFAULT
    elif input("Cancel the market? (y/N) ").lower().startswith("y"):
        response = Response.CANCEL
    else:
        response = Response.NO_ACTION

    if response == Response.NO_ACTION:
        return
    if response == Response.USE_DEFAULT:
        resp = mkt.resolve()
    elif response == Response.CANCEL:
        resp = mkt.cancel()
    else:
        # Without this branch ``resp`` would be unbound below.
        raise ValueError(f"Unexpected response: {response!r}")

    if mkt.status != MarketStatus.RESOLVED:
        raise RuntimeError(resp)
    conn.execute(
        "DELETE FROM markets WHERE id = ?;",
        (id_, )
    )
    conn.commit()
@require_env(EnvironmentVariable.ManifoldAPIKey, EnvironmentVariable.DBName)
def main(refresh: bool = False, console_only: bool = False) -> None:
    """Go through watched markets and act on rules (resolve, trade, etc).

    Args:
        refresh: Check every market, regardless of when it was last checked.
        console_only: Passed on to ``watch_reply``.

    The database connection is closed when the function exits, including when an
    exception propagates out of the loop.
    """
    conn = register_db()
    try:
        mkt: market.Market
        # Read every row before the loop starts: the loop body deletes, updates and commits on
        # this same connection, which must not happen underneath a cursor still being iterated.
        rows = conn.execute("SELECT * FROM markets").fetchall()
        for id_, mkt, check_rate, last_checked in rows:
            msg = f"Currently checking ID {id_}: {mkt.market.question}"
            print(msg)
            logger.info(msg)

            # Due when forced, never checked before, or ``check_rate`` hours have elapsed.
            check = (refresh or not last_checked or (datetime.now() > last_checked + timedelta(hours=check_rate)))
            msg = f' - [{"x" if check else " "}] Should I check?'
            print(msg)
            logger.info(msg)
            if not check:
                continue

            check = mkt.should_resolve()
            msg = f' - [{"x" if check else " "}] Is elligible to resolve (to {mkt.resolve_to()})?'
            print(msg)
            logger.info(msg)
            if check:
                watch_reply(conn, id_, mkt, console_only)

            if mkt.market.isResolved:
                msg = " - [x] Market resolved, removing from db"
                print(msg)
                logger.info(msg)
                conn.execute(
                    "DELETE FROM markets WHERE id = ?;",
                    (id_, )
                )
                conn.commit()

            # Record the check time and the market's current state; this matches no row if the
            # market was just deleted.
            conn.execute(
                "UPDATE markets SET last_checked = ?, market = ? WHERE id = ?;",
                (datetime.now(), mkt, id_)
            )
            conn.commit()
    finally:
        conn.close()