import os.path
import uuid
import subprocess
import base64
import collections
import hashlib
from itertools import islice

import magic
from django import forms
from django.core.exceptions import ValidationError
from django.shortcuts import render, redirect
from django.utils.crypto import constant_time_compare
from django.views.decorators.http import condition
from haystack.views import SearchView, search_view_factory
from haystack.query import SearchQuerySet
from haystack.forms import SearchForm

from whispermaphone import settings
from .models import Thought, ThoughtForm, ALLOWED_MEDIA_TYPES
from .pagination import get_all_pages, get_page_slug


# Python's itertools standard library does not ship this as a function;
# it is only provided as a copy-paste "recipe":
# => https://docs.python.org/3/library/itertools.html#itertools-recipes
def sliding_window(iterable, n):
    """Yield overlapping tuples of length ``n`` taken from ``iterable``.

    sliding_window('ABCDEFG', 4) --> ABCD BCDE CDEF DEFG

    Nothing is yielded when the iterable has fewer than ``n`` items.
    """
    it = iter(iterable)
    window = collections.deque(islice(it, n), maxlen=n)
    if len(window) == n:
        yield tuple(window)
    for x in it:
        window.append(x)
        yield tuple(window)


def check_authenticated(request):
    """Return True when the "password" cookie matches settings.PASSWORD."""
    cookie_password = request.COOKIES.get("password")
    if cookie_password is None:
        return False
    return bool(constant_time_compare(cookie_password, settings.PASSWORD))


def get_highlighted_uuid(request):
    """Return the ``show`` query parameter as a UUID, or "" if absent/invalid."""
    try:
        return uuid.UUID(request.GET.get("show", ""))
    except ValueError:
        return ""


def get_pages(request):
    """Return the ``(previous, current, next)`` pages for this request.

    The current page is chosen from, in priority order:
      1. the page containing the thought named by a valid ``show`` uuid,
      2. the page whose slug equals the ``page`` query parameter,
      3. the first page (used when no slug was passed or it matches nothing).

    ``previous`` / ``next`` are None at the respective ends of the page list.

    Raises:
        IndexError: if ``get_all_pages()`` returns no pages at all.
    """
    pages = get_all_pages()
    requested_slug = request.GET.get("page", default="")

    # If we've been passed a valid highlighted thought uuid,
    # that takes priority over a page= value, so we overwrite requested_slug
    highlighted_uuid = get_highlighted_uuid(request)
    if highlighted_uuid:
        try:
            requested_slug = get_page_slug(Thought.objects.get(uuid=highlighted_uuid))
        except Thought.DoesNotExist:
            pass

    # We don't have any way of validating slugs, so we just have to loop
    # through and see if the requested slug matches any page.
    # Looking the page up by index (instead of with a 3-wide sliding window)
    # also works when there are only two pages, where no window exists.
    current_index = 0  # First item in pages is the default
    for position, page in enumerate(pages):
        if page.slug == requested_slug:
            current_index = position
            break

    previous_page = pages[current_index - 1] if current_index > 0 else None
    next_page = pages[current_index + 1] if current_index + 1 < len(pages) else None
    return previous_page, pages[current_index], next_page


def get_etag_from_thoughts(thoughts):
    """Return a SHA-256 hex digest identifying the state of ``thoughts``.

    The digest covers:
      * the number of thoughts,
      * the ``last_updated`` value of the most recently modified thought
        (None when there are no thoughts),
      * the current commit id from ``settings.COMMIT``.

    Query parameters are not part of the digest; a different query string is
    a different URL, so HTTP caches already store it separately.
    """
    count = thoughts.count()
    newest = thoughts.order_by("-last_updated").first()
    # .first() returns None on an empty queryset, so don't dereference blindly
    last_updated = newest.last_updated if newest is not None else None
    hash_str = f"{count}:{last_updated}:{settings.COMMIT}"
    return hashlib.sha256(hash_str.encode()).hexdigest()


def get_etag(request):
    """Return the quoted ETag for the index view of this request."""
    _, current_page, _ = get_pages(request)
    thoughts = current_page.get_all_entries()
    # The index view passes the login state to the template, and that state
    # comes from a cookie rather than the URL. It therefore has to be part of
    # the ETag, otherwise logging in or out could be answered with a 304.
    authenticated = check_authenticated(request)
    # We're supposed to include quotes
    return f'"{get_etag_from_thoughts(thoughts)}-{int(authenticated)}"'


@condition(etag_func=get_etag)
def index(request):
    """Render the requested page of thoughts."""
    authenticated = check_authenticated(request)
    previous_page, current_page, next_page = get_pages(request)
    thoughts = current_page.get_all_entries()
    highlighted_uuid = get_highlighted_uuid(request)
    return render(request, "thoughts/index.html", {
        "thoughts": thoughts,
        "highlighted": highlighted_uuid,
        "authenticated": authenticated,
        "page": current_page,
        "previous_page_slug": getattr(previous_page, "slug", None),
        "next_page_slug": getattr(next_page, "slug", None),
        "is_first_page": previous_page is None,  # if you're viewing the first page
        "is_last_page": next_page is None,
    })


def login(request):
    """Show the login form, or set the auth cookie on a correct password.

    Already-authenticated users and successful logins are redirected to the
    "post" view; everything else renders the login page with status 401.
    """
    if check_authenticated(request):
        return redirect("post")
    if request.method == "POST":
        # .get() so that a POST without a password field is a failed login
        # rather than an unhandled KeyError
        submitted_password = request.POST.get("password", "")
        if constant_time_compare(submitted_password, settings.PASSWORD):
            res = redirect("post")
            # NOTE(review): this stores the raw password in a cookie without
            # httponly/secure flags. Left unchanged because client code may
            # rely on it; consider a session or signed token instead.
            res.set_cookie("password", submitted_password, max_age=60*60*24*365)  # 1 year
            return res
    # Returning 401 here causes `links` to always prompt for HTTP basic auth, which is annoying.
    # But the alternative is not following the HTTP spec, so I think this is fine.
    return render(request, "thoughts/login.html", status=401)


def _first_form_error_line(errors):
    """Format the first entry of ``form.errors.as_data()`` as "Field: message"."""
    try:
        problem_field, field_errors = next(iter(errors.items()))
        message = field_errors[0].messages[0]
        return f"{problem_field[0].upper()}{problem_field[1:]}: {message}"
    except Exception:
        return f"An unknown error occurred processing your request: {errors}"


def _convert_audio_to_mp3(thought):
    """Transcode the thought's uploaded audio to ``<uuid>.mp3`` with ffmpeg.

    This is a hack-fix to allow audio uploads. In the future, this should be
    refactored to convert file types using ffmpeg.js on the client side (so
    there are 0 security concerns), and then the backend just has to check a
    3 item whitelist.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    thought.save()  # Save so that we have a file to work with
    subprocess.run([
        "ffmpeg", "-i", thought.media.path,
        "-codec:a", "mp3", "-vn",
        os.path.join(settings.MEDIA_ROOT, f"{thought.uuid}.mp3"),
    ], check=True)
    os.remove(os.path.join(settings.MEDIA_ROOT, thought.media.name))  # Remove the original file
    thought.media.name = f"{thought.uuid}.mp3"  # Update the file in the DB


def post(request):
    """Create a new thought, or edit the one named by ``?editing=<uuid>``.

    Unauthenticated users are redirected to "login". A GET renders the form;
    a POST validates it (status 400 with the first error on failure), handles
    any uploaded media, saves the thought and redirects.
    """
    if not check_authenticated(request):
        return redirect("login")

    editing_thought = None
    editing = request.GET.get("editing", None)
    if editing:
        try:
            editing_thought = Thought.objects.get(uuid=editing)
        except (Thought.DoesNotExist, ValueError, ValidationError):
            # Unknown uuid, or a value that isn't a uuid at all:
            # fall back to creating a new thought
            editing_thought = None
        else:
            # Stored in minutes, but the form works in hours
            editing_thought.timezone_offset = - editing_thought.timezone_offset / 60

    if request.method == "POST":
        # We post in hours, so we need to convert back to minutes for saving
        # We can pass errors since form.is_valid catches most of them
        # We just need to convert hours to minutes first, because otherwise it errors on non-integer values
        values = request.POST.copy()
        try:
            values["timezone_offset"] = - float(values["timezone_offset"]) * 60
        except (ValueError, KeyError):
            pass

        thought_form = ThoughtForm(values, request.FILES, instance=editing_thought or Thought())
        if not thought_form.is_valid():
            errors = thought_form.errors.as_data()
            return render(request, "thoughts/post.html", {
                "form": thought_form,
                "form_error": _first_form_error_line(errors),
            }, status=400)

        if editing_thought:
            thought = editing_thought
        else:
            # Create a thought object we can work with
            # But don't save it to the DB yet
            thought = thought_form.save(commit=False)

        # Do media processing (already validated)
        if "media" in request.FILES:
            chunk = next(thought.media.chunks(chunk_size=2048))
            media_type = magic.from_buffer(chunk, mime=True)
            thought.media.name = f"{thought.uuid}.{ALLOWED_MEDIA_TYPES[media_type]}"
            if media_type in ("audio/x-m4a", "audio/x-hx-aac-adts"):
                _convert_audio_to_mp3(thought)

        # We need to make sure that if we remove an image, the alt text is removed with it
        if not thought.media:
            thought.media_alt = ""

        # Save for real
        thought.save()

        # Redirect, so that the browser ends up making a GET request
        if editing_thought:
            return redirect(editing_thought)
        return redirect("post")

    return render(request, "thoughts/post.html", {
        "form": ThoughtForm(instance=editing_thought) if editing_thought else ThoughtForm(),
        "editing": bool(editing_thought),
    })


def about(request):
    """Render the about page."""
    return render(request, "thoughts/about.html", {
        "authenticated": check_authenticated(request)
    })


class ThoughtsSearchForm(SearchForm):
    """Haystack search form whose query box is an optional search input."""

    q = forms.CharField(
        required=False,
        widget=forms.TextInput(attrs={"type": "search", "placeholder": "Search query"}),
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


# Search view: results use ThoughtsSearchForm and are ordered by "-posted"
search = search_view_factory(
    template="thoughts/search/search.html",
    view_class=SearchView,
    form_class=ThoughtsSearchForm,
    searchqueryset=SearchQuerySet().order_by("-posted")
)