# 169 lines, 6.0 KiB, Python
import os.path
|
|
import uuid
|
|
import subprocess
|
|
import base64
|
|
|
|
import magic
|
|
|
|
from django.shortcuts import render, redirect
|
|
from django.utils.crypto import constant_time_compare
|
|
|
|
from whispermaphone import settings
|
|
from .models import Thought, ThoughtForm, ALLOWED_MEDIA_TYPES
|
|
|
|
from .pagination import get_all_pages, get_page_slug
|
|
|
|
def check_authenticated(request):
    """Report whether the request carries the correct password cookie.

    The ``password`` cookie is compared against ``settings.PASSWORD`` using
    ``constant_time_compare``. A request that has no such cookie is treated
    as unauthenticated.

    Returns:
        True when the cookie matches the configured password, else False.
    """
    try:
        cookie_matches = constant_time_compare(request.COOKIES["password"], settings.PASSWORD)
    except KeyError:
        # No password cookie was sent with this request.
        return False

    if not cookie_matches:
        return False

    print("Authorization success")
    return True
|
|
|
|
|
|
def index(request):
    """Render the index template for one page of thoughts.

    Query parameters read from ``request.GET``:
        show: UUID of a thought to highlight. When it names an existing
            thought, the page holding that thought is displayed, taking
            priority over ``page``.
        page: slug of the page to display. Defaults to the first page.

    A slug that matches no page falls back to the first page returned by
    ``get_all_pages()``. ``current_page`` in the template context is always
    the slug of the page whose entries are actually rendered.
    """
    authenticated = check_authenticated(request)

    try:
        highlighted_uuid = uuid.UUID(request.GET.get("show", ""))
    except ValueError:
        # Missing or malformed UUID: nothing gets highlighted.
        highlighted_uuid = ""

    # Figure out what page we're viewing
    pages = get_all_pages()

    # First item in pages should be listed first
    first_page = pages[0]
    requested_page = first_page
    requested_slug = request.GET.get("page", default=first_page.slug)

    # show=uuid takes priority over page
    if highlighted_uuid:
        try:
            highlighted_thought = Thought.objects.get(uuid=highlighted_uuid)
            requested_slug = get_page_slug(highlighted_thought)
        except Thought.DoesNotExist:
            # Unknown thought: keep whatever page was otherwise requested.
            pass

    if requested_page.slug != requested_slug:
        for candidate in pages:
            if candidate.slug == requested_slug:
                requested_page = candidate

    thoughts = requested_page.get_all_entries()

    return render(request, "whispermaphone/index.html", {
        "thoughts": thoughts,
        "highlighted": highlighted_uuid,
        "authenticated": authenticated,
        "pages": pages,
        # Use the slug of the page being shown rather than the raw request
        # value, so an unknown ?page= slug does not leave the template
        # pointing at a page that does not exist.
        "current_page": requested_page.slug,
        "first_page": requested_page == first_page,  # if you're viewing the first page
    })
|
|
|
|
|
|
def login(request):
    """Show the login form and handle password submissions.

    Already-authenticated visitors are redirected to the ``post`` view. A
    POST whose ``password`` field matches ``settings.PASSWORD`` sets the
    ``password`` cookie and redirects to ``post``. Every other request
    (GET, wrong password, or a POST without a ``password`` field) renders
    the login template with status 401.
    """
    if check_authenticated(request):
        return redirect("post")

    if request.method == "POST":
        # .get() instead of indexing: a POST without the field should be a
        # failed login, not an unhandled MultiValueDictKeyError (HTTP 500).
        submitted = request.POST.get("password")
        if submitted is not None and constant_time_compare(submitted, settings.PASSWORD):
            response = redirect("post")
            # NOTE(review): the cookie stores the password itself and is set
            # without httponly/secure/samesite flags — consider hardening.
            response.set_cookie("password", submitted)
            return response

    # Returning 401 here causes `links` to always prompt for HTTP basic auth, which is annoying.
    # But the alternative is not following the HTTP spec, so I think this is fine.
    return render(request, "whispermaphone/login.html", status=401)
|
|
|
|
|
|
def _get_editing_thought(request):
    """Return the thought named by ``?editing=<uuid>``, or None if there is none.

    The value is parsed with ``uuid.UUID`` first (as ``index`` does for
    ``show``), so a missing or malformed value yields None instead of
    reaching the database lookup.
    """
    try:
        editing_uuid = uuid.UUID(request.GET.get("editing", ""))
    except ValueError:
        return None

    try:
        thought = Thought.objects.get(uuid=editing_uuid)
    except Thought.DoesNotExist:
        return None

    # The form works in hours with the opposite sign of the stored value
    # (the POST handler converts back with `- hours * 60`).
    thought.timezone_offset = - thought.timezone_offset / 60
    return thought


def _format_first_form_error(errors):
    """Build a one-line "Field: message" summary from ``form.errors.as_data()``."""
    try:
        problem_field, field_errors = next(iter(errors.items()))
        message = field_errors[0].messages[0]
        return f"{problem_field[0].upper()}{problem_field[1:]}: {message}"
    except Exception:
        # Best-effort formatting: fall back to dumping the raw error data
        # rather than failing the request over an unexpected error shape.
        return f"An unknown error occurred processing your request: {errors}"


def _process_uploaded_media(thought):
    """Rename the uploaded media after the thought's UUID; convert m4a to aac.

    The media has already been validated by the form, so the detected MIME
    type is expected to be a key of ``ALLOWED_MEDIA_TYPES``.
    """
    chunk = next(thought.media.chunks(chunk_size=2048))
    media_type = magic.from_buffer(chunk, mime=True)

    thought.media.name = f"{thought.uuid}.{ALLOWED_MEDIA_TYPES[media_type]}"

    if media_type == "audio/x-m4a":
        # This is a hack-fix because I want to be able to upload audio
        # In the future, this should be refactored to convert file types
        # using ffmpeg.js on the client side (so there are 0 security concerns)
        # and then the backend just has to check a 3 item whitelist
        thought.save()  # Save so that we have a file to work with
        original_path = thought.media.path
        subprocess.run(["ffmpeg",
                        # Overwrite a stale .aac left by an earlier upload for
                        # this thought; without -y ffmpeg refuses and exits.
                        "-y",
                        "-i", original_path,
                        "-codec:a", "aac", "-vn",
                        os.path.join(settings.MEDIA_ROOT, f"{thought.uuid}.aac")
                        ], check=True)
        # Remove the original file: exactly the path ffmpeg just read.
        os.remove(original_path)
        thought.media.name = f"{thought.uuid}.aac"  # Update the file in the DB


def post(request):
    """Create a new thought or edit an existing one.

    Unauthenticated visitors are redirected to ``login``. ``?editing=<uuid>``
    selects an existing thought to edit; a missing, malformed or unknown
    value means a new thought is created.

    GET renders the post form. POST validates the submission: on failure
    the form is re-rendered with status 400 and a one-line ``form_error``;
    on success the thought is saved and the client is redirected (to the
    edited thought, or back to ``post`` for a new one).

    Raises:
        subprocess.CalledProcessError: if ffmpeg fails converting m4a audio.
    """
    if not check_authenticated(request):
        return redirect("login")

    editing_thought = _get_editing_thought(request)

    if request.method == "POST":
        # We post in hours, so we need to convert back to minutes for saving
        # We can pass errors since form.is_valid catches most of them
        # We just need to convert hours to minutes first, because otherwise it errors on non-integer values
        values = request.POST.copy()
        try:
            values["timezone_offset"] = - float(values["timezone_offset"]) * 60
        except (ValueError, KeyError):
            pass

        thought_form = ThoughtForm(values, request.FILES, instance=editing_thought or Thought())

        if not thought_form.is_valid():
            return render(request, "whispermaphone/post.html", {
                "form": thought_form,
                "form_error": _format_first_form_error(thought_form.errors.as_data()),
                # Same key the GET render supplies, so the template still
                # knows it is in edit mode after a failed submission.
                "editing": bool(editing_thought),
            }, status=400)

        if editing_thought:
            thought = editing_thought
        else:
            # Create a thought object we can work with
            # But don't save it the DB yet
            thought = thought_form.save(commit=False)

        # Do media processing (already validated)
        if "media" in request.FILES:
            _process_uploaded_media(thought)
        elif not thought.media:
            # We need to make sure that if we remove an image, the alt text is removed with it.
            # Only when no media remains: an edit that keeps its existing
            # media without re-uploading must not lose its alt text.
            thought.media_alt = ""

        # Save for real
        thought.save()

        # Redirect to the same page, so that we make a GET request to /post
        if editing_thought:
            return redirect(editing_thought)
        return redirect("post")

    return render(request, "whispermaphone/post.html", {
        "form": ThoughtForm(instance=editing_thought) if editing_thought else ThoughtForm(),
        "editing": bool(editing_thought),
    })
|
|
|
|
|
|
def about(request):
    """Render the about page, passing along whether the visitor is authenticated."""
    context = {"authenticated": check_authenticated(request)}
    return render(request, "whispermaphone/about.html", context)
|