import logging
import os
import tempfile
from collections.abc import Generator
from dataclasses import dataclass
import boto3
from botocore.exceptions import ClientError
from botocore.utils import Path
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.db import transaction
from django.shortcuts import get_object_or_404
from django.shortcuts import redirect
from django.shortcuts import render
from . import services
from .forms import ResourceCreateForm
from .forms import ResourceUpdateMetadataForm
from .models import PDFPageSnapshot
from .models import PDFResource
from .models import Resource
logger = logging.getLogger(__name__)
# I want to create a dataclass here to hold the resource information to pass to the view
@dataclass
class ResourceInfo:
id: int
name: str
description: str
main_resource_category_name: str
additional_resource_category_name: str | None
age_range: str | None
pdf_filenames: list[str]
pdf_urls: list[str]
snapshot_urls: dict[str, list[str]]
thumbnail_filenames: list[str]
thumbnail_urls: list[str]
created: str
updated: str
def _extract_metadata_from_resource(resource_obj) -> ResourceInfo | None:
"""
This function extracts the resource information from the model object and returns it as a
ResourceInfo object
:param resource_obj:
:return:
"""
pdf_resource_filenames = [
x.file_name for x in PDFResource.objects.filter(resource=resource_obj).all()
]
pdf_resources = PDFResource.objects.filter(resource=resource_obj).all()
snapshot_dict = {}
for p in pdf_resources:
snapshot_dict[p.file_name] = [
x.file_name for x in PDFPageSnapshot.objects.filter(pdf_file=p).all()
]
snapshot_url_dict = {}
# Iterate through the snapshot dict and generate the URLs
for k, v in snapshot_dict.items():
snapshot_url_dict[k] = [
get_presigned_obj_url(
settings.AWS_STORAGE_BUCKET_NAME,
f"snapshotted_pages/{f}",
)
for f in v
]
pdf_urls = [
get_presigned_obj_url(settings.AWS_STORAGE_BUCKET_NAME, f"pdfuploads/{f}")
for f in pdf_resource_filenames
]
thumbnail_urls = [
get_presigned_obj_url(settings.AWS_STORAGE_BUCKET_NAME, f"thumbnails/{f}")
for f in resource_obj.thumbnail_filenames
]
try:
if resource_obj.additional_resource_category:
arc_name = resource_obj.additional_resource_category.name
else:
arc_name = None
return ResourceInfo(
id=resource_obj.id,
name=resource_obj.name,
description=resource_obj.description,
main_resource_category_name=resource_obj.main_resource_category.name,
additional_resource_category_name=arc_name,
age_range=resource_obj.age_range,
pdf_filenames=pdf_resource_filenames,
pdf_urls=pdf_urls,
snapshot_urls=snapshot_url_dict,
thumbnail_filenames=resource_obj.thumbnail_filenames,
thumbnail_urls=thumbnail_urls,
created=resource_obj.created_at,
updated=resource_obj.updated_at,
)
except Exception:
logging.exception("Error extracting resource information")
return None
@login_required
def index(request):
resource_objs = Resource.objects.all()
resource_list = [_extract_metadata_from_resource(r) for r in resource_objs]
context = {"resource_list": resource_list}
return render(request, "resources/resource_list.html", context)
def get_presigned_obj_url(bucket_name, obj_name, expiration=3600) -> str | None:
client = boto3.client(
"s3",
endpoint_url=settings.AWS_S3_ENDPOINT_URL,
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
region_name=settings.AWS_S3_REGION_NAME,
)
logger.info("Client created", extra={"client": client})
try:
response = client.generate_presigned_url(
"get_object",
Params={"Bucket": bucket_name, "Key": obj_name},
ExpiresIn=expiration,
)
except ClientError as e:
logger.exception("Error generating presigned URL", extra={"error": e})
return None
return response
def upload_to_s3(pdf_files, thumbnail_files, snappedshotted_pages) -> bool:
session = boto3.Session()
client = session.client(
"s3",
endpoint_url=settings.AWS_S3_ENDPOINT_URL,
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
region_name=settings.AWS_S3_REGION_NAME,
)
try:
for pdf_file in pdf_files:
logger.info("Uploading {pdf_file.name} to S3")
client.upload_fileobj(
pdf_file,
settings.AWS_STORAGE_BUCKET_NAME,
f"pdfuploads/{pdf_file.name}",
)
for f in thumbnail_files:
logger.info("Uploading {f.name} to S3")
client.upload_fileobj(
f,
settings.AWS_STORAGE_BUCKET_NAME,
f"thumbnails/{f.name}",
)
if len(snappedshotted_pages[0]) == 1:
for img in snappedshotted_pages[0]:
logger.info("Uploading {img} to S3")
client.upload_file(
img,
settings.AWS_STORAGE_BUCKET_NAME,
f"snapshotted_pages/{Path(img).name}",
)
else:
for lst in snappedshotted_pages:
for img in lst:
logger.info("Uploading {img} to S3")
client.upload_file(
img,
settings.AWS_STORAGE_BUCKET_NAME,
f"snapshotted_pages/{Path(img).name}",
)
return True
except ClientError:
logging.exception("Error uploading files to S3")
return False
def _write_pdf_to_tempdir(f) -> str:
temp_dir = tempfile.mkdtemp()
file_path = os.path.join(temp_dir, f.name)
with open(file_path, "wb") as destination:
for chunk in f.chunks():
destination.write(chunk)
return file_path
def create_metadata(
pdf_files,
) -> Generator[tuple[services.PDFMetadata, str], None, None]:
with tempfile.TemporaryDirectory() as temp_dir:
for pdf_file in pdf_files:
file_path = os.path.join(temp_dir, pdf_file.name)
with open(file_path, "wb") as temp_file:
for chunk in pdf_file.chunks():
temp_file.write(chunk)
metadata = services.get_pdf_metadata_from_path(file_path)
snapshot_images = services.export_pages_as_images(file_path)
yield metadata, snapshot_images
@transaction.atomic
def create_resource_objects(resource, metadata_generator, thumbnail_files):
for metadata, snapshot_images in metadata_generator:
pdf_resource = PDFResource.objects.create(
resource=resource,
file_name=os.path.basename(metadata.file_name),
file_size=metadata.file_size,
)
for snapshot_image in snapshot_images:
PDFPageSnapshot.objects.create(
name="test",
file_name=os.path.basename(snapshot_image),
pdf_file=pdf_resource,
)
resource.thumbnail_filenames = [f.name for f in thumbnail_files]
resource.save()
@login_required
def create_resource(request):
if request.method == "POST":
form = ResourceCreateForm(request.POST, request.FILES)
if form.is_valid():
pdf_files = form.cleaned_data["pdf_files"]
thumbnail_files = form.cleaned_data["thumbnail_files"]
name = form.cleaned_data["name"]
description = form.cleaned_data["description"]
resource_type = form.cleaned_data["resource_type"]
age_range = form.cleaned_data["age_range"]
curriculum = form.cleaned_data["curriculum"]
main_resource_category = form.cleaned_data["main_resource_category"]
additional_resource_category = form.cleaned_data["additional_resource_category"]
try:
resource = Resource.objects.create(
name=name,
description=description,
resource_type=resource_type,
age_range=age_range,
curriculum=curriculum,
main_resource_category=main_resource_category,
additional_resource_category=additional_resource_category,
)
metadata_generator = create_metadata(pdf_files)
snapshotted_pages = []
for metadata, snapshot_images in metadata_generator:
pdf_resource = PDFResource.objects.create(
resource=resource,
file_name=os.path.basename(metadata.file_name),
file_size=metadata.file_size,
)
for snapshot_image in snapshot_images:
PDFPageSnapshot.objects.create(
name="test",
file_name=os.path.basename(snapshot_image),
pdf_file=pdf_resource,
)
snapshotted_pages.append(snapshot_images)
resource.thumbnail_filenames = [f.name for f in thumbnail_files]
resource.save()
# Reset the file pointers for pdf_files
for pdf_file in pdf_files:
pdf_file.seek(0)
if not upload_to_s3(pdf_files, thumbnail_files, snapshotted_pages):
raise Exception("Error uploading files to S3")
return redirect("resources:resource_detail", resource_id=resource.id)
except Exception:
logger.exception("Error creating resource")
form.add_error(None, "An error occurred while creating the resource.")
else:
# extract form errors
errors = {}
for field in form:
if field.errors:
errors[field.name] = field.errors
# add non-field errors
if form.non_field_errors():
errors["non_field_errors"] = form.non_field_errors()
# render form with errors
return render(
request,
"resources/resource_create.html",
{"form": form, "errors": errors},
)
else:
form = ResourceCreateForm()
return render(request, "resources/resource_create.html", {"form": form})
@login_required
def resource_detail(request, resource_id):
"""
This function returns the resource detail page.
"""
resource_obj = get_object_or_404(Resource, pk=resource_id)
resource_metadata = _extract_metadata_from_resource(resource_obj)
resource = {
"id": resource_obj.id,
"name": resource_obj.name,
"description": resource_obj.description,
"resource_type": resource_obj.resource_type.name,
"main_resource_category": resource_obj.main_resource_category.name,
"additional_resource_category": (
resource_obj.additional_resource_category.name
if resource_obj.additional_resource_category
else None
),
"age_range": resource_obj.age_range,
"curriculum": resource_obj.curriculum,
"pdf_filenames": resource_metadata.pdf_filenames,
"pdf_urls": resource_metadata.pdf_urls,
"thumbnails": list(
zip(
resource_metadata.thumbnail_urls,
resource_metadata.thumbnail_filenames,
strict=False,
),
),
"thumbnail_filenames": resource_metadata.thumbnail_filenames,
"thumbnail_urls": resource_metadata.thumbnail_urls,
"snapshot_urls": resource_metadata.snapshot_urls,
"created": resource_metadata.created,
"updated": resource_metadata.updated,
}
return render(request, "resources/resource_detail.html", {"resource": resource})
@login_required
def hx_download_button(request):
"""
This is an HTMX view that is called when the user clicks the download button.
:param
:return:
"""
pdf = request.GET.get("rn")
res = Resource.objects.get(pdf_filename=pdf)
return render(
request,
"resources/hx_download_button.html",
{"pdf_url": _extract_metadata_from_resource(res).pdf_url},
)
@login_required
def update_resource_metadata(request, pk): # Change resource_id to pk
resource = get_object_or_404(Resource, pk=pk)
if request.method == "POST":
form = ResourceUpdateMetadataForm(request.POST, instance=resource)
if form.is_valid():
form.save()
return redirect(
"resources:resource_detail",
resource_id=resource.pk,
) # Use pk instead of resource_id
else:
form = ResourceUpdateMetadataForm(instance=resource)
return render(
request,
"resources/resource_metadata_update.html",
{"form": form, "resource": resource},
)