255 lines
9.5 KiB
Python
255 lines
9.5 KiB
Python
import os.path
|
|
import uuid
|
|
import subprocess
|
|
import base64
|
|
import collections
|
|
import hashlib
|
|
from itertools import islice
|
|
|
|
import magic
|
|
|
|
from django import forms
|
|
from django.shortcuts import render, redirect
|
|
from django.utils.crypto import constant_time_compare
|
|
from django.views.decorators.http import condition
|
|
|
|
from haystack.views import SearchView, search_view_factory
|
|
from haystack.query import SearchQuerySet
|
|
from haystack.forms import SearchForm
|
|
|
|
from whispermaphone import settings
|
|
from .models import Thought, ThoughtForm, ALLOWED_MEDIA_TYPES
|
|
from .pagination import get_all_pages, get_page_slug
|
|
|
|
# Python's itertools standard library is bad
|
|
# Instead of providing functions
|
|
# They provide "recipes" that you can copy-paste into your own program
|
|
# => https://docs.python.org/3/library/itertools.html#itertools-recipes
|
|
def sliding_window(iterable, n):
    """Yield every run of ``n`` consecutive items from ``iterable`` as a tuple.

    sliding_window('ABCDEFG', 4) --> ABCD BCDE CDEF DEFG

    An input with fewer than ``n`` items produces no windows.
    """
    source = iter(iterable)
    # Prime the frame with the first n items. Because of maxlen, every later
    # append pushes the oldest item out of the other end.
    frame = collections.deque(islice(source, n), maxlen=n)
    if len(frame) == n:
        yield tuple(frame)
    for item in source:
        frame.append(item)
        yield tuple(frame)
|
|
|
|
|
|
def check_authenticated(request):
    """Return True when the request's "password" cookie matches settings.PASSWORD.

    A request that carries no "password" cookie is unauthenticated.
    """
    try:
        # constant_time_compare is used instead of == for the secret check.
        matches = constant_time_compare(request.COOKIES["password"], settings.PASSWORD)
    except KeyError:
        # No cookie was sent at all.
        return False
    return True if matches else False
|
|
|
|
def get_highlighted_uuid(request):
    """Parse the ``show`` query parameter as a UUID.

    Returns a ``uuid.UUID`` when the parameter is well formed, and an empty
    string when it is missing or cannot be parsed.
    """
    try:
        highlighted = uuid.UUID(request.GET.get("show", ""))
    except ValueError:
        # Missing ("" default) or malformed value: nothing is highlighted.
        highlighted = ""
    return highlighted
|
|
|
|
# Return the previous, current, and next page
|
|
def get_pages(request):
    """Return ``(previous_page, current_page, next_page)`` for this request.

    The current page is picked from, in priority order:

    1. the page holding the thought named by a valid ``show=<uuid>`` query
       parameter, when such a thought exists;
    2. the page whose ``slug`` equals the ``page`` query parameter;
    3. the first page returned by ``get_all_pages()`` (the default).

    ``previous_page`` is ``None`` on the first page and ``next_page`` is
    ``None`` on the last page.

    Assumes ``get_all_pages()`` returns at least one page; ``pages[0]`` raises
    ``IndexError`` otherwise.
    """
    ## Figure out what page we're viewing
    pages = get_all_pages()

    requested_slug = request.GET.get("page", default="")

    # If we've passed a valid highlighted thought uuid,
    # that takes priority over a page= value, so we overwrite requested_slug
    highlighted_uuid = get_highlighted_uuid(request)
    if highlighted_uuid:
        try:
            requested_slug = get_page_slug(Thought.objects.get(uuid=highlighted_uuid))
        except Thought.DoesNotExist:
            pass

    # When we get here, either:
    #   no slug was passed, requested_slug is ""
    #   a valid highlighted_uuid was passed, requested_slug is the slug of that page
    #   requested_slug is an invalid slug
    #   requested_slug is a valid slug

    # First item in pages should be the default.
    current_index = 0
    # We don't have any way of validating slugs, so we just loop through and
    # see whether the requested slug matches any page. Scanning by index
    # (rather than with 3-wide windows) also works when there are only two
    # pages, where a 3-wide window never yields and the second page could
    # never be selected. The last matching page wins.
    for index, page in enumerate(pages):
        if page.slug == requested_slug:
            current_index = index

    previous_page = pages[current_index - 1] if current_index > 0 else None
    current_page = pages[current_index]
    next_page = pages[current_index + 1] if current_index + 1 < len(pages) else None

    return previous_page, current_page, next_page
|
|
|
|
|
|
def get_etag_from_thoughts(thoughts):
    """Return a SHA-256 hex digest summarising the state of ``thoughts``.

    Takes a queryset of Thoughts and hashes together:

    * the number of thoughts,
    * the ``last_updated`` value of the most recently updated thought
      (``None`` when the queryset is empty),
    * ``settings.COMMIT`` (per the original author: the current git commit,
      read from the git_commit file or a randomly generated id).

    It should be impossible to change a page without changing the query
    params, the time, or the count.

    TODO: double check that different query params are treated as different
    URLs by caches, since they are not part of the hash.
    """
    count = thoughts.count()

    # .first() returns None for an empty queryset; guard it so a page with no
    # thoughts yields a stable ETag instead of raising AttributeError.
    newest = thoughts.order_by("-last_updated").first()
    last_updated = newest.last_updated if newest is not None else None

    hash_str = f"{count}:{last_updated}:{settings.COMMIT}"
    return hashlib.sha256(hash_str.encode()).hexdigest()
|
|
|
|
|
|
def get_etag(request):
    """Build the quoted ETag value for the page that ``request`` selects."""
    # Only the current page matters here; its neighbours are ignored.
    _, current_page, _ = get_pages(request)
    digest = get_etag_from_thoughts(current_page.get_all_entries())

    # We're supposed to include quotes
    return '"{}"'.format(digest)
|
|
|
|
|
|
@condition(etag_func=get_etag)
def index(request):
    """Render one page of thoughts with its pagination and highlight state.

    The ``condition`` decorator is given ``get_etag`` to compute the ETag for
    this view.
    """
    logged_in = check_authenticated(request)
    prev_page, page, following_page = get_pages(request)

    context = {
        "thoughts": page.get_all_entries(),
        "highlighted": get_highlighted_uuid(request),
        "authenticated": logged_in,
        "page": page,
        # Neighbouring pages may be None at either end of the list.
        "previous_page_slug": getattr(prev_page, "slug", None),
        "next_page_slug": getattr(following_page, "slug", None),
        "is_first_page": prev_page is None,  # if you're viewing the first page
        "is_last_page": following_page is None,
    }
    return render(request, "thoughts/index.html", context)
|
|
|
|
|
|
def login(request):
    """Show the login page and handle password submissions.

    * Already-authenticated visitors are redirected to ``post``.
    * A POST whose ``password`` field matches ``settings.PASSWORD`` sets the
      ``password`` cookie (one-year ``max_age``) and redirects to ``post``.
    * Everything else (GET, wrong password, POST without a ``password``
      field) renders the login template with status 401.
    """
    if check_authenticated(request):
        return redirect("post")

    if request.method == "POST":
        # Use .get() so a POST without a "password" field falls through to
        # the 401 below instead of raising MultiValueDictKeyError (a 500).
        supplied = request.POST.get("password")
        if supplied is not None and constant_time_compare(supplied, settings.PASSWORD):
            res = redirect("post")
            res.set_cookie("password", supplied, max_age=60 * 60 * 24 * 365)  # 1 year
            return res

    # Returning 401 here causes `links` to always prompt for HTTP basic auth, which is annoying.
    # But the alternative is not following the HTTP spec, so I think this is fine.
    return render(request, "thoughts/login.html", status=401)
|
|
|
|
|
|
def _first_form_error_line(errors):
    """Format the first validation error as ``"Field: message"`` for display."""
    try:
        problem_field, field_errors = next(iter(errors.items()))
        message = field_errors[0].messages[0]
        return f"{problem_field[0].upper()}{problem_field[1:]}: {message}"
    except Exception:
        # Deliberate best-effort: whatever shape the errors have, the user
        # still gets a message rather than a second failure.
        return f"An unknown error occurred processing your request: {errors}"


def _process_uploaded_media(thought):
    """Rename the uploaded media after the thought and transcode M4A/AAC audio to MP3."""
    # Sniff the real type from the first bytes of the upload (already validated).
    chunk = next(thought.media.chunks(chunk_size=2048))
    media_type = magic.from_buffer(chunk, mime=True)

    thought.media.name = f"{thought.uuid}.{ALLOWED_MEDIA_TYPES[media_type]}"

    if media_type in ("audio/x-m4a", "audio/x-hx-aac-adts"):
        # This is a hack-fix because I want to be able to upload audio
        # In the future, this should be refactored to convert file types
        # using ffmpeg.js on the client side (so there are 0 security concerns)
        # and then the backend just has to check a 3 item whitelist
        thought.save()  # Save so that we have a file to work with
        mp3_name = f"{thought.uuid}.mp3"
        subprocess.run(
            [
                "ffmpeg",
                # Overwrite an existing <uuid>.mp3 (e.g. when replacing the
                # audio of a thought being edited) instead of failing.
                "-y",
                "-i", thought.media.path,
                "-codec:a", "mp3", "-vn",
                os.path.join(settings.MEDIA_ROOT, mp3_name),
            ],
            check=True,
        )
        os.remove(os.path.join(settings.MEDIA_ROOT, thought.media.name))  # Remove the original file
        thought.media.name = mp3_name  # Update the file in the DB


def post(request):
    """Create a new thought, or edit the one named by ``?editing=<uuid>``.

    * Unauthenticated visitors are redirected to ``login``.
    * GET renders the (optionally pre-filled) thought form.
    * POST validates the form; on failure the form is re-rendered with a
      one-line ``form_error`` and status 400. On success the thought is
      saved (after media processing) and the client is redirected to the
      edited thought, or back to ``post`` for a new one.

    An ``editing`` value that is missing, malformed, or matches no thought is
    treated as "not editing".
    """
    if not check_authenticated(request):
        return redirect("login")

    # Imported locally so this block stands on its own; Django is already a
    # dependency of this module.
    from django.core.exceptions import ValidationError

    editing = request.GET.get("editing", None)
    editing_thought = None
    if editing:
        try:
            editing_thought = Thought.objects.get(uuid=editing)
        except (Thought.DoesNotExist, ValidationError, ValueError):
            # Unknown thought, or a value the uuid lookup rejects outright.
            editing_thought = None
        else:
            # The form works in hours with the sign flipped; the stored value
            # is converted back on POST below.
            editing_thought.timezone_offset = -editing_thought.timezone_offset / 60

    if request.method == "POST":
        # We post in hours, so we need to convert back to minutes for saving
        # We can pass errors since form.is_valid catches most of them
        # We just need to convert hours to minutes first, because otherwise it errors on non-integer values
        values = request.POST.copy()
        try:
            values["timezone_offset"] = -float(values["timezone_offset"]) * 60
        except (ValueError, KeyError):
            pass

        thought_form = ThoughtForm(values, request.FILES, instance=editing_thought or Thought())

        if not thought_form.is_valid():
            # Media formatting errors
            return render(request, "thoughts/post.html", {
                "form": thought_form,
                "form_error": _first_form_error_line(thought_form.errors.as_data()),
            }, status=400)

        if editing_thought:
            thought = editing_thought
        else:
            # Create a thought object we can work with
            # But don't save it to the DB yet
            thought = thought_form.save(commit=False)

        # Do media processing (already validated)
        if "media" in request.FILES:
            _process_uploaded_media(thought)

        # We need to make sure that if we remove an image, the alt text is removed with it
        if not thought.media:
            thought.media_alt = ""

        # Save for real
        thought.save()

        # Redirect to the same page, so that we make a GET request to /post
        if editing_thought:
            return redirect(editing_thought)
        return redirect("post")

    return render(request, "thoughts/post.html", {
        "form": ThoughtForm(instance=editing_thought) if editing_thought else ThoughtForm(),
        "editing": bool(editing_thought),
    })
|
|
|
|
|
|
def about(request):
    """Render the about page, passing along whether the visitor is authenticated."""
    context = {"authenticated": check_authenticated(request)}
    return render(request, "thoughts/about.html", context)
|
|
|
|
class ThoughtsSearchForm(SearchForm):
    """Search form whose query box is an optional ``type="search"`` input."""

    # Overrides the inherited ``q`` field to add the placeholder text.
    q = forms.CharField(
        required=False,
        widget=forms.TextInput(attrs={"type": "search", "placeholder": "Search query"}),
    )

    # No __init__ override: the previous one only delegated to super() with
    # the same arguments, which is what happens without it.
|
|
|
|
# Search view: haystack's SearchView with our form and template, querying a
# SearchQuerySet ordered by "-posted".
search = search_view_factory(
    view_class=SearchView,
    form_class=ThoughtsSearchForm,
    template="thoughts/search/search.html",
    searchqueryset=SearchQuerySet().order_by("-posted"),
)
|