WhisperMaPhone/thoughts/views.py

255 lines
9.5 KiB
Python

import os.path
import uuid
import subprocess
import base64
import collections
import hashlib
from itertools import islice
import magic
from django import forms
from django.shortcuts import render, redirect
from django.utils.crypto import constant_time_compare
from django.views.decorators.http import condition
from haystack.views import SearchView, search_view_factory
from haystack.query import SearchQuerySet
from haystack.forms import SearchForm
from whispermaphone import settings
from .models import Thought, ThoughtForm, ALLOWED_MEDIA_TYPES
from .pagination import get_all_pages, get_page_slug
# Python's itertools standard library is bad
# Instead of providing functions
# They provide "recipes" that you can copy-paste into your own program
# => https://docs.python.org/3/library/itertools.html#itertools-recipes
def sliding_window(iterable, n):
    """Yield overlapping tuples of length *n* taken from *iterable*.

    Example: sliding_window('ABCDEFG', 4) --> ABCD BCDE CDEF DEFG

    Nothing is yielded when *iterable* holds fewer than *n* items.
    """
    source = iter(iterable)
    # A bounded deque drops its oldest element automatically on append,
    # which is exactly the "slide by one" step.
    buffer = collections.deque(islice(source, n), maxlen=n)
    if len(buffer) == n:
        yield tuple(buffer)
    for item in source:
        buffer.append(item)
        yield tuple(buffer)
def check_authenticated(request):
    """Return True when the request's "password" cookie matches settings.PASSWORD.

    A missing cookie counts as not authenticated. The comparison goes through
    constant_time_compare rather than ``==``.
    """
    try:
        supplied = request.COOKIES["password"]
        return bool(constant_time_compare(supplied, settings.PASSWORD))
    except KeyError:
        # No password cookie was sent
        return False
def get_highlighted_uuid(request):
    """Parse the ``show`` query parameter into a uuid.UUID.

    Returns the empty string (not None) when the parameter is absent or is
    not a valid UUID, so callers can simply test the result for truthiness.
    """
    try:
        highlighted = uuid.UUID(request.GET.get("show", ""))
    except ValueError:
        highlighted = ""
    return highlighted
# Return the previous, current, and next page
def get_pages(request):
    """Work out which page the request is asking for, plus its neighbours.

    The requested slug comes from the ``page`` query parameter, unless a valid
    ``show`` UUID naming an existing thought was passed, in which case the slug
    of the page containing that thought takes priority. If the resulting slug
    matches no page, the first page from get_all_pages() is used as the default.

    Returns a ``(previous_page, current_page, next_page)`` tuple, where
    ``previous_page`` / ``next_page`` are None at either end of the page list.

    Raises IndexError if get_all_pages() returns no pages at all.
    """
    ## Figure out what page we're viewing
    pages = get_all_pages()
    requested_slug = request.GET.get("page", default="")

    # If we've passed a valid highlighted thought uuid,
    # that takes priority over a page= value, so we overwrite requested_slug
    highlighted_uuid = get_highlighted_uuid(request)
    if highlighted_uuid:
        try:
            requested_slug = get_page_slug(Thought.objects.get(uuid=highlighted_uuid))
        except Thought.DoesNotExist:
            pass

    # When we get here, either:
    #   no slug was passed, requested_slug is ""
    #   a valid highlighted_uuid was passed, requested_slug is the slug of that page
    #   requested_slug is an invalid slug
    #   requested_slug is a valid slug

    # First item in pages should be the default
    current_index = 0

    # We don't have any way of validating slugs or anything,
    # so we just have to loop through and see if the requested slug matches any page.
    # Looking pages up by index (rather than with a 3-wide sliding window) also
    # works when there are only one or two pages. If several pages share a slug,
    # the last one wins.
    for index, page in enumerate(pages):
        if page.slug == requested_slug:
            current_index = index

    previous_page = pages[current_index - 1] if current_index > 0 else None
    current_page = pages[current_index]
    next_page = pages[current_index + 1] if current_index + 1 < len(pages) else None
    return previous_page, current_page, next_page
# Takes a Queryset of Thoughts and returns a hash of:
# the last-modified time for the most recently modified
# the number of thoughts
# and the current git commit (read from settings (which reads it from the git_commit file or randomly generates an id))
# I can't think of any edge cases. It's impossible to change page without changing the query params, the time, or the count
# I still need to double check that different query params are treated differently
def get_etag_from_thoughts(thoughts):
    """Return a SHA-256 hex digest identifying the current state of *thoughts*.

    The digest covers the number of thoughts, the ``last_updated`` value of the
    most recently updated one, and settings.COMMIT.

    *thoughts* must support ``.count()`` and ``.order_by(...).first()``
    (presumably a Django QuerySet of Thought — verify against callers).
    An empty collection is hashed with ``None`` in place of the timestamp
    instead of raising AttributeError.
    """
    count = thoughts.count()
    # .first() gives None when there is nothing to return
    newest = thoughts.order_by("-last_updated").first()
    last_updated = newest.last_updated if newest is not None else None
    # Do I need to include query params? I'd hope that it was seen as a different URL
    hash_str = f"{count}:{last_updated}:{settings.COMMIT}"
    return hashlib.sha256(hash_str.encode()).hexdigest()
def get_etag(request):
    """Compute the quoted ETag value for the page this request resolves to."""
    # Only the current page matters here; its neighbours are ignored
    _, current_page, _ = get_pages(request)
    digest = get_etag_from_thoughts(current_page.get_all_entries())
    # We're supposed to include quotes
    return '"{}"'.format(digest)
@condition(etag_func=get_etag)
def index(request):
    """Render one page of thoughts, with links to the neighbouring pages.

    Wrapped in ``condition`` with get_etag as the ETag function.
    """
    authenticated = check_authenticated(request)
    previous_page, current_page, next_page = get_pages(request)
    thoughts = current_page.get_all_entries()
    highlighted_uuid = get_highlighted_uuid(request)

    context = {
        "thoughts": thoughts,
        "highlighted": highlighted_uuid,
        "authenticated": authenticated,
        "page": current_page,
        # Neighbouring pages may be None, in which case the slug is None too
        "previous_page_slug": getattr(previous_page, "slug", None),
        "next_page_slug": getattr(next_page, "slug", None),
        "is_first_page": previous_page is None,  # if you're viewing the first page
        "is_last_page": next_page is None,
    }
    return render(request, "thoughts/index.html", context)
def login(request):
    """Show the login page, or log the user in on a correct password POST.

    Already-authenticated requests are redirected to "post". A POST whose
    "password" field matches settings.PASSWORD gets a year-long "password"
    cookie and the same redirect. Anything else (GET, wrong password, or a
    POST with no "password" field) renders the login page with status 401.
    """
    if check_authenticated(request):
        return redirect("post")

    if request.method == "POST":
        # .get() so that a POST without the field falls through to the 401 page
        # instead of raising a KeyError
        password = request.POST.get("password")
        if password is not None and constant_time_compare(password, settings.PASSWORD):
            res = redirect("post")
            # NOTE(review): the cookie holds the password itself and is set without
            # httponly/secure flags — consider hardening.
            res.set_cookie("password", password, max_age=60*60*24*365)  # 1 year
            return res

    # Returning 401 here causes `links` to always prompt for HTTP basic auth, which is annoying.
    # But the alternative is not following the HTTP spec, so I think this is fine.
    return render(request, "thoughts/login.html", status=401)
def _first_form_error(errors):
    """Format the first entry of a form's ``errors.as_data()`` mapping as "Field: message".

    Falls back to a generic message containing the whole mapping if the first
    error cannot be extracted.
    """
    try:
        problem_field = list(errors.keys())[0]
        message = list(errors.values())[0][0].messages[0]
        return f"{problem_field[0].upper()}{problem_field[1:]}: {message}"
    except Exception:
        # Best effort only: never let error formatting itself break the response
        return f"An unknown error occurred processing your request: {errors}"


def _convert_media_to_mp3(thought):
    """Save *thought*, transcode its media file to MP3 with ffmpeg, and point it at the MP3.

    Raises subprocess.CalledProcessError if ffmpeg exits with a non-zero status.
    """
    # This is a hack-fix because I want to be able to upload audio
    # In the future, this should be refactored to convert file types
    # using ffmpeg.js on the client side (so there are 0 security concerns)
    # and then the backend just has to check a 3 item whitelist
    thought.save()  # Save so that we have a file to work with
    mp3_name = f"{thought.uuid}.mp3"
    subprocess.run(["ffmpeg",
        "-i", thought.media.path,
        "-codec:a", "mp3", "-vn",
        os.path.join(settings.MEDIA_ROOT, mp3_name)
    ], check=True)
    os.remove(os.path.join(settings.MEDIA_ROOT, thought.media.name))  # Remove the original file
    thought.media.name = mp3_name  # Update the file in the DB


def post(request):
    """Create a new thought, or edit the one named by the ``editing`` query parameter.

    Unauthenticated requests are redirected to "login". On GET the (possibly
    pre-filled) form is rendered. On POST the form is validated: failures
    re-render it with status 400 and a one-line error; on success the thought
    is saved and the user is redirected to the edited thought, or back to
    "post" for a new one.
    """
    if not check_authenticated(request):
        return redirect("login")

    editing_thought = None
    editing = request.GET.get("editing", None)
    if editing is not None:
        try:
            # Parse first, so a malformed value is treated like "no such thought"
            # rather than blowing up inside the query. Looking up by a parsed UUID
            # matches how get_pages queries Thought.
            editing_thought = Thought.objects.get(uuid=uuid.UUID(editing))
        except (ValueError, Thought.DoesNotExist):
            editing_thought = None
        else:
            # Stored in minutes; the form works in hours with the opposite sign
            editing_thought.timezone_offset = - editing_thought.timezone_offset / 60

    if request.method == "POST":
        # We post in hours, so we need to convert back to minutes for saving
        # We can pass errors since form.is_valid catches most of them
        # We just need to convert hours to minutes first, because otherwise it errors on non-integer values
        values = request.POST.copy()
        try:
            values["timezone_offset"] = - float(values["timezone_offset"]) * 60
        except (ValueError, KeyError):
            pass
        thought_form = ThoughtForm(values, request.FILES, instance=editing_thought or Thought())

        if not thought_form.is_valid():
            # Media formatting errors
            error_line = _first_form_error(thought_form.errors.as_data())
            return render(request, "thoughts/post.html", {
                "form": thought_form,
                "form_error": error_line
            }, status=400)

        if editing_thought:
            thought = editing_thought
        else:
            # Create a thought object we can work with
            # But don't save it to the DB yet
            thought = thought_form.save(commit=False)

        # Do media processing (already validated)
        if "media" in request.FILES:
            # Sniff the real type from the first chunk of the upload
            chunk = next(thought.media.chunks(chunk_size=2048))
            media_type = magic.from_buffer(chunk, mime=True)
            thought.media.name = f"{thought.uuid}.{ALLOWED_MEDIA_TYPES[media_type]}"
            if media_type in ("audio/x-m4a", "audio/x-hx-aac-adts"):
                _convert_media_to_mp3(thought)

        # We need to make sure that if we remove an image, the alt text is removed with it
        if not thought.media:
            thought.media_alt = ""

        # Save for real
        thought.save()

        # Redirect to the same page, so that we make a GET request to /post
        if editing_thought:
            return redirect(editing_thought)
        return redirect("post")

    return render(request, "thoughts/post.html", {
        "form": ThoughtForm(instance=editing_thought) if editing_thought else ThoughtForm(),
        "editing": bool(editing_thought),
    })
def about(request):
    """Render the about page, telling the template whether the visitor is authenticated."""
    context = {"authenticated": check_authenticated(request)}
    return render(request, "thoughts/about.html", context)
class ThoughtsSearchForm(SearchForm):
    """SearchForm whose ``q`` field is optional and rendered as a search-type text input."""

    q = forms.CharField(
        required=False,
        widget=forms.TextInput(
            attrs={
                "type": "search",
                "placeholder": "Search query",
            },
        ),
    )

    def __init__(self, *args, **kwargs):
        # Nothing extra to set up: arguments are handed to SearchForm unchanged
        super().__init__(*args, **kwargs)
# Search view: the stock haystack SearchView using our form and template,
# with results ordered by "-posted".
search = search_view_factory(
    view_class=SearchView,
    form_class=ThoughtsSearchForm,
    searchqueryset=SearchQuerySet().order_by("-posted"),
    template="thoughts/search/search.html",
)