Compare commits

...

2 Commits

Author SHA1 Message Date
grym fb54971d14 Improve token flow 2023-02-10 20:59:20 -05:00
grym 3eead1a769 add secret= function 2023-02-10 20:43:15 -05:00
2 changed files with 65 additions and 31 deletions

94
oxo.py
View File

@ -44,37 +44,44 @@ def post_to(
data: t.Iterable[t.Dict],
expires: t.Optional[int],
token_dir: t.Optional[Path],
use_long_name: bool,
) -> str:
retval = []
with httpx.Client() as client:
data_dict = {}
if expires:
data_dict["expires"] = expires
if use_long_name:
data_dict["secret"] = use_long_name
for d in data:
if isinstance(d.get("file"), io.BufferedReader):
if expires:
res = client.post(
base_url, files=d, data={"expires": expires, "secret": ""}
)
else:
res = client.post(base_url, files=d)
token = res.headers.get("x-token")
remote_url = res.text.strip()
if token:
token_data = TokenData(
token=token,
oxo_url=remote_url,
)
err_console.print(f"To remove post, {token_data.curl_rm}")
err_console.print(
f"To update expiration date, {token_data.curl_expiry}"
)
if token_dir:
fname = Path(d.get("file").name).name
token_dir.joinpath(f"{fname}.token").write_text(
json.dumps(asdict(token_data))
)
res = client.post(base_url, files=d, data=data_dict)
else:
res = client.post(base_url, data=d)
remote_url = res.text.strip()
res = client.post(base_url, data={**d, **data_dict})
res.raise_for_status()
remote_url = res.text.strip()
token = res.headers.get("x-token")
if token:
token_data = TokenData(
token=token,
oxo_url=remote_url,
)
err_console.print(f"To remove post, {token_data.curl_rm}")
err_console.print(
f"To update expiration date, {token_data.curl_expiry}"
)
if token_dir:
# todo rewrite when drop 3.7
if d.get("url"):
fname = d.get("url").rpartition("/")[2]
elif d.get("file").name:
fname = Path(d.get("file").name).name
else:
raise ValueError(f"{d}")
token_dir.joinpath(f"{fname}.token").write_text(
json.dumps(asdict(token_data))
)
retval.append(remote_url)
return " ".join(retval)
@ -85,12 +92,14 @@ def post_files(
*,
expires: t.Optional[int],
token_dir: t.Optional[Path],
use_long_name: bool,
):
return post_to(
base_url,
data=({"file": f.open("rb")} for f in files),
expires=expires,
token_dir=token_dir,
use_long_name=use_long_name,
)
@ -100,12 +109,14 @@ def post_repost(
*,
expires: t.Optional[int],
token_dir: t.Optional[Path],
use_long_name: bool,
):
return post_to(
base_url,
data=({"url": u.strip()} for u in urls),
expires=expires,
token_dir=token_dir,
use_long_name=use_long_name,
)
@ -121,6 +132,7 @@ def post_shorten(
data=({"shorten": u.strip()} for u in urls),
expires=expires,
token_dir=token_dir,
use_long_name=False,
)
@ -152,6 +164,9 @@ def files(
files: t.List[Path] = typer.Argument(
..., min=1, exists=True, file_okay=True, dir_okay=False, resolve_path=True
),
use_long_name: bool = typer.Option(
False, help="If true, create a harder to guess name."
),
base_url: str = BASE_URL,
expires: t.Optional[int] = typer.Option(
None, help="Expiration time, in hours or epoch milliseconds"
@ -179,12 +194,23 @@ def files(
token_dir.mkdir(exist_ok=True, parents=True)
else:
token_dir = None
typer.echo(post_files(base_url, files, expires=expires, token_dir=token_dir))
typer.echo(
post_files(
base_url,
files,
expires=expires,
token_dir=token_dir,
use_long_name=use_long_name,
)
)
@app.command()
def repost(
urls: t.List[str] = typer.Argument(..., min=1),
use_long_name: bool = typer.Option(
False, help="If true, create a harder to guess name."
),
base_url=BASE_URL,
expires: t.Optional[int] = typer.Option(
None, help="Expiration time, in hours or epoch milliseconds"
@ -192,7 +218,15 @@ def repost(
token_cache_dir: str = TOKEN_CACHE_DIR,
):
"""Repost one or more urls."""
typer.echo(post_repost(base_url, urls, expires, token_cache_dir))
typer.echo(
post_repost(
base_url,
urls,
expires=expires,
token_dir=token_cache_dir,
use_long_name=use_long_name,
)
)
@app.command()
@ -234,12 +268,12 @@ def delete(
token_files = token_cache_dir.glob("*.token")
tokens = []
good_tokens = []
for t in token_files:
for token in token_files:
try:
tokens.append(TokenData(**json.loads(t.read_text())))
good_tokens.append(t)
tokens.append(TokenData(**json.loads(token.read_text())))
good_tokens.append(token)
except Exception as e:
typer.secho(f"Ignoring {t}: {e}", err=True, fg=typer.colors.YELLOW)
typer.secho(f"Ignoring {token}: {e}", err=True, fg=typer.colors.YELLOW)
with httpx.Client() as client:
for token, token_file in zip(tokens, good_tokens):
try:

View File

@ -14,7 +14,7 @@ dependencies = ["typer[all]",
'importlib-metadata; python_version<"3.8"']
[project.optional-dependencies]
dev = ["pre-commit","pytest", "ipython"]
dev = ["pre-commit","pytest", "ipython", "ruff", "pudb"]
[project.scripts]
oxo = "oxo:app"