175 lines
6.2 KiB
Python
175 lines
6.2 KiB
Python
import os.path
|
|
import uuid
|
|
import subprocess
|
|
import base64
|
|
|
|
import magic
|
|
|
|
from django import forms
|
|
from django.shortcuts import render, redirect
|
|
from django.utils.crypto import constant_time_compare
|
|
|
|
from haystack.views import SearchView, search_view_factory
|
|
from haystack.query import SearchQuerySet
|
|
from haystack.forms import SearchForm
|
|
|
|
from whispermaphone import settings
|
|
from .models import Thought, ThoughtForm, ALLOWED_MEDIA_TYPES
|
|
from .pagination import get_all_pages, get_page_slug, get_page
|
|
|
|
def check_authenticated(request):
    """Return True when the request's "password" cookie matches settings.PASSWORD.

    A request without a "password" cookie is treated as unauthenticated.
    The comparison goes through constant_time_compare rather than ``==``.
    """
    try:
        supplied_password = request.COOKIES["password"]
    except KeyError:
        # No cookie at all: the visitor has never logged in (or it expired).
        return False

    return bool(constant_time_compare(supplied_password, settings.PASSWORD))
|
|
|
|
|
|
def index(request):
    """Render "thoughts/index.html" for the requested page of thoughts.

    Query parameters read from ``request.GET``:
      * ``show`` -- UUID of a thought to highlight. A missing or unparsable
        value results in an empty string being used instead.
      * ``page`` -- slug of the page to display (defaults to "").
    """
    is_authenticated = check_authenticated(request)

    # uuid.UUID raises ValueError for "" and for malformed input alike.
    try:
        highlight = uuid.UUID(request.GET.get("show", ""))
    except ValueError:
        highlight = ""

    # Work out which page is being viewed; get_page receives both the slug
    # and the highlighted UUID.
    all_pages = get_all_pages()
    page_slug = request.GET.get("page", default="")
    current_page = get_page(page_slug, highlight)

    context = {
        "thoughts": current_page.get_all_entries(),
        "highlighted": highlight,
        "authenticated": is_authenticated,
        "pages": all_pages,
        "current_page_slug": current_page.slug,
        # True if you're viewing the first page
        "first_page": current_page.slug == all_pages[0].slug,
    }
    return render(request, "thoughts/index.html", context)
|
|
|
|
|
|
def login(request):
    """Log the visitor in by storing the site password in a cookie.

    * Already authenticated: redirect to "post".
    * POST with the correct ``password`` field: redirect to "post" and set
      the "password" cookie for one year.
    * Anything else (GET, wrong password, POST without a ``password`` field):
      render the login page with status 401.
    """
    if check_authenticated(request):
        return redirect("post")

    if request.method == "POST":
        # .get() instead of indexing: a POST body without a "password" field
        # should fall through to the 401 page rather than raise a KeyError.
        supplied_password = request.POST.get("password")
        if supplied_password is not None and constant_time_compare(
            supplied_password, settings.PASSWORD
        ):
            res = redirect("post")
            res.set_cookie("password", supplied_password, max_age=60 * 60 * 24 * 365)  # 1 year
            return res

    # Returning 401 here causes `links` to always prompt for HTTP basic auth, which is annoying.
    # But the alternative is not following the HTTP spec, so I think this is fine.
    return render(request, "thoughts/login.html", status=401)
|
|
|
|
|
|
def _find_thought_for_editing(raw_uuid):
    """Return the Thought named by the ``editing`` query value, or None.

    ``raw_uuid`` is the raw query-string value (None when the parameter is
    absent). The returned thought has its timezone_offset converted for the
    edit form.
    """
    if not raw_uuid:
        return None

    # Reject malformed identifiers up front, as the index view does for "show".
    # NOTE(review): assuming Thought.uuid is a UUIDField, an unparsable value
    # would otherwise escape the lookup as an uncaught ValidationError --
    # confirm against the model.
    try:
        uuid.UUID(raw_uuid)
    except ValueError:
        return None

    try:
        thought = Thought.objects.get(uuid=raw_uuid)
    except Thought.DoesNotExist:
        return None

    # The form works in hours with the sign flipped; post() applies the
    # inverse (negate, multiply by 60) to the submitted value.
    thought.timezone_offset = -thought.timezone_offset / 60
    return thought


def _summarize_form_errors(errors):
    """Build a one-line "Field: message" summary from the first form error."""
    try:
        problem_field, field_errors = next(iter(errors.items()))
        message = field_errors[0].messages[0]
        return f"{problem_field[0].upper()}{problem_field[1:]}: {message}"
    except Exception:
        # Best effort only: whatever shape the errors have, the user should
        # still get an error page rather than a crash while reporting one.
        return f"An unknown error occurred processing your request: {errors}"


def _convert_media_to_mp3(thought):
    """Transcode the thought's uploaded audio to ``<uuid>.mp3`` in MEDIA_ROOT.

    Raises subprocess.CalledProcessError if ffmpeg exits with an error.
    """
    # This is a hack-fix because I want to be able to upload audio
    # In the future, this should be refactored to convert file types
    # using ffmpeg.js on the client side (so there are 0 security concerns)
    # and then the backend just has to check a 3 item whitelist
    thought.save()  # Save so that we have a file to work with
    mp3_name = f"{thought.uuid}.mp3"
    subprocess.run(
        [
            "ffmpeg",
            # Overwrite without asking: when an edited thought already has a
            # converted file of the same name, ffmpeg would otherwise refuse
            # and check=True would turn that into an exception.
            "-y",
            "-i", thought.media.path,
            "-codec:a", "mp3", "-vn",
            os.path.join(settings.MEDIA_ROOT, mp3_name),
        ],
        check=True,
    )
    os.remove(os.path.join(settings.MEDIA_ROOT, thought.media.name))  # Remove the original file
    thought.media.name = mp3_name  # Update the file in the DB


def post(request):
    """Create a new thought, or edit the one named by ``?editing=<uuid>``.

    * Unauthenticated: redirect to "login".
    * GET: render the (possibly pre-filled) form.
    * POST, invalid form: re-render the form with a one-line ``form_error``
      and status 400.
    * POST, valid form: process any uploaded media, save, then redirect to
      the edited thought, or back to "post" for a new one.

    Raises subprocess.CalledProcessError if an audio conversion fails.
    """
    if not check_authenticated(request):
        return redirect("login")

    editing_thought = _find_thought_for_editing(request.GET.get("editing"))

    if request.method != "POST":
        return render(request, "thoughts/post.html", {
            "form": ThoughtForm(instance=editing_thought) if editing_thought else ThoughtForm(),
            "editing": bool(editing_thought),
        })

    # We post in hours, so we need to convert back to minutes for saving
    # We can pass errors since form.is_valid catches most of them
    # We just need to convert hours to minutes first, because otherwise it errors on non-integer values
    values = request.POST.copy()
    try:
        values["timezone_offset"] = -float(values["timezone_offset"]) * 60
    except (ValueError, KeyError):
        pass

    thought_form = ThoughtForm(values, request.FILES, instance=editing_thought or Thought())

    if not thought_form.is_valid():
        return render(request, "thoughts/post.html", {
            "form": thought_form,
            "form_error": _summarize_form_errors(thought_form.errors.as_data()),
        }, status=400)

    if editing_thought:
        thought = editing_thought
    else:
        # Create a thought object we can work with
        # But don't save it to the DB yet
        thought = thought_form.save(commit=False)

    # Do media processing (already validated)
    if "media" in request.FILES:
        # Sniff the MIME type from the first chunk of the upload.
        chunk = next(thought.media.chunks(chunk_size=2048))
        media_type = magic.from_buffer(chunk, mime=True)

        thought.media.name = f"{thought.uuid}.{ALLOWED_MEDIA_TYPES[media_type]}"

        if media_type in ("audio/x-m4a", "audio/x-hx-aac-adts"):
            _convert_media_to_mp3(thought)

    # We need to make sure that if we remove an image, the alt text is removed with it
    if not thought.media:
        thought.media_alt = ""

    # Save for real
    thought.save()

    # Redirect to the same page, so that we make a GET request to /post
    if editing_thought:
        return redirect(editing_thought)
    return redirect("post")
|
|
|
|
|
|
def about(request):
    """Render the about page, passing along whether the visitor is logged in."""
    context = {"authenticated": check_authenticated(request)}
    return render(request, "thoughts/about.html", context)
|
|
|
|
class ThoughtsSearchForm(SearchForm):
    """SearchForm variant whose ``q`` field is an optional ``type="search"`` text input."""

    # Not required; the widget carries the input type and placeholder text.
    q = forms.CharField(
        widget=forms.TextInput(
            attrs={
                "type": "search",
                "placeholder": "Search query",
            },
        ),
        required=False,
    )

    def __init__(self, *args, **kwargs):
        # Straight pass-through: all arguments are forwarded to SearchForm.
        super().__init__(*args, **kwargs)
|
|
|
|
# Search view built by haystack's factory: uses ThoughtsSearchForm for input
# and a queryset ordered by "-posted" (presumably newest first).
search = search_view_factory(
    view_class=SearchView,
    form_class=ThoughtsSearchForm,
    searchqueryset=SearchQuerySet().order_by("-posted"),
    template="thoughts/search/search.html",
)
|