from datetime import datetime, timedelta, timezone
from ipaddress import ip_address

import asyncpg
import yaml
from sanic import Sanic, response

app = Sanic(name="openproxyherder")

# NOTE(review): yaml.full_load can build more than plain data types. The file
# is read from the local working directory, so it is kept as-is here, but
# yaml.safe_load would be the safer choice if the config only holds scalars,
# lists and mappings.
with open("config.yaml") as config_file:
    config = yaml.full_load(config_file)


def _utcnow():
    """Return the current time as a timezone-aware UTC datetime."""
    # datetime.now() requires a tzinfo instance; a string such as 'UTC' raises
    # TypeError.
    # NOTE(review): assumes the timestamp columns are `timestamptz` -- confirm
    # against the schema.
    return datetime.now(timezone.utc)


def _submitted_proxies(request):
    """Return the "proxies" value of the JSON request body, or None.

    None is returned when the body is empty or is not a JSON object, so the
    handlers can answer with a 400 instead of failing on ``data.get``.
    """
    data = request.json
    if not isinstance(data, dict):
        return None
    return data.get("proxies")


@app.route("/api/v1/add", methods=["POST"])
async def add(request):
    """Insert submitted proxies that are not already in the ``proxies`` table.

    The JSON body must contain a non-empty "proxies" list. Each entry is read
    for "ip", "port" and "proxy_type", plus an optional "comment" (defaults to
    an empty string). An entry is skipped when its (ip, port) pair is already
    stored.

    Returns a 400 text response when no proxies are given, otherwise a text
    response reporting how many rows were added.
    """
    submitted_proxies = _submitted_proxies(request)
    if not submitted_proxies:
        return response.text("no proxies given", status=400)

    submitted_ips = [proxy["ip"] for proxy in submitted_proxies]
    submitted_at = _utcnow()

    # The pool is stored on app.ctx by start_db, not on the app object itself.
    async with app.ctx.pool.acquire() as connection:
        existing_rows = await connection.fetch(
            "select ip, port from proxies where host(ip) = ANY($1)", submitted_ips
        )
        # A set gives O(1) membership tests for the filter below.
        known = {(row["ip"], row["port"]) for row in existing_rows}

        new_proxies = [
            (
                proxy["ip"],
                int(proxy["port"]),
                proxy["proxy_type"],
                submitted_at,
                proxy.get("comment", ""),
            )
            for proxy in submitted_proxies
            if (ip_address(proxy["ip"]), int(proxy["port"])) not in known
        ]

        # Skip the COPY round trip when there is nothing to insert.
        if new_proxies:
            await connection.copy_records_to_table(
                "proxies",
                records=new_proxies,
                columns=[
                    "ip",
                    "port",
                    "proxy_type",
                    "submitted_at",
                    "comment",
                ],
            )

    return response.text(f"ok, added {len(new_proxies)} new proxies")


@app.route("/api/v1/getproxies")
async def getproxies(request):
    """Hand out up to ``amount`` proxies and mark them as 'scanning'.

    Query parameters:
        proxy_type: one or more proxy types to select (repeatable).
        amount: maximum number of proxies to return.
        timedelta: minimum age, in seconds, of ``last_checked`` for proxies
            whose status is 'active' or 'inactive'.
        max_num_failures: only proxies with fewer failures are returned.

    Proxies with status 'unscanned' are selected first; if fewer than
    ``amount`` are found, the remainder is filled from 'active'/'inactive'
    proxies. Returns a 400 text response when a parameter is missing or not an
    integer, otherwise a JSON object ``{"proxies": [...]}``.
    """
    # getlist returns every value given for the key, or None when it is absent
    # (plain indexing would raise KeyError before the check below could run).
    proxy_types = request.args.getlist("proxy_type")
    if not proxy_types:
        return response.text("No proxy_type specified", status=400)

    try:
        amount = int(request.args.get("amount"))
        tdelta = int(request.args.get("timedelta"))
        max_num_failures = int(request.args.get("max_num_failures"))
    except (TypeError, ValueError):
        # TypeError: parameter missing (None); ValueError: not an integer.
        return response.text(
            "amount, timedelta and max_num_failures must be integers", status=400
        )

    select_query = """
        select id, host(ip) as ip, port, host(exit_ip) as exit_ip, proxy_type, comment
        from proxies
        where status=ANY($1)
        and num_failures < $2
        and last_checked < $3
        and proxy_type=ANY($4)
        order by last_checked asc
        limit $5"""

    now = _utcnow()
    # NOTE(review): the selects and the status update below are separate
    # statements with no transaction or row locking, so concurrent callers may
    # be handed overlapping proxies.
    async with app.ctx.pool.acquire() as connection:
        proxy_results = await connection.fetch(
            select_query,
            ["unscanned"],
            max_num_failures,
            now,
            proxy_types,
            amount,
        )
        proxies = [dict(p) for p in proxy_results]

        if len(proxies) < amount:
            more_proxy_results = await connection.fetch(
                select_query,
                ["active", "inactive"],
                max_num_failures,
                now - timedelta(seconds=tdelta),
                proxy_types,
                amount - len(proxies),
            )
            proxies.extend(dict(p) for p in more_proxy_results)

        proxy_ids = [p["id"] for p in proxies]
        # execute() is used because the update returns no rows.
        await connection.execute(
            "update proxies set status='scanning' where id = ANY($1)", proxy_ids
        )

    return response.json({"proxies": proxies}, status=200)


@app.route("/api/v1/updateproxies", methods=["POST"])
async def updateproxies(request):
    """Record scan results for the submitted proxies.

    The JSON body must contain a non-empty "proxies" list. Entries with status
    "active" update exit_ip, proxy_type, status, last_seen, last_checked and
    comment, and reset num_failures to 0. Entries with status "inactive"
    update status, last_checked and comment, and increment num_failures.
    Entries with any other status are ignored.

    Returns a 400 text response when no proxies are given, otherwise "ok".
    """
    submitted_proxies = _submitted_proxies(request)
    if not submitted_proxies:
        return response.text("no proxies given", status=400)

    update_active_query = """
        update proxies
        set exit_ip=$1, proxy_type=$2, status=$3, last_seen=$4, last_checked=$4,
            comment=$5, num_failures=0
        where id=$6"""
    update_inactive_query = """
        update proxies
        set status=$1, last_checked=$2, comment=$3, num_failures=num_failures + 1
        where id=$4"""

    # One timestamp for the whole batch.
    checked_at = _utcnow()
    async with app.ctx.pool.acquire() as connection:
        for proxy in submitted_proxies:
            status = proxy["status"]
            if status == "active":
                await connection.execute(
                    update_active_query,
                    proxy["exit_ip"],
                    proxy["proxy_type"],
                    status,
                    checked_at,
                    proxy["comment"],
                    proxy["id"],
                )
            elif status == "inactive":
                await connection.execute(
                    update_inactive_query,
                    status,
                    checked_at,
                    proxy["comment"],
                    proxy["id"],
                )

    return response.text("ok")


@app.listener("before_server_start")
async def start_db(app, loop):
    """Create the asyncpg connection pool and store it on ``app.ctx.pool``.

    Connection settings are read from the module-level ``config`` mapping
    (db_host, db_port, db_user, db_password, db_name).
    """
    DB_CONFIG = {
        "host": config["db_host"],
        "port": config["db_port"],
        "user": config["db_user"],
        "password": config["db_password"],
        "database": config["db_name"],
    }
    app.ctx.pool = await asyncpg.create_pool(**DB_CONFIG, loop=loop, max_size=100)


if __name__ == "__main__":
    app.run(
        host=config["listen_host"],
        port=config["listen_port"],
        access_log=config["access_log"],
    )