142 lines
5.0 KiB
Python
142 lines
5.0 KiB
Python
import asyncpg
|
|
import yaml
|
|
from datetime import datetime, timedelta, timezone
|
|
from ipaddress import ip_address
|
|
from sanic import Sanic, response
|
|
|
|
# Sanic application object; the route handlers and listeners in this file
# register themselves on it.
app = Sanic(name="openproxyherder")

# Settings are loaded once at import time. The context manager makes sure the
# file handle is closed instead of being left for the garbage collector.
with open("config.yaml") as config_file:
    # NOTE(review): full_load can build more than plain YAML data types; if
    # config.yaml only holds scalars/mappings, yaml.safe_load would be enough.
    config = yaml.full_load(config_file)
|
|
|
|
@app.route("/api/v1/add", methods=["POST"])
async def add(request):
    """Insert the submitted proxies that are not yet in the ``proxies`` table.

    The request body is read as JSON and must be an object with a "proxies"
    list. Each entry is read for "ip", "port", "proxy_type" and an optional
    "comment" (defaults to an empty string).

    Returns:
        A 400 text response when no proxies were given, otherwise a text
        response stating how many new rows were inserted.
    """
    data = request.json
    # A body that is missing or is not a JSON object has no "proxies" to read;
    # treat it like an empty submission instead of failing on ``.get``.
    submitted_proxies = data.get("proxies") if isinstance(data, dict) else None
    if not submitted_proxies:
        return response.text("no proxies given", status=400)

    submitted_ips = [proxy["ip"] for proxy in submitted_proxies]

    # The pool is stored on app.ctx by start_db, so it is read from there.
    async with app.ctx.pool.acquire() as connection:
        existing_rows = await connection.fetch(
            "select ip, port from proxies where host(ip) = ANY($1)", submitted_ips
        )
    # A set gives constant-time membership tests in the loop below.
    known = {(row["ip"], row["port"]) for row in existing_rows}

    new_proxies = []
    for proxy in submitted_proxies:
        port = int(proxy["port"])
        key = (ip_address(proxy["ip"]), port)
        if key in known:
            continue
        # Remember the pair so a duplicate later in the same submission is
        # not inserted a second time.
        known.add(key)
        new_proxies.append(
            (
                proxy["ip"],
                port,
                proxy["proxy_type"],
                datetime.now(tz=timezone.utc),
                proxy.get("comment", ""),
            )
        )

    # Nothing to copy: skip borrowing a connection altogether.
    if new_proxies:
        async with app.ctx.pool.acquire() as connection:
            await connection.copy_records_to_table(
                "proxies",
                records=new_proxies,
                columns=[
                    "ip",
                    "port",
                    "proxy_type",
                    "submitted_at",
                    "comment",
                ],
            )

    return response.text(f"ok, added {len(new_proxies)} new proxies")
|
|
|
|
@app.route("/api/v1/getproxies")
async def getproxies(request):
    """Select proxies to be checked and mark them as 'scanning'.

    Query parameters:
        proxy_type: proxy type(s) to select; may be given several times.
        amount: maximum number of proxies to return.
        timedelta: age in seconds; active/inactive proxies are only selected
            when their last check is older than this.
        max_num_failures: only proxies with fewer failures are selected.

    Proxies with status 'unscanned' are selected first. If fewer than
    ``amount`` are found, the remainder is filled with 'active'/'inactive'
    proxies, least recently checked first.

    Returns:
        A JSON response ``{"proxies": [...]}``, or a 400 text response when
        ``proxy_type`` is missing or a numeric parameter is missing/invalid.
    """
    # getlist returns None for an absent parameter, so the check below covers
    # that case; indexing request.args directly would raise KeyError instead.
    proxy_types = request.args.getlist("proxy_type")
    if not proxy_types:
        return response.text("No proxy_type specified", status=400)

    try:
        amount = int(request.args.get("amount"))
        tdelta = int(request.args.get("timedelta"))
        max_num_failures = int(request.args.get("max_num_failures"))
    except (TypeError, ValueError):
        # TypeError: parameter absent (int(None)); ValueError: not a number.
        return response.text(
            "amount, timedelta and max_num_failures must be integers", status=400
        )

    select_query = """
        select id, host(ip) as ip, port, host(exit_ip) as exit_ip, proxy_type, comment from proxies
        where status=ANY($1) and num_failures < $2 and last_checked < $3
        and proxy_type=ANY($4) order by last_checked asc limit $5"""

    # NOTE(review): the selects and the update are separate statements, so
    # concurrent callers might be handed the same rows; confirm whether that
    # matters for the scanners.
    async with app.ctx.pool.acquire() as connection:
        proxy_results = await connection.fetch(
            select_query,
            ["unscanned"],
            max_num_failures,
            datetime.now(tz=timezone.utc),
            proxy_types,
            amount,
        )
        proxies = [dict(p) for p in proxy_results]

        if len(proxies) < amount:
            # Top up with already-scanned proxies whose last check is old enough.
            more_proxy_results = await connection.fetch(
                select_query,
                ["active", "inactive"],
                max_num_failures,
                datetime.now(tz=timezone.utc) - timedelta(seconds=tdelta),
                proxy_types,
                amount - len(proxies),
            )
            proxies.extend(dict(p) for p in more_proxy_results)

        proxy_ids = [p["id"] for p in proxies]
        if proxy_ids:
            # execute() is enough here: the UPDATE returns no rows to fetch.
            await connection.execute(
                "update proxies set status='scanning' where id = ANY($1)", proxy_ids
            )

    return response.json({"proxies": proxies}, status=200)
|
|
|
|
@app.route("/api/v1/updateproxies", methods=["POST"])
async def updateproxies(request):
    """Store scan results for the submitted proxies.

    The request body is read as JSON and must be an object with a "proxies"
    list. Each entry is read for "status", "comment" and "id"; entries with
    status "active" are additionally read for "exit_ip" and "proxy_type".

    * "active": stores exit IP, proxy type, status and comment, sets both
      ``last_seen`` and ``last_checked`` to the current time and resets
      ``num_failures`` to 0.
    * "inactive": stores status and comment, sets ``last_checked`` to the
      current time and increments ``num_failures``.
    * any other status: the entry is ignored.

    Returns:
        A 400 text response when no proxies were given, otherwise "ok".
    """
    data = request.json
    # A body that is missing or is not a JSON object has no "proxies" to read;
    # treat it like an empty submission instead of failing on ``.get``.
    submitted_proxies = data.get("proxies") if isinstance(data, dict) else None
    if not submitted_proxies:
        return response.text("no proxies given", status=400)

    update_active_query = """
        update proxies set exit_ip=$1, proxy_type=$2, status=$3,
        last_seen=$4, last_checked=$4, comment=$5, num_failures=0 where id=$6"""
    update_inactive_query = """
        update proxies set status=$1, last_checked=$2, comment=$3,
        num_failures=num_failures + 1 where id=$4"""

    async with app.ctx.pool.acquire() as connection:
        for proxy in submitted_proxies:
            status = proxy["status"]
            # execute() is used because the UPDATE statements return no rows.
            if status == "active":
                await connection.execute(
                    update_active_query,
                    proxy["exit_ip"],
                    proxy["proxy_type"],
                    status,
                    datetime.now(tz=timezone.utc),
                    proxy["comment"],
                    proxy["id"],
                )
            elif status == "inactive":
                await connection.execute(
                    update_inactive_query,
                    status,
                    datetime.now(tz=timezone.utc),
                    proxy["comment"],
                    proxy["id"],
                )

    return response.text("ok")
|
|
|
|
@app.listener("before_server_start")
async def start_db(app, loop):
    """Create the asyncpg connection pool and store it on ``app.ctx.pool``.

    Registered as a ``before_server_start`` listener. Connection settings are
    read from the module-level ``config`` mapping; the pool is created on the
    given ``loop`` with at most 100 connections.
    """
    app.ctx.pool = await asyncpg.create_pool(
        host=config["db_host"],
        port=config["db_port"],
        user=config["db_user"],
        password=config["db_password"],
        database=config["db_name"],
        loop=loop,
        max_size=100,
    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Listen address, port and access logging are taken from the loaded config.
    app.run(
        host=config["listen_host"],
        port=config["listen_port"],
        access_log=config["access_log"],
    )