import logging
import os
import tempfile
from collections.abc import Generator
from dataclasses import dataclass

import boto3
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError
# NOTE(review): presumably this is pathlib.Path re-exported by botocore;
# consider importing it from pathlib directly.
from botocore.utils import Path
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.db import IntegrityError
from django.db import transaction
from django.http import Http404
from django.http import HttpResponseServerError
from django.shortcuts import get_object_or_404
from django.shortcuts import redirect
from django.shortcuts import render

from . import services
from .forms import ResourceCreateForm
from .forms import ResourceUpdateMetadataForm
from .models import PDFPageSnapshot
from .models import PDFResource
from .models import Resource

logger = logging.getLogger(__name__)


@dataclass
class ResourceInfo:
    """Flat summary of a ``Resource`` and its stored files, passed to templates.

    Instances are built by ``_extract_metadata_from_resource``.
    """

    id: int
    name: str
    description: str
    card_description: str
    main_resource_category_name: str
    additional_resource_category_name: str | None
    age_range: str | None
    pdf_filenames: list[str]
    pdf_urls: list[str]
    # Maps a PDF file name to the presigned URLs of its page snapshots.
    snapshot_urls: dict[str, list[str]]
    thumbnail_filenames: list[str]
    thumbnail_urls: list[str]
    # ``index`` filters on the truthiness of this field, so it may be unset.
    feature_slot: int | None
    # NOTE(review): populated from ``created_at`` / ``updated_at``, which are
    # presumably datetimes rather than strings - confirm and re-annotate.
    created: str
    updated: str


def _s3_client():
    """Build an S3 client configured from the ``AWS_*`` Django settings."""
    return boto3.Session().client(
        "s3",
        endpoint_url=settings.AWS_S3_ENDPOINT_URL,
        aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
        region_name=settings.AWS_S3_REGION_NAME,
    )


@login_required
def create_featured(request):
    """Render the "create featured resource" page."""
    return render(request, "resources/create_featured_resource.html")


def _extract_metadata_from_resource(resource_obj) -> ResourceInfo | None:
    """Collect a resource's fields and presigned file URLs into a ResourceInfo.

    :param resource_obj: the ``Resource`` model instance to summarise.
    :return: the populated ``ResourceInfo``, or ``None`` when reading the
        resource's fields fails (the error is logged).
    """
    bucket = settings.AWS_STORAGE_BUCKET_NAME
    # One client for every URL of this resource instead of one per URL.
    client = _s3_client()

    def presign(prefix: str, file_name: str) -> str | None:
        return get_presigned_obj_url(bucket, f"{prefix}/{file_name}", client=client)

    # Evaluate the queryset once; it feeds both the file names and snapshots.
    pdf_resources = list(PDFResource.objects.filter(resource=resource_obj))
    pdf_filenames = [pdf.file_name for pdf in pdf_resources]

    snapshot_urls: dict[str, list[str]] = {}
    for pdf in pdf_resources:
        snapshot_urls[pdf.file_name] = [
            presign("snapshotted_pages", snapshot.file_name)
            for snapshot in PDFPageSnapshot.objects.filter(pdf_file=pdf)
        ]

    pdf_urls = [presign("pdfuploads", name) for name in pdf_filenames]
    thumbnail_filenames = list(resource_obj.thumbnail_filenames or [])
    thumbnail_urls = [presign("thumbnails", name) for name in thumbnail_filenames]

    try:
        additional_category = resource_obj.additional_resource_category
        return ResourceInfo(
            id=resource_obj.id,
            name=resource_obj.name,
            description=resource_obj.description,
            card_description=resource_obj.card_description,
            main_resource_category_name=resource_obj.main_resource_category.name,
            additional_resource_category_name=(
                additional_category.name if additional_category else None
            ),
            age_range=resource_obj.age_range,
            pdf_filenames=pdf_filenames,
            pdf_urls=pdf_urls,
            snapshot_urls=snapshot_urls,
            thumbnail_filenames=thumbnail_filenames,
            thumbnail_urls=thumbnail_urls,
            feature_slot=resource_obj.feature_slot,
            created=resource_obj.created_at,
            updated=resource_obj.updated_at,
        )
    except Exception:
        # Deliberately best-effort: callers receive None and decide what to do.
        logger.exception("Error extracting resource information")
        return None


@login_required
def index(request):
    """Render the resource list with featured resources ordered by slot."""
    # Resources whose metadata could not be extracted are skipped (the failure
    # is already logged) instead of crashing the whole listing.
    resource_list = [
        info
        for info in map(_extract_metadata_from_resource, Resource.objects.all())
        if info is not None
    ]
    featured_resources = sorted(
        (info for info in resource_list if info.feature_slot),
        key=lambda info: info.feature_slot,
    )
    context = {
        "resource_list": resource_list,
        "featured_resources": featured_resources,
    }
    return render(request, "resources/resource_list.html", context)


def get_presigned_obj_url(
    bucket_name,
    obj_name,
    expiration=3600,
    client=None,
) -> str | None:
    """Return a presigned ``get_object`` URL for an S3 object.

    :param bucket_name: bucket holding the object.
    :param obj_name: key of the object inside the bucket.
    :param expiration: value passed to S3 as ``ExpiresIn``.
    :param client: optional S3 client to reuse; one is built from settings
        when omitted.
    :return: the URL, or ``None`` when generation raises ``ClientError``.
    """
    if client is None:
        client = _s3_client()
        logger.info("Client created", extra={"client": client})
    try:
        return client.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket_name, "Key": obj_name},
            ExpiresIn=expiration,
        )
    except ClientError:
        logger.exception("Error generating presigned URL")
        return None


def upload_to_s3(pdf_files, thumbnail_files, snappedshotted_pages) -> bool:
    """Upload PDFs, thumbnails and page snapshots to the storage bucket.

    :param pdf_files: file objects exposing ``name``; stored under
        ``pdfuploads/``.
    :param thumbnail_files: file objects exposing ``name``; stored under
        ``thumbnails/``.
    :param snappedshotted_pages: one collection of image paths per PDF; each
        image is stored under ``snapshotted_pages/`` by its base name.
    :return: ``True`` when every upload succeeds, ``False`` when S3 reports
        an error (the error is logged).
    """
    bucket = settings.AWS_STORAGE_BUCKET_NAME
    client = _s3_client()
    try:
        for pdf_file in pdf_files:
            logger.info("Uploading %s to S3", pdf_file.name)
            client.upload_fileobj(pdf_file, bucket, f"pdfuploads/{pdf_file.name}")

        for thumbnail in thumbnail_files:
            logger.info("Uploading %s to S3", thumbnail.name)
            client.upload_fileobj(thumbnail, bucket, f"thumbnails/{thumbnail.name}")

        # Walk every PDF's snapshot list. Special-casing the first entry
        # skipped the other PDFs' snapshots and failed on an empty list.
        for page_images in snappedshotted_pages:
            for img in page_images:
                logger.info("Uploading %s to S3", img)
                client.upload_file(
                    img,
                    bucket,
                    f"snapshotted_pages/{Path(img).name}",
                )
    except (ClientError, S3UploadFailedError):
        # boto3's managed transfers report failures as S3UploadFailedError.
        logger.exception("Error uploading files to S3")
        return False
    return True


def _write_pdf_to_tempdir(f) -> str:
    """Write an uploaded file into a new temporary directory.

    The directory comes from ``tempfile.mkdtemp`` and is not removed here, so
    the caller is responsible for cleaning it up.

    :param f: uploaded file exposing ``name`` and ``chunks()``.
    :return: path of the written file.
    """
    target_path = os.path.join(tempfile.mkdtemp(), f.name)
    with open(target_path, "wb") as destination:
        for chunk in f.chunks():
            destination.write(chunk)
    return target_path


def create_metadata(
    pdf_files,
) -> Generator[tuple[services.PDFMetadata, list[str]], None, None]:
    """Yield ``(metadata, snapshot_images)`` for each uploaded PDF.

    Each PDF is written into a shared temporary directory and handed to
    ``services`` for metadata extraction and page export.

    :param pdf_files: uploaded files exposing ``name`` and ``chunks()``.
    :return: generator of ``(PDFMetadata, snapshot_images)`` pairs;
        ``snapshot_images`` is presumably a list of image paths, since
        consumers iterate it and take each item's base name.
    """
    # NOTE(review): the directory is deleted once the generator is exhausted
    # or closed. If export_pages_as_images writes into it, the images will be
    # gone before upload_to_s3 runs - confirm where the images are written.
    with tempfile.TemporaryDirectory() as temp_dir:
        for pdf_file in pdf_files:
            file_path = os.path.join(temp_dir, pdf_file.name)
            with open(file_path, "wb") as temp_file:
                for chunk in pdf_file.chunks():
                    temp_file.write(chunk)
            metadata = services.get_pdf_metadata_from_path(file_path)
            snapshot_images = services.export_pages_as_images(file_path)
            yield metadata, snapshot_images


@transaction.atomic
def create_resource_objects(resource, metadata_generator, thumbnail_files):
    """Persist PDF and snapshot rows for a resource and record its thumbnails.

    :param resource: the ``Resource`` the files belong to.
    :param metadata_generator: iterable of ``(metadata, snapshot_images)``
        pairs as produced by ``create_metadata``.
    :param thumbnail_files: uploaded thumbnail files exposing ``name``.
    :return: one entry of snapshot images per PDF, in processing order, ready
        to be passed to ``upload_to_s3``.
    """
    snapshotted_pages = []
    for metadata, snapshot_images in metadata_generator:
        pdf_resource = PDFResource.objects.create(
            resource=resource,
            file_name=os.path.basename(metadata.file_name),
            file_size=metadata.file_size,
        )
        for snapshot_image in snapshot_images:
            PDFPageSnapshot.objects.create(
                name="test",
                file_name=os.path.basename(snapshot_image),
                pdf_file=pdf_resource,
            )
        snapshotted_pages.append(snapshot_images)

    resource.thumbnail_filenames = [f.name for f in thumbnail_files]
    resource.save()
    return snapshotted_pages


@login_required
def create_resource(request):
    """Show the resource creation form and, on POST, create the resource.

    On a valid POST the database rows are written and the files uploaded to
    S3 inside one transaction, so a failed upload rolls the rows back.
    """
    template = "resources/resource_create.html"

    if request.method != "POST":
        return render(request, template, {"form": ResourceCreateForm()})

    form = ResourceCreateForm(request.POST, request.FILES)
    if not form.is_valid():
        # Expose field and non-field errors to the template explicitly.
        errors = {field.name: field.errors for field in form if field.errors}
        if form.non_field_errors():
            errors["non_field_errors"] = form.non_field_errors()
        return render(request, template, {"form": form, "errors": errors})

    data = form.cleaned_data
    pdf_files = data["pdf_files"]
    thumbnail_files = data["thumbnail_files"]

    try:
        with transaction.atomic():
            resource = Resource.objects.create(
                name=data["name"],
                description=data["description"],
                card_description=data["card_description"],
                resource_type=data["resource_type"],
                age_range=data["age_range"],
                curriculum=data["curriculum"],
                main_resource_category=data["main_resource_category"],
                additional_resource_category=data["additional_resource_category"],
                feature_slot=data["feature_slot"],
            )
            snapshotted_pages = create_resource_objects(
                resource,
                create_metadata(pdf_files),
                thumbnail_files,
            )

            # create_metadata consumed the PDFs; rewind them before upload.
            for pdf_file in pdf_files:
                pdf_file.seek(0)

            if not upload_to_s3(pdf_files, thumbnail_files, snapshotted_pages):
                # Raising inside the atomic block rolls the new rows back.
                raise RuntimeError("Error uploading files to S3")

            return redirect("resources:resource_detail", resource_id=resource.id)
    except IntegrityError:
        # NOTE(review): every IntegrityError is reported as a feature-slot
        # clash; other constraint violations would get this message too.
        slot = data["feature_slot"]
        messages.add_message(
            request,
            messages.ERROR,
            f"Feature slot {slot} is already "
            "in use. Quit this form and remove from existing resource.",
        )
    except Exception:
        logger.exception("Error creating resource")
        form.add_error(None, "An error occurred while creating the resource.")

    return render(request, template, {"form": form})


@login_required
def resource_detail(request, resource_id):
    """Render the detail page for one resource.

    :param resource_id: primary key of the ``Resource``; unknown ids give 404.
    :return: the rendered page, or a 500 response when the resource's
        metadata cannot be extracted.
    """
    resource_obj = get_object_or_404(Resource, pk=resource_id)
    resource_metadata = _extract_metadata_from_resource(resource_obj)
    if resource_metadata is None:
        # Extraction already logged the cause; fail explicitly instead of
        # with an AttributeError on None further down.
        return HttpResponseServerError("Unable to load resource details.")

    additional_category = resource_obj.additional_resource_category
    resource = {
        "id": resource_obj.id,
        "name": resource_obj.name,
        "description": resource_obj.description,
        "resource_type": resource_obj.resource_type.name,
        "main_resource_category": resource_obj.main_resource_category.name,
        "additional_resource_category": (
            additional_category.name if additional_category else None
        ),
        "age_range": resource_obj.age_range,
        "curriculum": resource_obj.curriculum,
        "pdf_filenames": resource_metadata.pdf_filenames,
        "pdf_urls": resource_metadata.pdf_urls,
        "thumbnails": list(
            zip(
                resource_metadata.thumbnail_urls,
                resource_metadata.thumbnail_filenames,
                strict=False,
            ),
        ),
        "thumbnail_filenames": resource_metadata.thumbnail_filenames,
        "thumbnail_urls": resource_metadata.thumbnail_urls,
        "snapshot_urls": resource_metadata.snapshot_urls,
        "created": resource_metadata.created,
        "updated": resource_metadata.updated,
    }
    return render(request, "resources/resource_detail.html", {"resource": resource})


@login_required
def hx_download_button(request):
    """HTMX view that renders the download button for one PDF.

    The PDF file name is read from the ``rn`` query parameter.

    :return: the rendered button fragment with ``pdf_url`` in its context.
    :raises Http404: when ``rn`` is missing or matches no stored PDF.
    """
    pdf = request.GET.get("rn")
    # ResourceInfo has no ``pdf_url`` attribute, so the URL is built directly
    # for the requested file after checking that it is a known PDF.
    if not pdf or not PDFResource.objects.filter(file_name=pdf).exists():
        raise Http404("Unknown PDF file")

    pdf_url = get_presigned_obj_url(
        settings.AWS_STORAGE_BUCKET_NAME,
        f"pdfuploads/{pdf}",
    )
    return render(
        request,
        "resources/hx_download_button.html",
        {"pdf_url": pdf_url},
    )


@login_required
def update_resource_metadata(request, pk):
    """Edit a resource's metadata; redirect to its detail page on success.

    :param pk: primary key of the ``Resource`` to edit; unknown ids give 404.
    """
    resource = get_object_or_404(Resource, pk=pk)

    if request.method == "POST":
        form = ResourceUpdateMetadataForm(request.POST, instance=resource)
        if form.is_valid():
            form.save()
            return redirect("resources:resource_detail", resource_id=resource.pk)
    else:
        form = ResourceUpdateMetadataForm(instance=resource)

    return render(
        request,
        "resources/resource_metadata_update.html",
        {"form": form, "resource": resource},
    )