WhisperMaPhone/thoughts/views.py

218 lines
8.1 KiB
Python

import os.path
import uuid
import subprocess
import base64
import collections
from itertools import islice
import magic
from django import forms
from django.shortcuts import render, redirect
from django.utils.crypto import constant_time_compare
from haystack.views import SearchView, search_view_factory
from haystack.query import SearchQuerySet
from haystack.forms import SearchForm
from whispermaphone import settings
from .models import Thought, ThoughtForm, ALLOWED_MEDIA_TYPES
from .pagination import get_all_pages, get_page_slug
# Python's itertools standard library is bad
# Instead of providing functions
# They provide "receipes" that you can copy-paste into your own program
# => https://docs.python.org/3/library/itertools.html#itertools-recipes
def sliding_window(iterable, n):
# sliding_window('ABCDEFG', 4) --> ABCD BCDE CDEF DEFG
it = iter(iterable)
window = collections.deque(islice(it, n), maxlen=n)
if len(window) == n:
yield tuple(window)
for x in it:
window.append(x)
yield tuple(window)
def check_authenticated(request):
authenticated = False
try:
if constant_time_compare(request.COOKIES["password"], settings.PASSWORD):
authenticated = True
except KeyError:
pass
return authenticated
def index(request):
authenticated = check_authenticated(request)
try:
highlighted_uuid = uuid.UUID(request.GET.get("show", ""))
except ValueError:
highlighted_uuid = ""
## Figure out what page we're viewing
pages = get_all_pages()
requested_slug = request.GET.get("page", default="")
# If we've passed a valid highlighted thought uuid,
# that takes priority over a page= value, so we overwrite requested_slug
if highlighted_uuid:
try:
requested_slug = get_page_slug(Thought.objects.get(uuid=highlighted_uuid))
except Thought.DoesNotExist:
pass
# When we get here, either:
# no slug was passed, requested_slug is ""
# a valid highlighted_uuid was passed, requested_slug is the slug of that page
# requested_slug is an invalid slug
# requested_slug is a valid slug
# First item in pages should be the default
previous_page, current_page, next_page = None, pages[0], pages[1]
# We don't have any way of validating slugs or anything,
# so we just have to loop though and see if the requested slug matches any pages
for (prev, curr, nex) in sliding_window(pages, 3):
if curr.slug == requested_slug:
previous_page, current_page, next_page = prev, curr, nex
# If you requested the last page, then this runs on the last iteration
# It runs right before every matching slug, but it only matters if it's the last iteration
if nex.slug == requested_slug:
previous_page, current_page, next_page = curr, nex, None
thoughts = current_page.get_all_entries()
return render(request, "thoughts/index.html", {
"thoughts": thoughts,
"highlighted": highlighted_uuid,
"authenticated": authenticated,
"page": current_page,
"previous_page_slug": getattr(previous_page, "slug", None),
"next_page_slug": getattr(next_page, "slug", None),
"is_first_page": current_page.slug == pages[0].slug, # if you're viewing the first page
"is_last_page": current_page.slug == pages[-1].slug
})
def login(request):
if check_authenticated(request):
return redirect("post")
if request.method == "POST":
if constant_time_compare(request.POST["password"], settings.PASSWORD):
res = redirect("post")
res.set_cookie("password", request.POST["password"], max_age=60*60*24*365) # 1 year
return res
# Returning 401 here causes `links` to always prompt for HTTP basic auth, which is annoying.
# But the alternative is not following the HTTP spec, so I think this is fine.
return render(request, "thoughts/login.html", status=401)
def post(request):
if not check_authenticated(request):
return redirect("login")
editing = request.GET.get("editing", None)
try:
editing_thought = Thought.objects.get(uuid=editing)
editing_thought.timezone_offset = - editing_thought.timezone_offset / 60
except Thought.DoesNotExist:
editing_thought = None
if request.method == "POST":
# We post in hours, so we need to convert back to minutes for saving
# We can pass errors since form.is_valid catches most of them
# We just need to convert hours to minutes first, because otherwise it errors on non-integer values
values = request.POST.copy()
try:
values["timezone_offset"] = - float(values["timezone_offset"]) * 60
except (ValueError, KeyError):
pass
thought_form = ThoughtForm(values, request.FILES, instance=editing_thought or Thought())
if not thought_form.is_valid():
errors = thought_form.errors.as_data()
# Media formatting errors
try:
problem_field = list(errors.keys())[0]
message = list(errors.values())[0][0].messages[0]
error_line = f"{problem_field[0].upper()}{problem_field[1:]}: {message}"
except:
error_line = f"An unknown error occurred processing your request: {errors}"
return render(request, "thoughts/post.html", {
"form": thought_form,
"form_error": error_line
}, status=400)
if editing_thought:
thought = editing_thought
else:
# Create a thought object we can work with
# But don't save it the DB yet
thought = thought_form.save(commit=False)
# Do media processing (already validated)
if "media" in request.FILES:
chunk = next(thought.media.chunks(chunk_size=2048))
media_type = magic.from_buffer(chunk, mime="True")
thought.media.name = f"{thought.uuid}.{ALLOWED_MEDIA_TYPES[media_type]}"
if media_type == "audio/x-m4a" or media_type == "audio/x-hx-aac-adts":
# This is a hack-fix because I want to be able to upload audio
# In the future, this should be refactored to convert file types
# using ffmpeg.js on the client side (so there are 0 security concerns)
# and then the backend just has to check a 3 item whitelist
thought.save() # Save so that we have a file to work with
subprocess.run(["ffmpeg",
"-i", thought.media.path,
"-codec:a", "mp3", "-vn",
os.path.join(settings.MEDIA_ROOT, f"{thought.uuid}.mp3")
], check=True)
os.remove(os.path.join(settings.MEDIA_ROOT, thought.media.name)) # Remove the original file
thought.media.name = f"{thought.uuid}.mp3" # Update the file in the DB
# We need to make sure that if we remove an image, the alt text is removed with it
if not thought.media:
thought.media_alt = ""
# Save for real
thought.save()
# Redirect to the same page, so that we make a GET request to /post
if editing_thought:
return redirect(editing_thought)
return redirect("post")
return render(request, "thoughts/post.html", {
"form": ThoughtForm(instance=editing_thought) if editing_thought else ThoughtForm(),
"editing": not not editing_thought,
})
def about(request):
return render(request, "thoughts/about.html", {
"authenticated": check_authenticated(request)
})
class ThoughtsSearchForm(SearchForm):
q = forms.CharField(
required=False,
widget=forms.TextInput(attrs={"type": "search", "placeholder": "Search query"}),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
search = search_view_factory(
template="thoughts/search/search.html",
view_class=SearchView,
form_class=ThoughtsSearchForm,
searchqueryset=SearchQuerySet().order_by("-posted")
)