openproxyherder/openproxyherder.py

142 lines
5.0 KiB
Python

import asyncpg
import yaml
from datetime import datetime, timedelta, timezone
from ipaddress import ip_address
from sanic import Sanic, response
app = Sanic(name="openproxyherder")

# Load the service configuration once at import time. The context manager
# closes the file handle deterministically instead of leaving it open until
# the garbage collector gets to it.
# NOTE(review): yaml.full_load can build more than plain data types; if
# config.yaml never relies on that, yaml.safe_load would be the safer call.
with open("config.yaml") as _config_file:
    config = yaml.full_load(_config_file)
@app.route("/api/v1/add", methods=["POST"])
async def add(request):
    """Insert the submitted proxies that are not already in the ``proxies`` table.

    The JSON body is read as ``{"proxies": [...]}`` where each entry provides
    ``ip``, ``port`` and ``proxy_type`` and may provide ``comment``.

    Returns a 400 text response when no proxies are given, otherwise a text
    response reporting how many new rows were inserted.
    """
    data = request.json
    # The body may be empty (None) or a non-object JSON value; treat both the
    # same as "no proxies given" instead of crashing on ``.get``.
    submitted_proxies = data.get("proxies") if isinstance(data, dict) else None
    if not submitted_proxies:
        return response.text("no proxies given", status=400)

    submitted_ips = [proxy["ip"] for proxy in submitted_proxies]
    submitted_at = datetime.now(tz=timezone.utc)

    # The pool is stored on app.ctx by start_db, so it must be read from there.
    async with app.ctx.pool.acquire() as connection:
        existing_rows = await connection.fetch(
            "select ip, port from proxies where host(ip) = ANY($1)", submitted_ips
        )
        # A set gives constant-time membership tests for the filter below.
        existing = {(row["ip"], row["port"]) for row in existing_rows}

        new_proxies = [
            (
                proxy["ip"],
                int(proxy["port"]),
                proxy["proxy_type"],
                submitted_at,
                proxy.get("comment", ""),
            )
            for proxy in submitted_proxies
            if (ip_address(proxy["ip"]), int(proxy["port"])) not in existing
        ]

        # Nothing to copy when every submitted proxy is already known.
        if new_proxies:
            await connection.copy_records_to_table(
                "proxies",
                records=new_proxies,
                columns=[
                    "ip",
                    "port",
                    "proxy_type",
                    "submitted_at",
                    "comment",
                ],
            )

    return response.text(f"ok, added {len(new_proxies)} new proxies")
@app.route("/api/v1/getproxies")
async def getproxies(request):
    """Hand out proxies to be checked and mark them as ``scanning``.

    Query parameters:
        proxy_type: one or more proxy types to select (repeatable).
        amount: maximum number of proxies to return.
        timedelta: minimum age, in seconds, of ``last_checked`` for
            active/inactive proxies to be handed out again.
        max_num_failures: only proxies with fewer failures are returned.

    Unscanned proxies are selected first; if fewer than ``amount`` are found,
    the remainder is filled with active/inactive proxies that were last
    checked more than ``timedelta`` seconds ago.

    Returns a JSON response ``{"proxies": [...]}``, or a 400 text response
    when a required parameter is missing or not an integer.
    """
    # getlist returns None for an absent parameter, whereas indexing
    # request.args would raise KeyError before the check below could run.
    proxy_types = request.args.getlist("proxy_type")
    if not proxy_types:
        return response.text("No proxy_type specified", status=400)

    try:
        amount = int(request.args.get("amount"))
        tdelta = int(request.args.get("timedelta"))
        max_num_failures = int(request.args.get("max_num_failures"))
    except (TypeError, ValueError):
        # TypeError: parameter missing (None); ValueError: not an integer.
        return response.text(
            "amount, timedelta and max_num_failures must be integers", status=400
        )

    select_query = """
select id, host(ip) as ip, port, host(exit_ip) as exit_ip, proxy_type, comment from proxies
where status=ANY($1) and num_failures < $2 and last_checked < $3
and proxy_type=ANY($4) order by last_checked asc limit $5"""

    now = datetime.now(tz=timezone.utc)

    # NOTE(review): the select and the status update below are separate
    # statements, so two concurrent callers could be handed the same rows.
    async with app.ctx.pool.acquire() as connection:
        unscanned_rows = await connection.fetch(
            select_query,
            ["unscanned"],
            max_num_failures,
            now,
            proxy_types,
            amount,
        )
        proxies = [dict(row) for row in unscanned_rows]

        if len(proxies) < amount:
            recheck_rows = await connection.fetch(
                select_query,
                ["active", "inactive"],
                max_num_failures,
                now - timedelta(seconds=tdelta),
                proxy_types,
                amount - len(proxies),
            )
            proxies.extend(dict(row) for row in recheck_rows)

        proxy_ids = [proxy["id"] for proxy in proxies]
        if proxy_ids:
            # execute is the appropriate call for a statement with no result rows.
            await connection.execute(
                "update proxies set status='scanning' where id = ANY($1)", proxy_ids
            )

    return response.json({"proxies": proxies}, status=200)
@app.route("/api/v1/updateproxies", methods=["POST"])
async def updateproxies(request):
    """Record check results for previously handed-out proxies.

    The JSON body is read as ``{"proxies": [...]}``. Each entry provides
    ``id``, ``status`` and ``comment``; entries with status ``active`` also
    provide ``exit_ip`` and ``proxy_type``.

    Active proxies get their exit IP, type and timestamps refreshed and their
    failure counter reset; inactive proxies get their failure counter
    incremented. Entries with any other status are left untouched.

    Returns a 400 text response when no proxies are given, otherwise ``ok``.
    """
    data = request.json
    # The body may be empty (None) or a non-object JSON value; treat both the
    # same as "no proxies given" instead of crashing on ``.get``.
    submitted_proxies = data.get("proxies") if isinstance(data, dict) else None
    if not submitted_proxies:
        return response.text("no proxies given", status=400)

    update_active_query = """
update proxies set exit_ip=$1, proxy_type=$2, status=$3,
last_seen=$4, last_checked=$4, comment=$5, num_failures=0 where id=$6"""
    update_inactive_query = """
update proxies set status=$1, last_checked=$2, comment=$3,
num_failures=num_failures + 1 where id=$4"""

    async with app.ctx.pool.acquire() as connection:
        for proxy in submitted_proxies:
            status = proxy["status"]
            # execute is the appropriate call for statements with no result rows.
            if status == "active":
                await connection.execute(
                    update_active_query,
                    proxy["exit_ip"],
                    proxy["proxy_type"],
                    status,
                    datetime.now(tz=timezone.utc),
                    proxy["comment"],
                    proxy["id"],
                )
            elif status == "inactive":
                await connection.execute(
                    update_inactive_query,
                    status,
                    datetime.now(tz=timezone.utc),
                    proxy["comment"],
                    proxy["id"],
                )

    return response.text("ok")
@app.listener("before_server_start")
async def start_db(app, loop):
    """Create the asyncpg connection pool and store it on ``app.ctx.pool``.

    Registered as a ``before_server_start`` listener. Connection settings are
    read from the module-level ``config`` mapping.
    """
    app.ctx.pool = await asyncpg.create_pool(
        host=config["db_host"],
        port=config["db_port"],
        user=config["db_user"],
        password=config["db_password"],
        database=config["db_name"],
        loop=loop,
        max_size=100,
    )
if __name__ == "__main__":
    # Start the server with the listen settings taken from the loaded config.
    run_options = {
        "host": config["listen_host"],
        "port": config["listen_port"],
        "access_log": config["access_log"],
    }
    app.run(**run_options)