import logging
import os
import tempfile
from collections.abc import Generator
from dataclasses import dataclass
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.db import IntegrityError
from django.db import transaction
from django.shortcuts import get_object_or_404
from django.shortcuts import redirect
from django.shortcuts import render
from . import services
from .forms import ResourceCreateForm, ResourceUpdateThumbnailsForm
from .forms import ResourceUpdateMetadataForm
from .models import PDFPageSnapshot, ResourceSubcategory
from .models import PDFResource
from .models import Resource
from .s3 import get_presigned_obj_url, upload_files_to_s3, upload_to_s3
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# Dataclass holding the per-resource information that the views pass to their templates.
@dataclass
class ResourceInfo:
    """View-facing summary of a resource and the files attached to it.

    Instances are built by ``_extract_metadata_from_resource`` from a ``Resource`` model
    object; the URL fields hold presigned S3 URLs generated at that point.
    """

    # Primary key of the underlying Resource.
    id: int
    name: str
    description: str
    card_description: str
    # Display attributes copied from the resource's main category.
    main_resource_category_name: str
    main_resource_category_colour_css_class: str
    main_resource_badge_foreground_colour: str
    # Subcategory name(s) for display, or None when there is nothing to show.
    subcategories: str | None
    age_range: str | None
    # File names of the PDFs attached to the resource, and their presigned URLs
    # (built from the same names, in the same order).
    pdf_filenames: list[str]
    pdf_urls: list[str]
    # Maps a PDF file name to the presigned URLs of that PDF's page snapshots.
    snapshot_urls: dict[str, list[str]]
    # Thumbnail file names and their presigned URLs (same order).
    thumbnail_filenames: list[str]
    thumbnail_urls: list[str]
    # Used by the index view to pick and order featured resources; a falsy value means
    # the resource is not featured.
    # NOTE(review): annotated ``int`` but presumably nullable on the model -- confirm.
    feature_slot: int
    # Populated from the model's ``created_at`` / ``updated_at`` attributes.
    # NOTE(review): annotated ``str`` but these are presumably datetimes -- confirm.
    created: str
    updated: str
@login_required
def create_featured(request):
    """Render the "create featured resource" page for a logged-in user."""
    template_name = "resources/create_featured_resource.html"
    return render(request, template_name)
def _extract_metadata_from_resource(resource_obj) -> ResourceInfo | None:
    """
    Build a ``ResourceInfo`` summary from a ``Resource`` model object.

    Collects the file names of the resource's PDFs, of each PDF's page snapshots and of its
    thumbnails, and generates a presigned S3 URL for every one of them.

    :param resource_obj: the ``Resource`` instance to summarise.
    :return: the populated ``ResourceInfo``, or ``None`` if reading the resource's fields
        raised (the error is logged with its traceback).
    """
    bucket = settings.AWS_STORAGE_BUCKET_NAME

    # Evaluate the PDF queryset once and reuse it for both the names and the snapshots.
    pdf_resources = list(PDFResource.objects.filter(resource=resource_obj))
    pdf_resource_filenames = [p.file_name for p in pdf_resources]

    # PDF file name -> presigned URLs of that PDF's page snapshots.
    snapshot_url_dict = {}
    for p in pdf_resources:
        snapshot_url_dict[p.file_name] = [
            get_presigned_obj_url(bucket, f"snapshotted_pages/{s.file_name}")
            for s in PDFPageSnapshot.objects.filter(pdf_file=p)
        ]

    pdf_urls = [
        get_presigned_obj_url(bucket, f"pdfuploads/{f}") for f in pdf_resource_filenames
    ]

    # Treat a missing thumbnail list as "no thumbnails" instead of failing on iteration.
    thumbnail_filenames = resource_obj.thumbnail_filenames or []
    thumbnail_urls = [
        get_presigned_obj_url(bucket, f"thumbnails/{f}") for f in thumbnail_filenames
    ]

    try:
        # ``subcategories`` is populated with ``.set()`` when a resource is created, i.e. it
        # is a related manager rather than a single object. A manager is always truthy and
        # its ``name`` attribute is not a subcategory name, so read the names from its rows.
        # NOTE(review): assumes a many-to-many to ResourceSubcategory -- confirm on the model.
        subcategory_names = [s.name for s in resource_obj.subcategories.all()]
        arc_name = ", ".join(subcategory_names) or None

        main_category = resource_obj.main_resource_category
        return ResourceInfo(
            id=resource_obj.id,
            name=resource_obj.name,
            description=resource_obj.description,
            card_description=resource_obj.card_description,
            main_resource_category_name=main_category.name,
            main_resource_category_colour_css_class=main_category.colour_css_class,
            main_resource_badge_foreground_colour=main_category.badge_foreground_colour,
            subcategories=arc_name,
            age_range=resource_obj.age_range,
            pdf_filenames=pdf_resource_filenames,
            pdf_urls=pdf_urls,
            snapshot_urls=snapshot_url_dict,
            thumbnail_filenames=thumbnail_filenames,
            thumbnail_urls=thumbnail_urls,
            feature_slot=resource_obj.feature_slot,
            created=resource_obj.created_at,
            updated=resource_obj.updated_at,
        )
    except Exception as e:
        # Use the module logger (not the root logger) so the record carries this module's name.
        logger.exception("Error extracting resource information: %s", e)
        return None
@login_required
def index(request):
    """
    Render the resource list page.

    The template receives ``resource_list`` (a summary for every resource whose metadata
    could be extracted) and ``featured_resources`` (the subset with a truthy
    ``feature_slot``, ordered by that slot).
    """
    extracted = (_extract_metadata_from_resource(r) for r in Resource.objects.all())
    # Extraction returns None when it fails (the failure is already logged there). Skip
    # those entries so one broken resource cannot crash the whole page on ``.feature_slot``.
    resource_list = [info for info in extracted if info is not None]
    featured_resources = sorted(
        (info for info in resource_list if info.feature_slot),
        key=lambda info: info.feature_slot,
    )
    context = {"resource_list": resource_list, "featured_resources": featured_resources}
    return render(request, "resources/resource_list.html", context)
def _write_pdf_to_tempdir(f) -> str:
    """Copy an uploaded file into a newly created temporary directory.

    :param f: object exposing ``name`` and a ``chunks()`` iterator of bytes.
    :return: path of the written file. The directory comes from ``tempfile.mkdtemp`` and is
        not removed here, so cleaning it up is left to the caller.
    """
    target_path = os.path.join(tempfile.mkdtemp(), f.name)
    with open(target_path, "wb") as sink:
        sink.writelines(f.chunks())
    return target_path
def create_metadata(pdf_files) -> Generator[tuple[services.PDFMetadata, str], None, None]:
    """Yield ``(metadata, snapshot_images)`` for each uploaded PDF, one at a time.

    Every upload is first copied chunk by chunk into a shared temporary directory, then
    handed to ``services.get_pdf_metadata_from_path`` and ``services.export_pages_as_images``.
    The temporary directory is removed once the generator finishes or is closed.

    NOTE(review): the second tuple element is annotated ``str``, but ``create_resource``
    iterates it as a collection of image paths -- confirm against ``services``.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        for upload in pdf_files:
            local_path = os.path.join(scratch_dir, upload.name)
            with open(local_path, "wb") as sink:
                sink.writelines(upload.chunks())
            pdf_metadata = services.get_pdf_metadata_from_path(local_path)
            page_images = services.export_pages_as_images(local_path)
            yield pdf_metadata, page_images
@login_required
def create_resource(request):
    """
    Show the resource creation form and, on POST, create the resource.

    For a valid POST this creates the ``Resource``, one ``PDFResource`` per uploaded PDF and
    one ``PDFPageSnapshot`` per exported page image, records the thumbnail file names,
    uploads the files to S3 and redirects to the resource detail page.

    If an ``IntegrityError`` is raised a flash message about the feature slot is added; any
    other exception is logged and reported as a non-field form error. In both cases the form
    is rendered again. An invalid form is rendered again together with an ``errors`` dict.
    """
    if request.method == "POST":
        form = ResourceCreateForm(request.POST, request.FILES)
        if form.is_valid():
            pdf_files = form.cleaned_data["pdf_files"]
            thumbnail_files = form.cleaned_data["thumbnail_files"]
            name = form.cleaned_data["name"]
            description = form.cleaned_data["description"]
            card_description = form.cleaned_data["card_description"]
            resource_type = form.cleaned_data["resource_type"]
            age_range = form.cleaned_data["age_range"]
            curriculum = form.cleaned_data["curriculum"]
            main_resource_category = form.cleaned_data["main_resource_category"]
            subcategories = form.cleaned_data["subcategories"]
            feature_slot = form.cleaned_data["feature_slot"]
            # We use get here because we know these categories exist
            # NOTE(review): this runs outside the try block below, so a missing subcategory
            # would propagate as an unhandled exception.
            subcategories_objs = [ResourceSubcategory.objects.get(name=x) for x in subcategories]
            try:
                # All database writes happen in one transaction so a failure part-way
                # through does not leave a half-created resource behind.
                with transaction.atomic():
                    resource = Resource(
                        name=name,
                        description=description,
                        card_description=card_description,
                        resource_type=resource_type,
                        age_range=age_range,
                        curriculum=curriculum,
                        main_resource_category=main_resource_category,
                        feature_slot=feature_slot,
                    )
                    # Saved first so that the related subcategories can be attached.
                    resource.save()
                    resource.subcategories.set(subcategories_objs)
                    # Yields (metadata, snapshot_images) per uploaded PDF.
                    metadata_generator = create_metadata(pdf_files)
                    # One entry per PDF: that PDF's collection of snapshot image paths.
                    snapshotted_pages = []
                    for metadata, snapshot_images in metadata_generator:
                        pdf_resource = PDFResource.objects.create(
                            resource=resource,
                            file_name=os.path.basename(metadata.file_name),
                            file_size=metadata.file_size,
                        )
                        for snapshot_image in snapshot_images:
                            # NOTE(review): the snapshot name is hard-coded to "test" --
                            # looks like a placeholder; confirm.
                            PDFPageSnapshot.objects.create(
                                name="test",
                                file_name=os.path.basename(snapshot_image),
                                pdf_file=pdf_resource,
                            )
                        snapshotted_pages.append(snapshot_images)
                    resource.thumbnail_filenames = [f.name for f in thumbnail_files]
                    resource.save()
                    # Reset the file pointers for pdf_files: create_metadata has already
                    # read them via chunks(), and they are about to be read again for upload.
                    for pdf_file in pdf_files:
                        pdf_file.seek(0)
                    # A falsy result is turned into an exception so it is handled by the
                    # generic handler below.
                    if not upload_to_s3(pdf_files, thumbnail_files, snapshotted_pages):
                        raise Exception("Error uploading files to S3")
                return redirect("resources:resource_detail", resource_id=resource.id)
            except IntegrityError:
                # NOTE(review): every IntegrityError is reported as a feature-slot clash;
                # presumably that is the only unique constraint involved -- confirm.
                slot = form.cleaned_data["feature_slot"]
                messages.add_message(request, messages.ERROR, f"Feature slot {slot} is already "
                                     "in use. Quit this form and remove from existing "
                                     "resource.")
            except Exception:
                logger.exception("Error creating resource")
                form.add_error(None, "An error occurred while creating the resource.")
            # Both handlers fall through to the final render with the bound form.
        else:
            # extract form errors
            errors = {}
            for field in form:
                if field.errors:
                    errors[field.name] = field.errors
            # add non-field errors
            if form.non_field_errors():
                errors["non_field_errors"] = form.non_field_errors()
            # render form with errors
            return render(
                request,
                "resources/resource_create.html",
                {"form": form, "errors": errors},
            )
    else:
        # Any non-POST request gets an empty form.
        form = ResourceCreateForm()
    return render(request, "resources/resource_create.html", {"form": form})
@login_required
def resource_detail(request, resource_id):
    """
    Render the detail page for a single resource.

    :param request: the incoming HTTP request.
    :param resource_id: primary key of the ``Resource`` to show; a 404 is returned if no
        such resource exists.
    :return: the rendered ``resources/resource_detail.html`` page.
    :raises RuntimeError: if the resource's metadata could not be extracted.
    """
    resource_obj = get_object_or_404(Resource, pk=resource_id)
    resource_metadata = _extract_metadata_from_resource(resource_obj)
    if resource_metadata is None:
        # Extraction logs its own failure; fail here with a clear message instead of an
        # AttributeError on the first ``resource_metadata.<field>`` access below.
        raise RuntimeError(f"Could not extract metadata for resource {resource_id}")

    # ``subcategories`` is populated with ``.set()`` when a resource is created, i.e. it is a
    # related manager: always truthy, and its ``name`` attribute is not a subcategory name.
    # Read the names from the related rows instead.
    # NOTE(review): assumes a many-to-many to ResourceSubcategory -- confirm on the model.
    subcategory_names = ", ".join(s.name for s in resource_obj.subcategories.all())

    resource = {
        "id": resource_obj.id,
        "name": resource_obj.name,
        "description": resource_obj.description,
        "resource_type": resource_obj.resource_type.name,
        "main_resource_category": resource_obj.main_resource_category.name,
        "additional_resource_category": subcategory_names or None,
        "age_range": resource_obj.age_range,
        "curriculum": resource_obj.curriculum,
        "pdf_filenames": resource_metadata.pdf_filenames,
        "pdf_urls": resource_metadata.pdf_urls,
        # (url, filename) pairs so the template can iterate both together.
        "thumbnails": list(
            zip(
                resource_metadata.thumbnail_urls,
                resource_metadata.thumbnail_filenames,
                strict=False,
            ),
        ),
        "thumbnail_filenames": resource_metadata.thumbnail_filenames,
        "thumbnail_urls": resource_metadata.thumbnail_urls,
        "snapshot_urls": resource_metadata.snapshot_urls,
        "created": resource_metadata.created,
        "updated": resource_metadata.updated,
    }
    return render(request, "resources/resource_detail.html", {"resource": resource})
@login_required
def update_resource_thumbnails(request, pk):
    """
    Replace the thumbnails of an existing resource.

    GET renders the thumbnail upload form. A valid POST stores the new thumbnail file names
    on the resource, uploads the files to the ``thumbnails`` S3 prefix and redirects to the
    resource detail page; an invalid POST re-renders the bound form.

    Login is required, like every other view in this module, because this view modifies
    the resource and uploads files.

    :param request: the incoming HTTP request.
    :param pk: primary key of the ``Resource`` to update; a 404 is returned if it is missing.
    """
    resource = get_object_or_404(Resource, pk=pk)
    if request.method == "POST":
        form = ResourceUpdateThumbnailsForm(request.POST, request.FILES)
        if form.is_valid():
            thumbnail_files = form.cleaned_data["thumbnail_files"]
            resource.thumbnail_filenames = [f.name for f in thumbnail_files]
            upload_files_to_s3(thumbnail_files, "thumbnails")
            resource.save()
            return redirect("resources:resource_detail", resource_id=resource.id)
    else:
        form = ResourceUpdateThumbnailsForm(resource=pk)
    return render(request, "resources/update_thumbnails.html", {"form": form, "resource": resource})
@login_required
def hx_download_button(request):
    """
    This is an HTMX view that is called when the user clicks the download button.

    The PDF is identified by the ``rn`` query parameter, which must match the ``file_name``
    of a stored ``PDFResource``.

    :param request: the incoming HTTP request.
    :return: the rendered ``resources/hx_download_button.html`` fragment, with the presigned
        download URL in the ``pdf_url`` context variable.
    :raises Http404: if ``rn`` is missing or does not match any stored PDF.
    """
    # Local import so this view stays self-contained.
    from django.http import Http404

    pdf = request.GET.get("rn")
    # PDFs are stored as PDFResource rows keyed by ``file_name``, so validate the name there.
    # The previous lookup read ``.pdf_url`` from the extracted metadata, which only exposes
    # ``pdf_urls``, so it could never succeed.
    if not pdf or not PDFResource.objects.filter(file_name=pdf).exists():
        raise Http404("No PDF matches the requested file name.")
    # Same key layout as the PDF URLs built in _extract_metadata_from_resource.
    pdf_url = get_presigned_obj_url(settings.AWS_STORAGE_BUCKET_NAME, f"pdfuploads/{pdf}")
    return render(
        request,
        "resources/hx_download_button.html",
        {"pdf_url": pdf_url},
    )
@login_required
def update_resource_metadata(request, pk):
    """
    Edit the metadata of an existing resource.

    A non-POST request renders the form pre-filled from the resource. A valid POST saves the
    form and redirects to the resource detail page; an invalid POST re-renders the bound form.

    :param request: the incoming HTTP request.
    :param pk: primary key of the ``Resource`` to edit; a 404 is returned if it is missing.
    """
    resource = get_object_or_404(Resource, pk=pk)
    if request.method != "POST":
        form = ResourceUpdateMetadataForm(instance=resource)
    else:
        form = ResourceUpdateMetadataForm(request.POST, instance=resource)
        if form.is_valid():
            form.save()
            return redirect("resources:resource_detail", resource_id=resource.pk)
    context = {"form": form, "resource": resource}
    return render(request, "resources/resource_metadata_update.html", context)