def _parse_multipart_data(self, app: EventHandlerInstance, content_type: str) -> dict[str, Any]:
    """
    Parse a multipart/form-data request body (file uploads) into a field mapping.

    Parameters
    ----------
    app : EventHandlerInstance
        Resolver whose ``current_event`` supplies ``body`` and ``is_base64_encoded``.
    content_type : str
        Full Content-Type header value, including the ``boundary`` parameter.

    Returns
    -------
    dict[str, Any]
        The mapping produced by ``_parse_multipart_body`` for the decoded body.

    Raises
    ------
    ValueError
        If the Content-Type header carries no boundary.
    RequestValidationError
        If the body cannot be decoded or parsed (reported as ``multipart_invalid``).
    """
    # Checked outside the try block so that only this condition surfaces as a bare
    # ValueError; decoding failures below are ValueError subclasses (binascii.Error,
    # UnicodeDecodeError) and must become validation errors instead of escaping.
    boundary = _extract_multipart_boundary(content_type)
    if not boundary:
        raise ValueError("Missing boundary in multipart/form-data content-type header")

    try:
        raw_body = app.current_event.body or ""
        if app.current_event.is_base64_encoded:
            body_bytes = base64.b64decode(raw_body)
        else:
            warnings.warn(
                "Received multipart/form-data without base64 encoding. "
                "Binary file uploads may be corrupted. "
                "If using API Gateway REST API (v1), configure Binary Media Types "
                "to include 'multipart/form-data'. "
                "See: https://docs.aws.amazon.com/apigateway/latest/developerguide/"
                "api-gateway-payload-encodings.html",
                stacklevel=2,
            )
            try:
                # latin-1 maps code points 0-255 one-to-one onto bytes, which keeps
                # raw binary data intact when the body is a byte-preserving string
                body_bytes = raw_body.encode("latin-1")
            except UnicodeEncodeError:
                # Characters above U+00FF cannot be latin-1 encoded; treat the body
                # as ordinary text rather than failing the whole request
                body_bytes = raw_body.encode("utf-8")

        return _parse_multipart_body(body_bytes, boundary)

    except Exception as e:
        raise RequestValidationError(
            [
                {
                    "type": "multipart_invalid",
                    "loc": ("body",),
                    "msg": "Multipart form data parsing error",
                    "input": {},
                    "ctx": {"error": str(e)},
                },
            ],
        ) from e
_is_or_contains_sequence(annotation: Any) -> bool: def _normalize_field_value(value: Any, field_info: FieldInfo) -> Any: """Normalize field value, converting lists to single values for non-sequence fields.""" + # When annotation is bytes but value is UploadFile, extract raw content + if isinstance(value, UploadFile) and field_info.annotation is bytes: + return value.content + if _is_or_contains_sequence(field_info.annotation): return value elif isinstance(value, list) and value: @@ -587,3 +646,106 @@ def _get_param_value( value = input_dict.get(field_name) return value + + +def _extract_multipart_boundary(content_type: str) -> str | None: + """Extract the boundary string from a multipart/form-data content-type header.""" + for segment in content_type.split(";"): + stripped = segment.strip() + if stripped.startswith("boundary="): + boundary = stripped[len("boundary=") :] + # Remove optional quotes around boundary + if boundary.startswith('"') and boundary.endswith('"'): + boundary = boundary[1:-1] + return boundary + return None + + +def _parse_multipart_body(body: bytes, boundary: str) -> dict[str, Any]: + """ + Parse a multipart/form-data body into a dict of field names to values. + + File fields get bytes values; regular form fields get string values. + Multiple values for the same field name are collected into lists. 
+ """ + delimiter = f"--{boundary}".encode() + end_delimiter = f"--{boundary}--".encode() + + result: dict[str, Any] = {} + + # Split body by the boundary delimiter + raw_parts = body.split(delimiter) + + for raw_part in raw_parts: + # Skip the preamble (before first boundary) and epilogue (after closing boundary) + if not raw_part or raw_part.strip() == b"" or raw_part.strip() == b"--": + continue + + # Remove the end delimiter marker if present + chunk = raw_part + if chunk.endswith(end_delimiter): + chunk = chunk[: -len(end_delimiter)] + + # Strip leading \r\n + if chunk.startswith(b"\r\n"): + chunk = chunk[2:] + + # Strip trailing \r\n + if chunk.endswith(b"\r\n"): + chunk = chunk[:-2] + + # Split headers from body at the double CRLF + header_end = chunk.find(b"\r\n\r\n") + if header_end == -1: + continue + + header_section = chunk[:header_end].decode("utf-8") + body_section = chunk[header_end + 4 :] + + # Parse Content-Disposition to get the field name and optional filename + field_name = None + filename = None + content_type_header = None + + for header_line in header_section.split("\r\n"): + header_lower = header_line.lower() + if header_lower.startswith("content-disposition:"): + field_name = _extract_header_param(header_line, "name") + filename = _extract_header_param(header_line, "filename") + elif header_lower.startswith("content-type:"): + content_type_header = header_line.split(":", 1)[1].strip() + + if field_name is None: + continue + + # If it has a filename, it's a file upload — wrap as UploadFile + # Otherwise it's a regular form field — decode to string + if filename is not None: + value: Any = UploadFile(content=body_section, filename=filename, content_type=content_type_header) + else: + value = body_section.decode("utf-8") + + # Collect multiple values for same field name into a list + if field_name in result: + existing = result[field_name] + if isinstance(existing, list): + existing.append(value) + else: + result[field_name] = [existing, 
value] + else: + result[field_name] = value + + return result + + +def _extract_header_param(header_line: str, param_name: str) -> str | None: + """Extract a parameter value from a header line (e.g., name="file" from Content-Disposition).""" + search = f'{param_name}="' + idx = header_line.find(search) + if idx == -1: + return None + start = idx + len(search) + end = header_line.find('"', start) + if end == -1: + return None + return header_line[start:end] diff --git a/aws_lambda_powertools/event_handler/openapi/dependant.py b/aws_lambda_powertools/event_handler/openapi/dependant.py index 197de9427d2..ec8414a7dd2 100644 --- a/aws_lambda_powertools/event_handler/openapi/dependant.py +++ b/aws_lambda_powertools/event_handler/openapi/dependant.py @@ -13,10 +13,10 @@ from aws_lambda_powertools.event_handler.openapi.params import ( Body, Dependant, + File, Form, Param, ParamTypes, - _File, analyze_param, create_response_field, get_flat_dependant, @@ -370,9 +370,9 @@ def get_body_field_info( if not required: body_field_info_kwargs["default"] = None - if any(isinstance(f.field_info, _File) for f in flat_dependant.body_params): - # MAINTENANCE: body_field_info: type[Body] = _File - raise NotImplementedError("_File fields are not supported in request bodies") + if any(isinstance(f.field_info, File) for f in flat_dependant.body_params): + body_field_info = Body + body_field_info_kwargs["media_type"] = "multipart/form-data" elif any(isinstance(f.field_info, Form) for f in flat_dependant.body_params): body_field_info = Body body_field_info_kwargs["media_type"] = "application/x-www-form-urlencoded" diff --git a/aws_lambda_powertools/event_handler/openapi/params.py b/aws_lambda_powertools/event_handler/openapi/params.py index 534a20a5686..3da928ca236 100644 --- a/aws_lambda_powertools/event_handler/openapi/params.py +++ b/aws_lambda_powertools/event_handler/openapi/params.py @@ -902,7 +902,57 @@ def __init__( ) -class _File(Form): # type: ignore[misc] +class UploadFile: + """ + 
class UploadFile:
    """
    An uploaded file: its raw bytes together with the metadata sent alongside it.

    Annotate a parameter as ``Annotated[UploadFile, File()]`` to receive the
    content plus filename and content type; use ``Annotated[bytes, File()]``
    when only the raw bytes are needed.

    Attributes
    ----------
    filename : str | None
        The original filename from the upload.
    content_type : str | None
        The MIME type declared by the client (e.g. ``image/jpeg``).
    content : bytes
        The raw file content.
    """

    __slots__ = ("content", "content_type", "filename")

    def __init__(self, *, content: bytes, filename: str | None = None, content_type: str | None = None):
        self.filename = filename
        self.content_type = content_type
        self.content = content

    def __repr__(self) -> str:
        return f"UploadFile(filename={self.filename!r}, content_type={self.content_type!r}, size={len(self.content)})"

    def __len__(self) -> int:
        # Size of the upload in bytes
        return len(self.content)

    @classmethod
    def _validate(cls, v: Any) -> UploadFile:
        # Only ready-made instances are accepted; nothing is coerced
        if not isinstance(v, cls):
            raise ValueError(f"Expected UploadFile, got {type(v).__name__}")
        return v

    @classmethod
    def __get_pydantic_core_schema__(cls, _source_type: Any, _handler: Any) -> Any:
        from pydantic_core import core_schema

        # Serialization hands the value back unchanged
        passthrough = core_schema.plain_serializer_function_ser_schema(lambda v: v, info_arg=False)
        return core_schema.no_info_plain_validator_function(cls._validate, serialization=passthrough)

    @classmethod
    def __get_pydantic_json_schema__(cls, _schema: Any, handler: Any) -> dict[str, Any]:
        # Described as a binary string in the generated schema
        return {"type": "string", "format": "binary"}
""" diff --git a/docs/core/event_handler/api_gateway.md b/docs/core/event_handler/api_gateway.md index e262613046c..cd8fcc57791 100644 --- a/docs/core/event_handler/api_gateway.md +++ b/docs/core/event_handler/api_gateway.md @@ -605,6 +605,57 @@ You can use the `Form` type to tell the Event Handler that a parameter expects f --8<-- "examples/event_handler_rest/src/working_with_form_data.py" ``` +#### Handling file uploads + +!!! info "You must set `enable_validation=True` to handle file uploads via type annotation." + +You can use the `File` type to accept `multipart/form-data` file uploads. This automatically sets the correct OpenAPI schema, and Swagger UI will render a file picker for each `File()` parameter. + +There are two ways to receive uploaded files: + +* **`bytes`** — receive raw file content only +* **`UploadFile`** — receive file content along with metadata (filename, content type) + +=== "working_with_file_uploads.py" + + ```python hl_lines="4 12" + --8<-- "examples/event_handler_rest/src/working_with_file_uploads.py" + ``` + + 1. `File` is a special OpenAPI type for `multipart/form-data` file uploads. When annotated as `bytes`, you receive the raw file content. + +=== "working_with_file_uploads_metadata.py" + + ```python hl_lines="4 11 15-16" + --8<-- "examples/event_handler_rest/src/working_with_file_uploads_metadata.py" + ``` + + 1. Using `UploadFile` instead of `bytes` gives you access to file metadata. + 2. `filename` and `content_type` come from the multipart headers sent by the client. + +=== "working_with_file_uploads_mixed.py" + + You can combine `File()` and `Form()` parameters in the same route to accept file uploads with additional form fields. + + ```python hl_lines="6 14-15" + --8<-- "examples/event_handler_rest/src/working_with_file_uploads_mixed.py" + ``` + + 1. File upload parameter — receives the uploaded file with metadata. + 2. Regular form field — receives a string value from the same multipart request. + +!!! 
warning "API Gateway REST API (v1) requires Binary Media Types configuration" + When using API Gateway REST API (v1), you must configure Binary Media Types to include `multipart/form-data`, otherwise binary file content will be corrupted. + + ```yaml title="SAM template.yaml" + Globals: + Api: + BinaryMediaTypes: + - "multipart~1form-data" + ``` + + API Gateway HTTP API (v2), Lambda Function URL, and ALB handle binary encoding automatically — no extra configuration needed. + #### Supported types for response serialization With data validation enabled, we natively support serializing the following data types to JSON: diff --git a/examples/event_handler_rest/src/working_with_file_uploads.py b/examples/event_handler_rest/src/working_with_file_uploads.py new file mode 100644 index 00000000000..bebf72939fe --- /dev/null +++ b/examples/event_handler_rest/src/working_with_file_uploads.py @@ -0,0 +1,17 @@ +from typing import Annotated + +from aws_lambda_powertools.event_handler import APIGatewayRestResolver +from aws_lambda_powertools.event_handler.openapi.params import File + +app = APIGatewayRestResolver(enable_validation=True) + + +@app.post("/upload") +def upload_file( + file_data: Annotated[bytes, File(description="File to upload")], # (1)! 
+): + return {"file_size": len(file_data)} + + +def lambda_handler(event, context): + return app.resolve(event, context) diff --git a/examples/event_handler_rest/src/working_with_file_uploads_metadata.py b/examples/event_handler_rest/src/working_with_file_uploads_metadata.py new file mode 100644 index 00000000000..07da798f350 --- /dev/null +++ b/examples/event_handler_rest/src/working_with_file_uploads_metadata.py @@ -0,0 +1,21 @@ +from typing import Annotated + +from aws_lambda_powertools.event_handler import APIGatewayRestResolver +from aws_lambda_powertools.event_handler.openapi.params import File, UploadFile + +app = APIGatewayRestResolver(enable_validation=True) + + +@app.post("/upload") +def upload_file( + file_data: Annotated[UploadFile, File(description="File to upload")], # (1)! +): + return { + "filename": file_data.filename, # (2)! + "content_type": file_data.content_type, + "file_size": len(file_data), + } + + +def lambda_handler(event, context): + return app.resolve(event, context) diff --git a/examples/event_handler_rest/src/working_with_file_uploads_mixed.py b/examples/event_handler_rest/src/working_with_file_uploads_mixed.py new file mode 100644 index 00000000000..e0c3859c58e --- /dev/null +++ b/examples/event_handler_rest/src/working_with_file_uploads_mixed.py @@ -0,0 +1,29 @@ +import csv +import io +from typing import Annotated + +from aws_lambda_powertools.event_handler import APIGatewayRestResolver +from aws_lambda_powertools.event_handler.openapi.params import File, Form, UploadFile + +app = APIGatewayRestResolver(enable_validation=True) + + +@app.post("/upload-csv") +def upload_csv( + file_data: Annotated[UploadFile, File(description="CSV file to parse")], # (1)! + separator: Annotated[str, Form(description="CSV separator")] = ",", # (2)! 
+): + text = file_data.content.decode("utf-8") + reader = csv.DictReader(io.StringIO(text), delimiter=separator) + rows = list(reader) + + return { + "filename": file_data.filename, + "total_rows": len(rows), + "columns": list(rows[0].keys()) if rows else [], + "data": rows, + } + + +def lambda_handler(event, context): + return app.resolve(event, context) diff --git a/tests/functional/event_handler/_pydantic/test_openapi_validation_middleware.py b/tests/functional/event_handler/_pydantic/test_openapi_validation_middleware.py index 01935d3aba3..e7199adc9c5 100644 --- a/tests/functional/event_handler/_pydantic/test_openapi_validation_middleware.py +++ b/tests/functional/event_handler/_pydantic/test_openapi_validation_middleware.py @@ -1,6 +1,7 @@ import base64 import datetime import json +import warnings from dataclasses import dataclass from enum import Enum from pathlib import PurePath @@ -3329,6 +3330,610 @@ def handler(items: Annotated[Union[_Item, List[_Item]], Body()]) -> Dict[str, An assert body["count"] == 100 +# ---------- File upload (multipart/form-data) ---------- + + +def _build_multipart_body(fields: List[Dict], boundary: str = "----TestBoundary") -> Tuple[str, str]: + """ + Build a multipart/form-data body and return (base64_body, content_type). 
+ + Each field dict can have: + - name: field name (required) + - value: str or bytes (required) + - filename: optional filename (makes it a file part) + - content_type: optional content type for the part + """ + parts = [] + for field in fields: + headers = f'Content-Disposition: form-data; name="{field["name"]}"' + if "filename" in field: + headers += f'; filename="{field["filename"]}"' + if "content_type" in field: + headers += f"\r\nContent-Type: {field['content_type']}" + value = field["value"] + if isinstance(value, str): + value = value.encode("utf-8") + parts.append((headers, value)) + + body = b"" + for headers, value in parts: + body += f"--{boundary}\r\n".encode() + body += f"{headers}\r\n\r\n".encode() + body += value + body += b"\r\n" + body += f"--{boundary}--\r\n".encode() + + content_type = f"multipart/form-data; boundary={boundary}" + return base64.b64encode(body).decode("utf-8"), content_type + + +def test_file_upload_basic(gw_event): + """Test basic file upload with File() parameter.""" + from aws_lambda_powertools.event_handler.openapi.params import File + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[bytes, File()]): + return {"size": len(file_data)} + + body, content_type = _build_multipart_body( + [ + {"name": "file_data", "value": b"hello world", "filename": "test.txt"}, + ], + ) + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = content_type + gw_event["body"] = body + gw_event["isBase64Encoded"] = True + + result = app(gw_event, {}) + assert result["statusCode"] == 200 + assert json.loads(result["body"]) == {"size": 11} + + +def test_file_upload_with_form_field(gw_event): + """Test file upload mixed with a regular form field.""" + from aws_lambda_powertools.event_handler.openapi.params import File + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload( + description: 
Annotated[str, Form()], + file_data: Annotated[bytes, File()], + ): + return {"description": description, "size": len(file_data)} + + body, content_type = _build_multipart_body( + [ + {"name": "description", "value": "my file"}, + {"name": "file_data", "value": b"\x89PNG\r\n\x1a\n", "filename": "image.png", "content_type": "image/png"}, + ], + ) + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = content_type + gw_event["body"] = body + gw_event["isBase64Encoded"] = True + + result = app(gw_event, {}) + assert result["statusCode"] == 200 + parsed = json.loads(result["body"]) + assert parsed["description"] == "my file" + assert parsed["size"] == 8 + + +def test_file_upload_missing_required(gw_event): + """Test that missing required File() parameter returns 422.""" + from aws_lambda_powertools.event_handler.openapi.params import File + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[bytes, File()]): + return {"size": len(file_data)} + + # Send empty multipart body (no file_data field) + body, content_type = _build_multipart_body( + [ + {"name": "other_field", "value": "some value"}, + ], + ) + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = content_type + gw_event["body"] = body + gw_event["isBase64Encoded"] = True + + result = app(gw_event, {}) + assert result["statusCode"] == 422 + assert "missing" in result["body"] + + +def test_file_upload_openapi_schema(): + """Test that File() parameters generate correct OpenAPI schema.""" + from aws_lambda_powertools.event_handler.openapi.params import File + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[bytes, File(description="The file to upload")]): + return {"size": len(file_data)} + + schema = app.get_openapi_schema() + path = schema.paths["/upload"] + post_op = path.post + + # 
Should have a request body with multipart/form-data + assert post_op.requestBody is not None + content = post_op.requestBody.content + assert "multipart/form-data" in content + + # The schema should reference a binary format field + multipart_schema = content["multipart/form-data"].schema_ + assert multipart_schema is not None + + +def test_file_upload_non_base64(gw_event): + """Test file upload when body is not base64-encoded (edge case).""" + from aws_lambda_powertools.event_handler.openapi.params import File + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[bytes, File()]): + return {"size": len(file_data)} + + # Build multipart body without base64 encoding + boundary = "----TestBoundary" + raw_body = ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file_data"; filename="test.txt"\r\n' + f"\r\n" + f"hello world\r\n" + f"--{boundary}--\r\n" + ) + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = f"multipart/form-data; boundary={boundary}" + gw_event["body"] = raw_body + gw_event["isBase64Encoded"] = False + + result = app(gw_event, {}) + assert result["statusCode"] == 200 + assert json.loads(result["body"]) == {"size": 11} + + +def test_file_upload_non_base64_emits_warning(gw_event): + """Test that non-base64 multipart body emits a warning about API Gateway config.""" + from aws_lambda_powertools.event_handler.openapi.params import File + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[bytes, File()]): + return {"size": len(file_data)} + + boundary = "----TestBoundary" + raw_body = ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file_data"; filename="test.txt"\r\n' + f"\r\n" + f"hello world\r\n" + f"--{boundary}--\r\n" + ) + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = 
f"multipart/form-data; boundary={boundary}" + gw_event["body"] = raw_body + gw_event["isBase64Encoded"] = False + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + result = app(gw_event, {}) + + assert result["statusCode"] == 200 + assert len(w) == 1 + assert "Binary Media Types" in str(w[0].message) + + +def test_file_upload_non_base64_binary_content(gw_event): + """Test file upload with raw binary bytes (e.g. JPEG) without base64 encoding.""" + from aws_lambda_powertools.event_handler.openapi.params import File + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[bytes, File()]): + return {"size": len(file_data)} + + # Simulate binary content with bytes that are NOT valid UTF-8 (like JPEG header 0xFF 0xD8) + binary_content = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00" + boundary = "----TestBoundary" + raw_bytes = ( + ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file_data"; filename="photo.jpg"\r\n' + f"Content-Type: image/jpeg\r\n" + f"\r\n" + ).encode("latin-1") + + binary_content + + f"\r\n--{boundary}--\r\n".encode("latin-1") + ) + + # Without binary mode, API Gateway passes body as latin-1 compatible string + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = f"multipart/form-data; boundary={boundary}" + gw_event["body"] = raw_bytes.decode("latin-1") + gw_event["isBase64Encoded"] = False + + with warnings.catch_warnings(record=True): + warnings.simplefilter("always") + result = app(gw_event, {}) + + assert result["statusCode"] == 200 + assert json.loads(result["body"]) == {"size": len(binary_content)} + + +def test_upload_file_with_metadata(gw_event): + """Test UploadFile annotation provides filename and content_type.""" + from aws_lambda_powertools.event_handler.openapi.params import File, UploadFile + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def 
upload(file_data: Annotated[UploadFile, File()]): + return { + "filename": file_data.filename, + "content_type": file_data.content_type, + "size": len(file_data), + } + + body, content_type = _build_multipart_body( + [ + {"name": "file_data", "value": b"fake jpeg", "filename": "photo.jpg", "content_type": "image/jpeg"}, + ], + ) + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = content_type + gw_event["body"] = body + gw_event["isBase64Encoded"] = True + + result = app(gw_event, {}) + assert result["statusCode"] == 200 + parsed = json.loads(result["body"]) + assert parsed["filename"] == "photo.jpg" + assert parsed["content_type"] == "image/jpeg" + assert parsed["size"] == 9 + + +def test_upload_file_mixed_with_form(gw_event): + """Test UploadFile + Form fields together.""" + from aws_lambda_powertools.event_handler.openapi.params import File, UploadFile + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload( + file_data: Annotated[UploadFile, File()], + title: Annotated[str, Form()], + ): + return { + "title": title, + "filename": file_data.filename, + "size": len(file_data), + } + + body, content_type = _build_multipart_body( + [ + {"name": "title", "value": "My Document"}, + { + "name": "file_data", + "value": b"pdf content here", + "filename": "doc.pdf", + "content_type": "application/pdf", + }, + ], + ) + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = content_type + gw_event["body"] = body + gw_event["isBase64Encoded"] = True + + result = app(gw_event, {}) + assert result["statusCode"] == 200 + parsed = json.loads(result["body"]) + assert parsed["title"] == "My Document" + assert parsed["filename"] == "doc.pdf" + assert parsed["size"] == 16 + + +def test_upload_file_openapi_schema(): + """Test UploadFile generates correct OpenAPI schema.""" + from aws_lambda_powertools.event_handler.openapi.params import 
File, UploadFile + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[UploadFile, File(description="A file")]): + return {} + + schema = app.get_openapi_schema() + schema_dict = schema.model_dump(exclude_none=True, by_alias=True) + upload_path = schema_dict["paths"]["/upload"]["post"] + content = upload_path["requestBody"]["content"] + assert "multipart/form-data" in content + + # Resolve $ref to get the actual schema + ref = content["multipart/form-data"]["schema"]["$ref"] + schema_name = ref.split("/")[-1] + props = schema_dict["components"]["schemas"][schema_name]["properties"] + assert props["file_data"]["type"] == "string" + assert props["file_data"]["format"] == "binary" + + +def test_multipart_missing_boundary(gw_event): + """Test that missing boundary in content-type raises ValueError.""" + from aws_lambda_powertools.event_handler.openapi.params import File + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[bytes, File()]): + return {"size": len(file_data)} + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = "multipart/form-data" # no boundary + gw_event["body"] = base64.b64encode(b"some data").decode() + gw_event["isBase64Encoded"] = True + + with pytest.raises(ValueError, match="Missing boundary"): + app(gw_event, {}) + + +def test_multipart_quoted_boundary(gw_event): + """Test that boundary with quotes is parsed correctly.""" + from aws_lambda_powertools.event_handler.openapi.params import File + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[bytes, File()]): + return {"size": len(file_data)} + + boundary = "----TestBoundary" + body, _ = _build_multipart_body( + [ + {"name": "file_data", "value": b"hello", "filename": "test.txt"}, + ], + boundary=boundary, + ) + + gw_event["httpMethod"] = "POST" + 
gw_event["path"] = "/upload" + # Use quoted boundary + gw_event["headers"]["content-type"] = f'multipart/form-data; boundary="{boundary}"' + gw_event["body"] = body + gw_event["isBase64Encoded"] = True + + result = app(gw_event, {}) + assert result["statusCode"] == 200 + assert json.loads(result["body"]) == {"size": 5} + + +def test_multipart_multiple_values_same_field(gw_event): + """Test multiple values for the same field name are collected as list.""" + from aws_lambda_powertools.event_handler.openapi.params import File, UploadFile + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[List[UploadFile], File()]): + return {"count": len(file_data), "filenames": [f.filename for f in file_data]} + + # Build body with two parts having the same field name + boundary = "----TestBoundary" + raw = ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file_data"; filename="a.txt"\r\n' + f"\r\n" + f"content a\r\n" + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file_data"; filename="b.txt"\r\n' + f"\r\n" + f"content b\r\n" + f"--{boundary}--\r\n" + ).encode() + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = f"multipart/form-data; boundary={boundary}" + gw_event["body"] = base64.b64encode(raw).decode() + gw_event["isBase64Encoded"] = True + + result = app(gw_event, {}) + assert result["statusCode"] == 200 + parsed = json.loads(result["body"]) + assert parsed["count"] == 2 + assert parsed["filenames"] == ["a.txt", "b.txt"] + + +def test_multipart_three_values_same_field(gw_event): + """Test three or more values for same field name builds onto existing list.""" + from aws_lambda_powertools.event_handler.openapi.params import File, UploadFile + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[List[UploadFile], File()]): + return {"count": len(file_data), "filenames": 
[f.filename for f in file_data]} + + boundary = "----TestBoundary" + raw = ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file_data"; filename="a.txt"\r\n' + f"\r\n" + f"aaa\r\n" + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file_data"; filename="b.txt"\r\n' + f"\r\n" + f"bbb\r\n" + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file_data"; filename="c.txt"\r\n' + f"\r\n" + f"ccc\r\n" + f"--{boundary}--\r\n" + ).encode() + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = f"multipart/form-data; boundary={boundary}" + gw_event["body"] = base64.b64encode(raw).decode() + gw_event["isBase64Encoded"] = True + + result = app(gw_event, {}) + assert result["statusCode"] == 200 + parsed = json.loads(result["body"]) + assert parsed["count"] == 3 + assert parsed["filenames"] == ["a.txt", "b.txt", "c.txt"] + + +def test_multipart_part_without_headers_separator(gw_event): + """Test that a malformed part missing the header/body separator is skipped.""" + from aws_lambda_powertools.event_handler.openapi.params import File, UploadFile + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[UploadFile, File()]): + return {"filename": file_data.filename} + + # Build a body with one malformed part (no \r\n\r\n) and one valid part + boundary = "----TestBoundary" + raw = ( + f"--{boundary}\r\n" + f"This part has no header separator at all\r\n" + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file_data"; filename="good.txt"\r\n' + f"\r\n" + f"good content\r\n" + f"--{boundary}--\r\n" + ).encode() + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = f"multipart/form-data; boundary={boundary}" + gw_event["body"] = base64.b64encode(raw).decode() + gw_event["isBase64Encoded"] = True + + result = app(gw_event, {}) + assert result["statusCode"] == 200 + parsed 
= json.loads(result["body"]) + assert parsed["filename"] == "good.txt" + + +def test_multipart_part_without_field_name(gw_event): + """Test that a part missing the name parameter in Content-Disposition is skipped.""" + from aws_lambda_powertools.event_handler.openapi.params import File, UploadFile + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[UploadFile, File()]): + return {"filename": file_data.filename} + + # Build a body with one part that has no name= param and one valid part + boundary = "----TestBoundary" + raw = ( + f"--{boundary}\r\n" + f"Content-Disposition: form-data\r\n" + f"\r\n" + f"orphan content\r\n" + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file_data"; filename="valid.txt"\r\n' + f"\r\n" + f"valid content\r\n" + f"--{boundary}--\r\n" + ).encode() + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = f"multipart/form-data; boundary={boundary}" + gw_event["body"] = base64.b64encode(raw).decode() + gw_event["isBase64Encoded"] = True + + result = app(gw_event, {}) + assert result["statusCode"] == 200 + parsed = json.loads(result["body"]) + assert parsed["filename"] == "valid.txt" + + +def test_upload_file_validate_error(): + """Test UploadFile._validate raises ValueError for non-UploadFile values.""" + from aws_lambda_powertools.event_handler.openapi.params import UploadFile + + with pytest.raises(ValueError, match="Expected UploadFile, got str"): + UploadFile._validate("not an upload file") + + with pytest.raises(ValueError, match="Expected UploadFile, got int"): + UploadFile._validate(42) + + +def test_multipart_unclosed_quote_in_header(): + """Test that _extract_header_param returns None when quote is unclosed.""" + from aws_lambda_powertools.event_handler.middlewares.openapi_validation import _extract_header_param + + # name=" is present but closing quote is missing + result = 
_extract_header_param('Content-Disposition: form-data; name="broken', "name") + assert result is None + + +def test_multipart_generic_parse_error(gw_event): + """Test that non-ValueError exceptions during multipart parsing produce 422.""" + from unittest.mock import patch + + from aws_lambda_powertools.event_handler.openapi.params import File, UploadFile + + app = APIGatewayRestResolver(enable_validation=True) + + @app.post("/upload") + def upload(file_data: Annotated[UploadFile, File()]): + return {"filename": file_data.filename} + + body_b64, content_type = _build_multipart_body( + [{"name": "file_data", "value": b"data", "filename": "test.txt"}], + ) + + gw_event["httpMethod"] = "POST" + gw_event["path"] = "/upload" + gw_event["headers"]["content-type"] = content_type + gw_event["body"] = body_b64 + gw_event["isBase64Encoded"] = True + + # Patch _parse_multipart_body to raise a non-ValueError (e.g. TypeError) + with patch( + "aws_lambda_powertools.event_handler.middlewares.openapi_validation._parse_multipart_body", + side_effect=TypeError("unexpected type"), + ): + result = app(gw_event, {}) + assert result["statusCode"] == 422 + body = json.loads(result["body"]) + assert body["detail"][0]["type"] == "multipart_invalid" + + # ---------- Cookie parameter tests ----------