From 40a37307ded36311a07eb2577cb74c92a2639f9d Mon Sep 17 00:00:00 2001 From: Simon Willison Date: Wed, 28 Jan 2026 18:41:03 -0800 Subject: [PATCH] Add request.form() for multipart form data and file uploads * Add request.form() for multipart form data and file uploads New Request.form() method that handles both application/x-www-form-urlencoded and multipart/form-data content types with streaming parsing. Features: - Streaming multipart parser that doesn't buffer entire body in memory - Files spill to disk above 1MB threshold via SpooledTemporaryFile - files=False (default) discards file content, files=True stores them - Security limits: max_request_size, max_file_size, max_fields, max_files - FormData container with dict-like access and getlist() for multiple values - UploadedFile class with async read(), seek(), filename, content_type, size - Support for RFC 5987 filename* encoding for international filenames Uses multipart-form-data-conformance test suite for validation. * Update views to use request.form() and document new API - Migrate PermissionsDebugView, MessagesDebugView, and CreateTokenView from post_vars() to form() - Add documentation for request.form(), FormData, and UploadedFile classes Centralize multipart defaults and expose stricter limits via Request.form(). Enforce header, part, file, and disk space limits even when files are discarded; detect truncated bodies and client disconnects; and move blocking work off the event loop. Add FormData close/aclose context managers, update internals docs, and expand multipart tests (including len semantics and stricter conformance expectations). 
--- datasette/utils/asgi.py | 81 +++ datasette/utils/multipart.py | 757 ++++++++++++++++++++++ datasette/views/special.py | 26 +- docs/internals.rst | 131 +++- pyproject.toml | 1 + tests/test_multipart.py | 1152 ++++++++++++++++++++++++++++++++++ 6 files changed, 2133 insertions(+), 15 deletions(-) create mode 100644 datasette/utils/multipart.py create mode 100644 tests/test_multipart.py diff --git a/datasette/utils/asgi.py b/datasette/utils/asgi.py index 7f3329a6..35f243b6 100644 --- a/datasette/utils/asgi.py +++ b/datasette/utils/asgi.py @@ -1,5 +1,21 @@ import json +from typing import Optional from datasette.utils import MultiParams, calculate_etag +from datasette.utils.multipart import ( + parse_form_data, + MultipartParseError, + FormData, + DEFAULT_MAX_FILE_SIZE, + DEFAULT_MAX_REQUEST_SIZE, + DEFAULT_MAX_FIELDS, + DEFAULT_MAX_FILES, + DEFAULT_MAX_PARTS, + DEFAULT_MAX_FIELD_SIZE, + DEFAULT_MAX_MEMORY_FILE_SIZE, + DEFAULT_MAX_PART_HEADER_BYTES, + DEFAULT_MAX_PART_HEADER_LINES, + DEFAULT_MIN_FREE_DISK_BYTES, +) from mimetypes import guess_type from urllib.parse import parse_qs, urlunparse, parse_qsl from pathlib import Path @@ -139,6 +155,71 @@ class Request: body = await self.post_body() return dict(parse_qsl(body.decode("utf-8"), keep_blank_values=True)) + async def form( + self, + files: bool = False, + max_file_size: int = DEFAULT_MAX_FILE_SIZE, + max_request_size: int = DEFAULT_MAX_REQUEST_SIZE, + max_fields: int = DEFAULT_MAX_FIELDS, + max_files: int = DEFAULT_MAX_FILES, + max_parts: Optional[int] = DEFAULT_MAX_PARTS, + max_field_size: int = DEFAULT_MAX_FIELD_SIZE, + max_memory_file_size: int = DEFAULT_MAX_MEMORY_FILE_SIZE, + max_part_header_bytes: int = DEFAULT_MAX_PART_HEADER_BYTES, + max_part_header_lines: int = DEFAULT_MAX_PART_HEADER_LINES, + min_free_disk_bytes: int = DEFAULT_MIN_FREE_DISK_BYTES, + ) -> FormData: + """ + Parse form data from the request body. + + Supports both application/x-www-form-urlencoded and multipart/form-data. 
+ + Args: + files: If True, store file uploads; if False (default), discard them + max_file_size: Maximum size per file in bytes (default 50MB) + max_request_size: Maximum total request size in bytes (default 100MB) + max_fields: Maximum number of form fields (default 1000) + max_files: Maximum number of file uploads (default 100) + max_parts: Maximum number of multipart parts (default max_fields + max_files) + max_field_size: Maximum size of a text field value in bytes (default 100KB) + max_memory_file_size: Threshold before files spill to disk (default 1MB) + max_part_header_bytes: Maximum bytes allowed in part headers (default 16KB) + max_part_header_lines: Maximum header lines per part (default 100) + min_free_disk_bytes: Minimum free bytes required in temp dir (default 50MB) + + Returns: + FormData object with dict-like access to fields and files. + Use form["key"] for first value, form.getlist("key") for all values. + + Raises: + BadRequest: If content-type is missing, unsupported, or parsing fails + """ + content_type = self.headers.get("content-type", "") + if not content_type: + raise BadRequest( + "Missing Content-Type header; expected application/x-www-form-urlencoded " + "or multipart/form-data" + ) + + try: + return await parse_form_data( + receive=self.receive, + content_type=content_type, + files=files, + max_file_size=max_file_size, + max_request_size=max_request_size, + max_fields=max_fields, + max_files=max_files, + max_parts=max_parts, + max_field_size=max_field_size, + max_memory_file_size=max_memory_file_size, + max_part_header_bytes=max_part_header_bytes, + max_part_header_lines=max_part_header_lines, + min_free_disk_bytes=min_free_disk_bytes, + ) + except MultipartParseError as e: + raise BadRequest(str(e)) + @classmethod def fake(cls, path_with_query_string, method="GET", scheme="http", url_vars=None): """Useful for constructing Request objects for tests""" diff --git a/datasette/utils/multipart.py b/datasette/utils/multipart.py new file 
mode 100644 index 00000000..cfa77486 --- /dev/null +++ b/datasette/utils/multipart.py @@ -0,0 +1,757 @@ +""" +Streaming multipart/form-data parser for ASGI applications. + +Supports: +- Streaming parsing without buffering entire body in memory +- Files spill to disk above configurable threshold +- Security limits on request size, file size, field count +- Both multipart/form-data and application/x-www-form-urlencoded +""" + +import asyncio +import shutil +import tempfile +from dataclasses import dataclass, field +from typing import ( + Any, + Callable, + Dict, + List, + Optional, + Tuple, + Union, +) +from urllib.parse import parse_qsl + +# Centralized defaults for multipart/form-data parsing +DEFAULT_MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB +DEFAULT_MAX_REQUEST_SIZE = 100 * 1024 * 1024 # 100MB +DEFAULT_MAX_FIELDS = 1000 +DEFAULT_MAX_FILES = 100 +# If max_parts is not specified, it defaults to max_fields + max_files +DEFAULT_MAX_PARTS: Optional[int] = None +DEFAULT_MAX_FIELD_SIZE = 100 * 1024 # 100KB +DEFAULT_MAX_MEMORY_FILE_SIZE = 1024 * 1024 # 1MB +DEFAULT_MAX_PART_HEADER_BYTES = 16 * 1024 # 16KB +DEFAULT_MAX_PART_HEADER_LINES = 100 +DEFAULT_MIN_FREE_DISK_BYTES = 50 * 1024 * 1024 # 50MB + + +class MultipartParseError(Exception): + """Raised when multipart parsing fails.""" + + pass + + +@dataclass +class UploadedFile: + """ + Represents an uploaded file from a multipart form. 
+ + Attributes: + name: The form field name + filename: The original filename from the upload + content_type: The MIME type of the file + size: Size in bytes + """ + + name: str + filename: str + content_type: Optional[str] + size: int + _file: tempfile.SpooledTemporaryFile = field(repr=False) + + async def read(self, size: int = -1) -> bytes: + """Read file contents.""" + return await asyncio.to_thread(self._file.read, size) + + async def seek(self, offset: int, whence: int = 0) -> int: + """Seek to position in file.""" + return await asyncio.to_thread(self._file.seek, offset, whence) + + async def close(self) -> None: + """Close the underlying file.""" + await asyncio.to_thread(self._file.close) + + def close_sync(self) -> None: + """Close the underlying file synchronously.""" + self._file.close() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.close() + + def __del__(self): + try: + self._file.close() + except Exception: + pass + + +class FormData: + """ + Container for parsed form data, supporting both fields and files. + + Provides dict-like access with support for multiple values per key. 
+ """ + + def __init__(self): + self._data: List[Tuple[str, Union[str, UploadedFile]]] = [] + + def append(self, key: str, value: Union[str, UploadedFile]) -> None: + """Add a key-value pair.""" + self._data.append((key, value)) + + def __getitem__(self, key: str) -> Union[str, UploadedFile]: + """Get the first value for a key.""" + for k, v in self._data: + if k == key: + return v + raise KeyError(key) + + def get(self, key: str, default: Any = None) -> Optional[Union[str, UploadedFile]]: + """Get the first value for a key, or default if not found.""" + try: + return self[key] + except KeyError: + return default + + def getlist(self, key: str) -> List[Union[str, UploadedFile]]: + """Get all values for a key.""" + return [v for k, v in self._data if k == key] + + def __contains__(self, key: str) -> bool: + """Check if key exists.""" + return any(k == key for k, _ in self._data) + + def __len__(self) -> int: + """Return number of items.""" + return len(self._data) + + def __iter__(self): + """Iterate over unique keys.""" + seen = set() + for k, _ in self._data: + if k not in seen: + seen.add(k) + yield k + + def keys(self): + """Return unique keys.""" + return list(self) + + def items(self) -> List[Tuple[str, Union[str, UploadedFile]]]: + """Return all key-value pairs.""" + return list(self._data) + + def values(self) -> List[Union[str, UploadedFile]]: + """Return all values.""" + return [v for _, v in self._data] + + def _uploaded_files(self) -> List[UploadedFile]: + """Return UploadedFile instances contained in this form.""" + return [v for _, v in self._data if isinstance(v, UploadedFile)] + + def close(self) -> None: + """ + Close any uploaded files. + + This provides deterministic cleanup for spooled temp files. 
+ """ + for uploaded in self._uploaded_files(): + try: + uploaded.close_sync() + except Exception: + # Best-effort cleanup; ignore close errors + pass + + async def aclose(self) -> None: + """Asynchronously close any uploaded files.""" + for uploaded in self._uploaded_files(): + try: + await uploaded.close() + except Exception: + # Best-effort cleanup; ignore close errors + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + self.close() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.aclose() + + +def parse_content_disposition(header: str) -> Dict[str, Optional[str]]: + """ + Parse Content-Disposition header value. + + Returns dict with 'name', 'filename' keys (filename may be None). + """ + result: Dict[str, Optional[str]] = {"name": None, "filename": None} + + # Split on semicolons, handling quoted strings + parts = [] + current = "" + in_quotes = False + i = 0 + while i < len(header): + char = header[i] + if char == '"' and (i == 0 or header[i - 1] != "\\"): + in_quotes = not in_quotes + current += char + elif char == ";" and not in_quotes: + parts.append(current.strip()) + current = "" + else: + current += char + i += 1 + if current.strip(): + parts.append(current.strip()) + + for part in parts[1:]: # Skip the "form-data" part + if "=" not in part: + continue + + key, _, value = part.partition("=") + key = key.strip().lower() + value = value.strip() + + # Handle filename* (RFC 5987 encoding) + if key == "filename*": + # Format: utf-8''encoded_filename or charset'language'encoded_filename + if "'" in value: + parts_star = value.split("'", 2) + if len(parts_star) >= 3: + # charset = parts_star[0] + # language = parts_star[1] + encoded = parts_star[2] + # URL decode + try: + from urllib.parse import unquote + + result["filename"] = unquote(encoded, encoding="utf-8") + except Exception: + pass + continue + + # Remove quotes if present + if value.startswith('"') and 
value.endswith('"'): + value = value[1:-1] + # Unescape backslash sequences + value = value.replace('\\"', '"').replace("\\\\", "\\") + + if key == "name": + result["name"] = value + elif key == "filename": + # Only set if filename* hasn't already set it + if result["filename"] is None: + # Strip path components (security) + # Handle both Unix and Windows paths + value = value.replace("\\", "/") + if "/" in value: + value = value.rsplit("/", 1)[-1] + result["filename"] = value + + return result + + +def parse_content_type(header: str) -> Tuple[str, Dict[str, str]]: + """ + Parse Content-Type header value. + + Returns (media_type, parameters_dict). + """ + parts = header.split(";") + media_type = parts[0].strip().lower() + params = {} + + for part in parts[1:]: + part = part.strip() + if "=" in part: + key, _, value = part.partition("=") + key = key.strip().lower() + value = value.strip() + # Remove quotes if present + if value.startswith('"') and value.endswith('"'): + value = value[1:-1] + params[key] = value + + return media_type, params + + +class MultipartParser: + """ + Streaming multipart/form-data parser. + + Processes the body chunk by chunk without loading everything into memory. 
+ """ + + # Parser states + STATE_PREAMBLE = 0 + STATE_HEADER = 1 + STATE_BODY = 2 + STATE_DONE = 3 + + def __init__( + self, + boundary: bytes, + max_file_size: int = DEFAULT_MAX_FILE_SIZE, + max_request_size: int = DEFAULT_MAX_REQUEST_SIZE, + max_fields: int = DEFAULT_MAX_FIELDS, + max_files: int = DEFAULT_MAX_FILES, + max_parts: Optional[int] = DEFAULT_MAX_PARTS, + max_field_size: int = DEFAULT_MAX_FIELD_SIZE, + max_memory_file_size: int = DEFAULT_MAX_MEMORY_FILE_SIZE, + max_part_header_bytes: int = DEFAULT_MAX_PART_HEADER_BYTES, + max_part_header_lines: int = DEFAULT_MAX_PART_HEADER_LINES, + min_free_disk_bytes: int = DEFAULT_MIN_FREE_DISK_BYTES, + handle_files: bool = False, + ): + self.boundary = b"--" + boundary + self.end_boundary = self.boundary + b"--" + self.max_file_size = max_file_size + self.max_request_size = max_request_size + self.max_fields = max_fields + self.max_files = max_files + # If not specified, tie max_parts to the other cardinality limits + if max_parts is None: + max_parts = max_fields + max_files + self.max_parts = max_parts + self.max_field_size = max_field_size + self.max_memory_file_size = max_memory_file_size + self.max_part_header_bytes = max_part_header_bytes + self.max_part_header_lines = max_part_header_lines + self.min_free_disk_bytes = min_free_disk_bytes + self.handle_files = handle_files + + self.state = self.STATE_PREAMBLE + self.buffer = bytearray() + self.total_bytes = 0 + self.field_count = 0 + self.file_count = 0 + self.part_count = 0 + self.current_part_size = 0 + self.current_header_bytes = 0 + self.current_header_lines = 0 + + self.form_data = FormData() + self._disk_check_interval_bytes = 1024 * 1024 # 1MB between disk checks + self._bytes_since_disk_check = 0 + self._tempdir = tempfile.gettempdir() + + # Current part state + self.current_headers: Dict[str, str] = {} + self.current_file: Optional[tempfile.SpooledTemporaryFile] = None + self.current_body = bytearray() + self.current_name: Optional[str] = None + 
self.current_filename: Optional[str] = None + self.current_content_type: Optional[str] = None + + def feed(self, chunk: bytes) -> None: + """Feed a chunk of data to the parser.""" + self.total_bytes += len(chunk) + if self.total_bytes > self.max_request_size: + raise MultipartParseError("Request body too large") + + self.buffer.extend(chunk) + self._process() + + def _process(self) -> None: + """Process buffered data.""" + while True: + if self.state == self.STATE_PREAMBLE: + if not self._process_preamble(): + break + elif self.state == self.STATE_HEADER: + if not self._process_header(): + break + elif self.state == self.STATE_BODY: + if not self._process_body(): + break + elif self.state == self.STATE_DONE: + break + + def _process_preamble(self) -> bool: + """Skip preamble and find first boundary.""" + # Look for boundary (could be at start or after preamble) + # Try both \r\n prefixed and bare boundary at start + idx = self.buffer.find(self.boundary) + if idx == -1: + # Keep potential partial boundary at end + keep = len(self.boundary) - 1 + if len(self.buffer) > keep: + self.buffer = self.buffer[-keep:] + return False + + # Found boundary, skip to after it + after_boundary = idx + len(self.boundary) + + # Check for end boundary + if self.buffer[idx : idx + len(self.end_boundary)] == self.end_boundary: + self.state = self.STATE_DONE + return False + + # Skip CRLF or LF after boundary + if after_boundary < len(self.buffer): + if self.buffer[after_boundary : after_boundary + 2] == b"\r\n": + after_boundary += 2 + elif self.buffer[after_boundary : after_boundary + 1] == b"\n": + after_boundary += 1 + + self.buffer = self.buffer[after_boundary:] + self.state = self.STATE_HEADER + self.current_headers = {} + self.current_header_bytes = 0 + self.current_header_lines = 0 + return True + + def _process_header(self) -> bool: + """Parse part headers.""" + while True: + # Look for end of header line + crlf_idx = self.buffer.find(b"\r\n") + lf_idx = self.buffer.find(b"\n") 
+ + if crlf_idx == -1 and lf_idx == -1: + # Guard against unbounded header buffering if no newline is ever sent + if len(self.buffer) > self.max_part_header_bytes: + raise MultipartParseError("Part headers too large") + return False # Need more data + + # Use whichever comes first + if crlf_idx != -1 and (lf_idx == -1 or crlf_idx < lf_idx): + idx = crlf_idx + line_end_len = 2 + else: + idx = lf_idx + line_end_len = 1 + + line = self.buffer[:idx] + self.buffer = self.buffer[idx + line_end_len :] + + self.current_header_lines += 1 + self.current_header_bytes += idx + line_end_len + if ( + self.current_header_lines > self.max_part_header_lines + or self.current_header_bytes > self.max_part_header_bytes + ): + raise MultipartParseError("Part headers too large") + + if not line: + # Empty line = end of headers + self._start_body() + self.state = self.STATE_BODY + return True + + # Parse header + try: + line_str = line.decode("utf-8", errors="replace") + except Exception: + line_str = line.decode("latin-1") + + if ":" in line_str: + name, _, value = line_str.partition(":") + self.current_headers[name.strip().lower()] = value.strip() + + def _start_body(self) -> None: + """Initialize body parsing for current part.""" + self.part_count += 1 + if self.part_count > self.max_parts: + raise MultipartParseError("Too many parts") + + # Parse Content-Disposition + cd = self.current_headers.get("content-disposition", "") + parsed = parse_content_disposition(cd) + self.current_name = parsed.get("name") + self.current_filename = parsed.get("filename") + self.current_content_type = self.current_headers.get("content-type") + self.current_part_size = 0 + + if self.current_filename is not None: + # It's a file + self.file_count += 1 + if self.file_count > self.max_files: + raise MultipartParseError("Too many files") + if self.handle_files: + self.current_file = tempfile.SpooledTemporaryFile( + max_size=self.max_memory_file_size + ) + else: + # Will discard file content + 
self.current_file = None + else: + # It's a text field + self.field_count += 1 + if self.field_count > self.max_fields: + raise MultipartParseError("Too many fields") + self.current_body = bytearray() + self.current_file = None + + # Check disk space before allocating a spooled temp file + if self.current_filename is not None and self.handle_files: + self._ensure_disk_space() + + def _process_body(self) -> bool: + """Process body data for current part.""" + # Look for boundary in buffer + # Need to handle boundary potentially split across chunks + + # The boundary is preceded by \r\n (or \n for lenient parsing) + search_boundary = b"\r\n" + self.boundary + + idx = self.buffer.find(search_boundary) + if idx == -1: + # Try LF-only boundary (lenient) + search_boundary_lf = b"\n" + self.boundary + idx = self.buffer.find(search_boundary_lf) + if idx != -1: + search_boundary = search_boundary_lf + + if idx == -1: + # No boundary found yet + # Keep potential partial boundary at end of buffer + safe_len = len(self.buffer) - len(search_boundary) - 1 + if safe_len > 0: + safe_data = self.buffer[:safe_len] + self._write_body_data(bytes(safe_data)) + self.buffer = self.buffer[safe_len:] + return False + + # Found boundary - write remaining body data + body_data = self.buffer[:idx] + self._write_body_data(bytes(body_data)) + + # Move past the boundary + after_boundary = idx + len(search_boundary) + + # Check for end boundary + remaining = self.buffer[after_boundary:] + if remaining.startswith(b"--"): + # End boundary + self._finish_part() + self.state = self.STATE_DONE + return False + + # Skip CRLF or LF after boundary + if remaining.startswith(b"\r\n"): + after_boundary += 2 + elif remaining.startswith(b"\n"): + after_boundary += 1 + + self.buffer = self.buffer[after_boundary:] + self._finish_part() + self.state = self.STATE_HEADER + self.current_headers = {} + self.current_header_bytes = 0 + self.current_header_lines = 0 + return True + + def _write_body_data(self, data: 
bytes) -> None: + """Write data to current part body.""" + if not data: + return + + self.current_part_size += len(data) + + if self.current_filename is not None: + # File data + if self.current_part_size > self.max_file_size: + raise MultipartParseError("File too large") + if self.handle_files and self.current_file: + self._bytes_since_disk_check += len(data) + if self._bytes_since_disk_check >= self._disk_check_interval_bytes: + self._ensure_disk_space() + self._bytes_since_disk_check = 0 + self.current_file.write(data) + # else: discard file data + else: + # Field data + if self.current_part_size > self.max_field_size: + raise MultipartParseError("Field value too large") + self.current_body.extend(data) + + def _finish_part(self) -> None: + """Finalize current part and add to form data.""" + if self.current_name is None: + return + + if self.current_filename is not None: + # File + if self.handle_files and self.current_file: + self.current_file.seek(0) + uploaded = UploadedFile( + name=self.current_name, + filename=self.current_filename, + content_type=self.current_content_type, + size=self.current_part_size, + _file=self.current_file, + ) + self.form_data.append(self.current_name, uploaded) + # else: file was discarded + else: + # Text field + try: + value = bytes(self.current_body).decode("utf-8") + except UnicodeDecodeError: + value = bytes(self.current_body).decode("latin-1") + self.form_data.append(self.current_name, value) + + # Reset part state + self.current_file = None + self.current_body = bytearray() + self.current_name = None + self.current_filename = None + self.current_content_type = None + + def finalize(self) -> FormData: + """Finalize parsing and return form data.""" + # Process any remaining data + self._process() + if self.state != self.STATE_DONE: + raise MultipartParseError( + "Truncated multipart body (missing closing boundary)" + ) + return self.form_data + + def _ensure_disk_space(self) -> None: + """ + Ensure there is enough free space 
on the temp filesystem. + + This is a best-effort guard against filling the disk with uploads. + """ + if not self.handle_files: + return + if self.min_free_disk_bytes <= 0: + return + free_bytes = shutil.disk_usage(self._tempdir).free + if free_bytes < self.min_free_disk_bytes: + raise MultipartParseError("Insufficient disk space for uploads") + + +async def parse_form_data( + receive: Callable, + content_type: str, + files: bool = False, + max_file_size: int = DEFAULT_MAX_FILE_SIZE, + max_request_size: int = DEFAULT_MAX_REQUEST_SIZE, + max_fields: int = DEFAULT_MAX_FIELDS, + max_files: int = DEFAULT_MAX_FILES, + max_parts: Optional[int] = DEFAULT_MAX_PARTS, + max_field_size: int = DEFAULT_MAX_FIELD_SIZE, + max_memory_file_size: int = DEFAULT_MAX_MEMORY_FILE_SIZE, + max_part_header_bytes: int = DEFAULT_MAX_PART_HEADER_BYTES, + max_part_header_lines: int = DEFAULT_MAX_PART_HEADER_LINES, + min_free_disk_bytes: int = DEFAULT_MIN_FREE_DISK_BYTES, +) -> FormData: + """ + Parse form data from an ASGI receive callable. + + Supports both application/x-www-form-urlencoded and multipart/form-data. 
+ + Args: + receive: ASGI receive callable + content_type: Content-Type header value + files: If True, store file uploads; if False, discard them + max_file_size: Maximum size per file in bytes + max_request_size: Maximum total request size in bytes + max_fields: Maximum number of form fields + max_files: Maximum number of file uploads + max_field_size: Maximum size of a text field value + max_memory_file_size: File size threshold before spilling to disk + + Returns: + FormData object containing parsed fields and files + """ + media_type, params = parse_content_type(content_type) + + if media_type == "application/x-www-form-urlencoded": + # Read entire body for URL-encoded forms (they're typically small) + body = bytearray() + total = 0 + while True: + message = await receive() + message_type = message.get("type") + if message_type == "http.disconnect": + raise MultipartParseError("Client disconnected during request body") + if message_type is not None and message_type != "http.request": + continue + chunk = message.get("body", b"") + total += len(chunk) + if total > max_request_size: + raise MultipartParseError("Request body too large") + body.extend(chunk) + if not message.get("more_body", False): + break + + form_data = FormData() + try: + pairs = parse_qsl(bytes(body).decode("utf-8"), keep_blank_values=True) + except UnicodeDecodeError: + pairs = parse_qsl(bytes(body).decode("latin-1"), keep_blank_values=True) + + for key, value in pairs: + form_data.append(key, value) + + return form_data + + elif media_type == "multipart/form-data": + boundary = params.get("boundary") + if not boundary: + raise MultipartParseError("Missing boundary in Content-Type") + + parser = MultipartParser( + boundary=boundary.encode("utf-8"), + max_file_size=max_file_size, + max_request_size=max_request_size, + max_fields=max_fields, + max_files=max_files, + max_parts=max_parts, + max_field_size=max_field_size, + max_memory_file_size=max_memory_file_size, + 
max_part_header_bytes=max_part_header_bytes, + max_part_header_lines=max_part_header_lines, + min_free_disk_bytes=min_free_disk_bytes, + handle_files=files, + ) + + # Stream body through parser + batch_target = 64 * 1024 + batch = bytearray() + + async def flush_batch() -> None: + if batch: + data = bytes(batch) + batch.clear() + await asyncio.to_thread(parser.feed, data) + + while True: + message = await receive() + message_type = message.get("type") + if message_type == "http.disconnect": + raise MultipartParseError("Client disconnected during request body") + if message_type is not None and message_type != "http.request": + continue + chunk = message.get("body", b"") + if chunk: + batch.extend(chunk) + if len(batch) >= batch_target: + await flush_batch() + if not message.get("more_body", False): + break + + await flush_batch() + return await asyncio.to_thread(parser.finalize) + + else: + raise MultipartParseError( + f"Unsupported Content-Type: {media_type}. " + "Expected application/x-www-form-urlencoded or multipart/form-data" + ) diff --git a/datasette/views/special.py b/datasette/views/special.py index 411363ec..57a3024d 100644 --- a/datasette/views/special.py +++ b/datasette/views/special.py @@ -177,11 +177,11 @@ class PermissionsDebugView(BaseView): async def post(self, request): await self.ds.ensure_permission(action="view-instance", actor=request.actor) await self.ds.ensure_permission(action="permissions-debug", actor=request.actor) - vars = await request.post_vars() - actor = json.loads(vars["actor"]) - permission = vars["permission"] - parent = vars.get("resource_1") or None - child = vars.get("resource_2") or None + form = await request.form() + actor = json.loads(form["actor"]) + permission = form["permission"] + parent = form.get("resource_1") or None + child = form.get("resource_2") or None response, status = await _check_permission_for_actor( self.ds, permission, parent, child, actor @@ -602,9 +602,9 @@ class MessagesDebugView(BaseView): async def 
post(self, request): await self.ds.ensure_permission(action="view-instance", actor=request.actor) - post = await request.post_vars() - message = post.get("message", "") - message_type = post.get("message_type") or "INFO" + form = await request.form() + message = form.get("message", "") + message_type = form.get("message_type") or "INFO" assert message_type in ("INFO", "WARNING", "ERROR", "all") datasette = self.ds if message_type == "all": @@ -688,11 +688,11 @@ class CreateTokenView(BaseView): async def post(self, request): self.check_permission(request) - post = await request.post_vars() + form = await request.form() errors = [] expires_after = None - if post.get("expire_type"): - duration_string = post.get("expire_duration") + if form.get("expire_type"): + duration_string = form.get("expire_duration") if ( not duration_string or not duration_string.isdigit() @@ -700,7 +700,7 @@ class CreateTokenView(BaseView): ): errors.append("Invalid expire duration") else: - unit = post["expire_type"] + unit = form["expire_type"] if unit == "minutes": expires_after = int(duration_string) * 60 elif unit == "hours": @@ -715,7 +715,7 @@ class CreateTokenView(BaseView): restrict_database = {} restrict_resource = {} - for key in post: + for key in form: if key.startswith("all:") and key.count(":") == 1: restrict_all.append(key.split(":")[1]) elif key.startswith("database:") and key.count(":") == 2: diff --git a/docs/internals.rst b/docs/internals.rst index cfd78593..0491c1f7 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -52,10 +52,59 @@ The request object is passed to various plugin hooks. It represents an incoming ``.actor`` - dictionary (str -> Any) or None The currently authenticated actor (see :ref:`actors `), or ``None`` if the request is unauthenticated. -The object also has two awaitable methods: +The object also has the following awaitable methods: + +``await request.form(files=False, ...)`` - FormData + Parses form data from the request body. 
Supports both ``application/x-www-form-urlencoded`` and ``multipart/form-data`` content types. + + Returns a :ref:`internals_formdata` object with dict-like access to form fields and uploaded files. + + Requirements and errors: + + - A ``Content-Type`` header is required. Missing or unsupported content types raise ``BadRequest``. + - For ``multipart/form-data``, the ``boundary=...`` parameter is required. + + Parameters: + + - ``files`` (bool, default ``False``): If ``True``, uploaded files are stored and accessible. If ``False`` (default), file content is discarded but form fields are still available. + - ``max_file_size`` (int, default 50MB): Maximum size per uploaded file in bytes. + - ``max_request_size`` (int, default 100MB): Maximum total request body size in bytes. + - ``max_fields`` (int, default 1000): Maximum number of form fields. + - ``max_files`` (int, default 100): Maximum number of uploaded files. + - ``max_parts`` (int, default ``max_fields + max_files``): Maximum number of multipart parts in total. + - ``max_field_size`` (int, default 100KB): Maximum size of a text field value in bytes. + - ``max_memory_file_size`` (int, default 1MB): File size threshold before uploads spill to disk. + - ``max_part_header_bytes`` (int, default 16KB): Maximum total bytes allowed in part headers. + - ``max_part_header_lines`` (int, default 100): Maximum header lines per part. + - ``min_free_disk_bytes`` (int, default 50MB): Minimum free bytes required in the temp directory before accepting file uploads. + + Example usage: + + .. 
code-block:: python + + # Parse form fields only (files are discarded) + form = await request.form() + username = form["username"] + tags = form.getlist("tags") # For multiple values + + # Parse form fields AND files + form = await request.form(files=True) + uploaded = form["avatar"] + content = await uploaded.read() + print( + uploaded.filename, uploaded.content_type, uploaded.size + ) + + Cleanup note: + + When using ``files=True``, call ``await form.aclose()`` once you are done with the uploads + to ensure spooled temporary files are closed promptly. You can also use + ``async with form: ...`` for automatic cleanup. + + Don't forget to read about :ref:`internals_csrf`! ``await request.post_vars()`` - dictionary - Returns a dictionary of form variables that were submitted in the request body via ``POST``. Don't forget to read about :ref:`internals_csrf`! + Returns a dictionary of form variables that were submitted in the request body via ``POST`` using ``application/x-www-form-urlencoded`` encoding. For multipart forms or file uploads, use ``request.form()`` instead. ``await request.post_body()`` - bytes Returns the un-parsed body of a request submitted by ``POST`` - useful for things like incoming JSON data. @@ -117,6 +166,84 @@ Consider the query string ``?foo=1&foo=2&bar=3`` - with two values for ``foo`` a ``len(request.args)`` - integer Returns the number of keys. +.. _internals_formdata: + +The FormData class +================== + +``await request.form()`` returns a ``FormData`` object - a dictionary-like object which provides access to form fields and uploaded files. It has a similar interface to ``MultiParams``. + +``form[key]`` - string or UploadedFile + Returns the first value for that key, or raises a ``KeyError`` if the key is missing. + +``form.get(key)`` - string, UploadedFile, or None + Returns the first value for that key, or ``None`` if the key is missing. Pass a second argument to specify a different default. 
+ +``form.getlist(key)`` - list + Returns the list of values for that key. If the key is missing, an empty list is returned. + +``form.keys()`` - list of strings + Returns the list of available keys. + +``key in form`` - True or False + You can use ``if key in form`` to check if a key is present. + +``for key in form`` - iterator + This lets you loop through every available key. + +``len(form)`` - integer + Returns the total number of submitted values, not the number of unique keys - a key submitted three times counts as three. + +.. _internals_uploadedfile: + +The UploadedFile class +====================== + +When parsing multipart form data with ``files=True``, file uploads are returned as ``UploadedFile`` objects with the following properties and methods: + +``uploaded_file.name`` - string + The form field name. + +``uploaded_file.filename`` - string + The original filename provided by the client. Note: This is sanitized to remove path components for security. + +``uploaded_file.content_type`` - string or None + The MIME type of the uploaded file, if provided by the client. + +``uploaded_file.size`` - integer + The size of the uploaded file in bytes. + +``await uploaded_file.read(size=-1)`` - bytes + Read and return up to ``size`` bytes from the file. If ``size`` is -1 (default), read the entire file. + +``await uploaded_file.seek(offset, whence=0)`` - integer + Seek to the given position in the file. Returns the new position. + +``await uploaded_file.close()`` + Close the underlying file. This is called automatically when the object is garbage collected. + +Files below the ``max_memory_file_size`` threshold (1MB by default) are stored in memory. Larger files are automatically spilled to temporary files on disk and cleaned up when the request completes. + +Example: + +.. 
code-block:: python + + form = await request.form(files=True) + uploaded = form["document"] + + # Check file metadata + print(f"Filename: {uploaded.filename}") + print(f"Content-Type: {uploaded.content_type}") + print(f"Size: {uploaded.size} bytes") + + # Read file content + content = await uploaded.read() + + # Or read in chunks + await uploaded.seek(0) + while chunk := await uploaded.read(8192): + process_chunk(chunk) + .. _internals_response: Response class diff --git a/pyproject.toml b/pyproject.toml index 6fca673d..d9ef2a73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,6 +66,7 @@ dev = [ "pytest-timeout>=1.4.2", "trustme>=0.7", "cogapp>=3.3.0", + "multipart-form-data-conformance==0.1a0", "ruff>=0.9", # docs "Sphinx==7.4.7", diff --git a/tests/test_multipart.py b/tests/test_multipart.py new file mode 100644 index 00000000..0dc3ecd7 --- /dev/null +++ b/tests/test_multipart.py @@ -0,0 +1,1152 @@ +""" +Tests for request.form() multipart form data parsing. + +Uses TDD approach - these tests are written first, then implementation follows. 
def make_receive(body: bytes):
    """
    Create an async ASGI ``receive`` callable that delivers ``body`` in one message.

    The first call returns the whole body as a single ``http.request`` event
    with ``more_body`` set to ``False``. Every later call returns an empty
    ``http.request`` event, also with ``more_body`` set to ``False``.
    """
    consumed = False

    async def receive():
        nonlocal consumed
        if consumed:
            return {"type": "http.request", "body": b"", "more_body": False}
        consumed = True
        return {"type": "http.request", "body": body, "more_body": False}

    return receive


def make_chunked_receive(body: bytes, chunk_size: int = 64):
    """
    Create an async receive callable that yields body in small chunks.

    Each call returns the next ``chunk_size`` bytes as an ``http.request``
    event; ``more_body`` stays ``True`` until the final chunk is handed out.

    Raises ``ValueError`` if ``chunk_size`` is not positive.
    """
    # A zero or negative chunk size never advances through the body, so
    # ``more_body`` would stay True forever and the consumer would hang.
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    offset = 0

    async def receive():
        nonlocal offset
        chunk = body[offset : offset + chunk_size]
        offset += chunk_size
        more_body = offset < len(body)
        return {"type": "http.request", "body": chunk, "more_body": more_body}

    return receive


def make_receive_with_noise(body: bytes):
    """
    Create an async receive callable that includes an unexpected ASGI message.

    The first call returns an ``http.response.start`` message, the second
    returns the whole body, and any further call returns an empty terminal
    ``http.request`` event.

    The parser should ignore the unknown message type and continue.
    """
    messages = [
        {"type": "http.response.start", "status": 200, "headers": []},
        {"type": "http.request", "body": body, "more_body": False},
    ]
    index = 0

    async def receive():
        nonlocal index
        if index >= len(messages):
            return {"type": "http.request", "body": b"", "more_body": False}
        message = messages[index]
        index += 1
        return message

    return receive


def make_disconnect_receive(body: bytes, chunk_size: int = 64):
    """
    Create an async receive callable that disconnects mid-request.

    The first call returns the first ``chunk_size`` bytes of ``body``. If more
    body remains at that point, every later call returns an
    ``http.disconnect`` event. A body that fits in a single chunk is delivered
    completely and no disconnect is simulated.

    The parser should raise on the disconnect.

    Raises ``ValueError`` if ``chunk_size`` is not positive.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    offset = 0
    disconnected = False

    async def receive():
        nonlocal offset, disconnected
        if disconnected:
            return {"type": "http.disconnect"}
        chunk = body[offset : offset + chunk_size]
        offset += chunk_size
        more_body = offset < len(body)
        if more_body:
            # Promise more data now, then drop the connection on the next call.
            disconnected = True
        return {"type": "http.request", "body": chunk, "more_body": more_body}

    return receive
class TestMultipartBasic:
    """Test request.form() with multipart/form-data (fields only, no files)."""

    _BOUNDARY = "----TestBoundary123"

    def _request(self, payload, receive=None):
        """Build a POST request for ``payload`` with the multipart Content-Type header.

        ``receive`` overrides the default single-message receive callable.
        """
        content_type = f"multipart/form-data; boundary={self._BOUNDARY}".encode()
        scope = {
            "type": "http",
            "method": "POST",
            "headers": [
                (b"content-type", content_type),
            ],
        }
        if receive is None:
            receive = make_receive(payload)
        return Request(scope, receive)

    @pytest.mark.asyncio
    async def test_single_text_field(self):
        """Single text field in multipart should be parseable."""
        payload = (
            b"------TestBoundary123\r\n"
            b'Content-Disposition: form-data; name="username"\r\n'
            b"\r\n"
            b"john_doe\r\n"
            b"------TestBoundary123--\r\n"
        )

        form = await self._request(payload).form()

        assert form["username"] == "john_doe"

    @pytest.mark.asyncio
    async def test_multiple_text_fields(self):
        """Multiple text fields in multipart should all be accessible."""
        payload = (
            b"------TestBoundary123\r\n"
            b'Content-Disposition: form-data; name="first_name"\r\n'
            b"\r\n"
            b"John\r\n"
            b"------TestBoundary123\r\n"
            b'Content-Disposition: form-data; name="last_name"\r\n'
            b"\r\n"
            b"Doe\r\n"
            b"------TestBoundary123--\r\n"
        )

        form = await self._request(payload).form()

        assert form["first_name"] == "John"
        assert form["last_name"] == "Doe"

    @pytest.mark.asyncio
    async def test_file_discarded_when_files_false(self):
        """File content should be discarded when files=False (default)."""
        payload = (
            b"------TestBoundary123\r\n"
            b'Content-Disposition: form-data; name="title"\r\n'
            b"\r\n"
            b"My Document\r\n"
            b"------TestBoundary123\r\n"
            b'Content-Disposition: form-data; name="file"; filename="doc.txt"\r\n'
            b"Content-Type: text/plain\r\n"
            b"\r\n"
            b"File content here\r\n"
            b"------TestBoundary123\r\n"
            b'Content-Disposition: form-data; name="description"\r\n'
            b"\r\n"
            b"A sample document\r\n"
            b"------TestBoundary123--\r\n"
        )

        # files=False is the default, so the file part is parsed but dropped
        form = await self._request(payload).form()

        # The surrounding text fields survive
        assert form["title"] == "My Document"
        assert form["description"] == "A sample document"
        # The file part must not appear in the result
        assert "file" not in form

    @pytest.mark.asyncio
    async def test_chunked_body_parsing(self):
        """Multipart should work when body arrives in small chunks."""
        payload = (
            b"------TestBoundary123\r\n"
            b'Content-Disposition: form-data; name="username"\r\n'
            b"\r\n"
            b"john_doe\r\n"
            b"------TestBoundary123--\r\n"
        )
        # 16-byte chunks force the streaming parser to handle split boundaries
        request = self._request(
            payload, receive=make_chunked_receive(payload, chunk_size=16)
        )

        form = await request.form()

        assert form["username"] == "john_doe"
World!\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive(body)) + + form = await request.form(files=True) + + uploaded_file = form["document"] + assert uploaded_file.filename == "test.txt" + assert uploaded_file.content_type == "text/plain" + assert await uploaded_file.read() == b"Hello, World!" + assert uploaded_file.size == 13 + + @pytest.mark.asyncio + async def test_mixed_fields_and_files(self): + """Mixed form fields and files should all be accessible.""" + boundary = "----TestBoundary123" + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="title"\r\n' + b"\r\n" + b"My Document\r\n" + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="file"; filename="doc.txt"\r\n' + b"Content-Type: text/plain\r\n" + b"\r\n" + b"Document content\r\n" + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="description"\r\n' + b"\r\n" + b"A sample\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive(body)) + + form = await request.form(files=True) + + # Text fields + assert form["title"] == "My Document" + assert form["description"] == "A sample" + # File + uploaded_file = form["file"] + assert uploaded_file.filename == "doc.txt" + assert await uploaded_file.read() == b"Document content" + + @pytest.mark.asyncio + async def test_multiple_files_same_name(self): + """Multiple files with same name should be accessible via getlist().""" + boundary = "----TestBoundary123" + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="files"; filename="a.txt"\r\n' + b"Content-Type: text/plain\r\n" + b"\r\n" + b"File A\r\n" + 
b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="files"; filename="b.txt"\r\n' + b"Content-Type: text/plain\r\n" + b"\r\n" + b"File B\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive(body)) + + form = await request.form(files=True) + + files = form.getlist("files") + assert len(files) == 2 + assert files[0].filename == "a.txt" + assert files[1].filename == "b.txt" + + @pytest.mark.asyncio + async def test_large_file_spills_to_disk(self): + """Files larger than threshold should spill to temp file.""" + boundary = "----TestBoundary123" + # Create a body larger than the in-memory threshold (1MB) + large_content = b"x" * (2 * 1024 * 1024) # 2MB + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="bigfile"; filename="large.bin"\r\n' + b"Content-Type: application/octet-stream\r\n" + b"\r\n" + large_content + b"\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive(body)) + + form = await request.form(files=True) + + uploaded_file = form["bigfile"] + assert uploaded_file.size == len(large_content) + # Content should still be readable + content = await uploaded_file.read() + assert content == large_content + + @pytest.mark.asyncio + async def test_uploaded_file_seek_and_read(self): + """UploadedFile should support seek and multiple reads.""" + boundary = "----TestBoundary123" + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="file"; filename="test.txt"\r\n' + b"Content-Type: text/plain\r\n" + b"\r\n" + b"Hello, World!\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + 
class TestMultipartCleanup:
    """Test deterministic cleanup of uploaded files."""

    @staticmethod
    async def _parse_single_upload():
        """Parse a one-file multipart body and return ``(form, uploaded_file)``."""
        boundary = "----TestBoundary123"
        payload = (
            b"------TestBoundary123\r\n"
            b'Content-Disposition: form-data; name="file"; filename="test.txt"\r\n'
            b"Content-Type: text/plain\r\n"
            b"\r\n"
            b"Hello\r\n"
            b"------TestBoundary123--\r\n"
        )
        content_type = f"multipart/form-data; boundary={boundary}".encode()
        scope = {
            "type": "http",
            "method": "POST",
            "headers": [
                (b"content-type", content_type),
            ],
        }
        form = await Request(scope, make_receive(payload)).form(files=True)
        return form, form["file"]

    @pytest.mark.asyncio
    async def test_formdata_close_closes_uploaded_files(self):
        """Reading an upload after ``form.close()`` should raise ``ValueError``."""
        form, uploaded_file = await self._parse_single_upload()

        form.close()

        with pytest.raises(ValueError):
            await uploaded_file.read()

    @pytest.mark.asyncio
    async def test_formdata_async_context_manager_closes_files(self):
        """Leaving ``async with form`` should close the uploads it holds."""
        form, uploaded_file = await self._parse_single_upload()

        async with form:
            pass

        with pytest.raises(ValueError):
            await uploaded_file.read()
+ @pytest.mark.asyncio + async def test_empty_file_upload(self): + """Empty file (filename but no content) should be handled.""" + boundary = "----TestBoundary123" + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="file"; filename="empty.txt"\r\n' + b"Content-Type: text/plain\r\n" + b"\r\n" + b"\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive(body)) + + form = await request.form(files=True) + + uploaded_file = form["file"] + assert uploaded_file.filename == "empty.txt" + assert uploaded_file.size == 0 + assert await uploaded_file.read() == b"" + + @pytest.mark.asyncio + async def test_filename_with_path(self): + """Filename containing path should extract just the filename.""" + boundary = "----TestBoundary123" + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="file"; filename="C:\\Users\\test\\doc.txt"\r\n' + b"Content-Type: text/plain\r\n" + b"\r\n" + b"content\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive(body)) + + form = await request.form(files=True) + + # Should extract just the filename, not the full path + uploaded_file = form["file"] + assert uploaded_file.filename == "doc.txt" + + @pytest.mark.asyncio + async def test_missing_content_type_header(self): + """Missing content-type in request should raise BadRequest.""" + body = b"some body" + scope = { + "type": "http", + "method": "POST", + "headers": [], + } + request = Request(scope, make_receive(body)) + + with pytest.raises(BadRequest): + await request.form() + + @pytest.mark.asyncio + async def test_invalid_content_type(self): + """Non-form content-type should raise 
class TestSecurityLimits:
    """Test security limits on form parsing."""

    @staticmethod
    def _multipart_request(payload):
        """Wrap ``payload`` in a POST request with the test boundary declared."""
        boundary = "----TestBoundary123"
        content_type = f"multipart/form-data; boundary={boundary}".encode()
        scope = {
            "type": "http",
            "method": "POST",
            "headers": [
                (b"content-type", content_type),
            ],
        }
        return Request(scope, make_receive(payload))

    @staticmethod
    def _single_file_body(content):
        """Return a multipart body holding one ``big.bin`` upload of ``content``."""
        return (
            b"------TestBoundary123\r\n"
            b'Content-Disposition: form-data; name="file"; filename="big.bin"\r\n'
            b"Content-Type: application/octet-stream\r\n"
            b"\r\n" + content + b"\r\n"
            b"------TestBoundary123--\r\n"
        )

    @pytest.mark.asyncio
    async def test_max_fields_limit(self):
        """Should reject requests with too many fields."""
        # 1001 fields: one more than the limit passed below
        field_parts = "".join(
            f"------TestBoundary123\r\n"
            f'Content-Disposition: form-data; name="field{i}"\r\n'
            f"\r\n"
            f"value{i}\r\n"
            for i in range(1001)
        )
        payload = (field_parts + "------TestBoundary123--\r\n").encode()
        request = self._multipart_request(payload)

        with pytest.raises(BadRequest, match="(?i)too many"):
            await request.form(max_fields=1000)

    @pytest.mark.asyncio
    async def test_max_file_size_limit(self):
        """Should reject files exceeding size limit."""
        eleven_mb = b"x" * (11 * 1024 * 1024)
        request = self._multipart_request(self._single_file_body(eleven_mb))

        with pytest.raises(BadRequest, match="(?i)file.*too large|too large"):
            await request.form(files=True, max_file_size=10 * 1024 * 1024)

    @pytest.mark.asyncio
    async def test_max_request_size_limit(self):
        """Should reject requests exceeding total size limit."""
        six_mb = b"x" * (6 * 1024 * 1024)
        request = self._multipart_request(self._single_file_body(six_mb))

        with pytest.raises(BadRequest, match="(?i)too large|request.*too large"):
            await request.form(files=True, max_request_size=5 * 1024 * 1024)
b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="field"\r\n' + b"\r\n" + b"value\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_disconnect_receive(body, chunk_size=16)) + + with pytest.raises(BadRequest, match="disconnected"): + await request.form() + + @pytest.mark.asyncio + async def test_unknown_asgi_message_type_is_ignored(self): + """Unexpected ASGI message types should be ignored.""" + boundary = "----TestBoundary123" + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="field"\r\n' + b"\r\n" + b"value\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive_with_noise(body)) + + form = await request.form() + assert form["field"] == "value" + + @pytest.mark.asyncio + async def test_max_files_enforced_even_when_files_false(self): + """File count limits should apply even when file handling is disabled.""" + boundary = "----TestBoundary123" + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="f1"; filename="a.txt"\r\n' + b"Content-Type: text/plain\r\n" + b"\r\n" + b"a\r\n" + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="f2"; filename="b.txt"\r\n' + b"Content-Type: text/plain\r\n" + b"\r\n" + b"b\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive(body)) + + with pytest.raises(BadRequest, match="Too many files"): + await request.form(files=False, max_files=1) + + @pytest.mark.asyncio + async def test_max_parts_limit(self): + """Total part 
count should be bounded.""" + boundary = "----TestBoundary123" + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="a"\r\n' + b"\r\n" + b"1\r\n" + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="b"\r\n' + b"\r\n" + b"2\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive(body)) + + with pytest.raises(BadRequest, match="Too many parts"): + await request.form(max_parts=1) + + @pytest.mark.asyncio + async def test_max_file_size_enforced_even_when_files_false(self): + """File size limits should apply even when file handling is disabled.""" + boundary = "----TestBoundary123" + big_content = b"x" * 2048 + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="file"; filename="big.bin"\r\n' + b"Content-Type: application/octet-stream\r\n" + b"\r\n" + big_content + b"\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive(body)) + + with pytest.raises(BadRequest, match="File too large"): + await request.form(files=False, max_file_size=1024) + + @pytest.mark.asyncio + async def test_part_header_limits(self): + """Overly large part headers should be rejected.""" + boundary = "----TestBoundary123" + huge_header_value = "x" * 5000 + body = ( + b"------TestBoundary123\r\n" + + f'Content-Disposition: form-data; name="field"; foo="{huge_header_value}"\r\n'.encode() + + b"\r\n" + + b"value\r\n" + + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive(body)) + + with 
pytest.raises(BadRequest, match="headers too large"): + await request.form(max_part_header_bytes=1024) + + @pytest.mark.asyncio + async def test_insufficient_disk_space_rejects_upload(self, monkeypatch): + """Uploads should be rejected when free disk is below the floor.""" + boundary = "----TestBoundary123" + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="file"; filename="test.txt"\r\n' + b"Content-Type: text/plain\r\n" + b"\r\n" + b"Hello\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + + DiskUsage = namedtuple("DiskUsage", ("total", "used", "free")) + monkeypatch.setattr( + "datasette.utils.multipart.shutil.disk_usage", + lambda path: DiskUsage(total=100, used=95, free=5), + ) + + request = Request(scope, make_receive(body)) + with pytest.raises(BadRequest, match="Insufficient disk space"): + await request.form(files=True, min_free_disk_bytes=50) + + @pytest.mark.asyncio + async def test_low_disk_space_does_not_block_field_only_forms(self, monkeypatch): + """Low disk space should not reject multipart forms with no file parts.""" + boundary = "----TestBoundary123" + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="field"\r\n' + b"\r\n" + b"value\r\n" + b"------TestBoundary123--\r\n" + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + + DiskUsage = namedtuple("DiskUsage", ("total", "used", "free")) + monkeypatch.setattr( + "datasette.utils.multipart.shutil.disk_usage", + lambda path: DiskUsage(total=100, used=99, free=1), + ) + + request = Request(scope, make_receive(body)) + form = await request.form(files=True, min_free_disk_bytes=50) + assert form["field"] == "value" + + @pytest.mark.asyncio + async def 
test_headers_without_newline_hit_header_byte_limit(self): + """Headers that never terminate should still hit the header byte limit.""" + boundary = "----TestBoundary123" + huge = b"x" * 5000 + # No CRLF is included after the header line + body = ( + b"------TestBoundary123\r\n" + b'Content-Disposition: form-data; name="field"; foo="' + huge + b'"' + ) + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", f"multipart/form-data; boundary={boundary}".encode()), + ], + } + request = Request(scope, make_receive(body)) + + with pytest.raises(BadRequest, match="headers too large"): + await request.form(max_part_header_bytes=1024) + + +class TestFormDataLenSemantics: + """Test that FormData.__len__ reflects number of items, not unique keys.""" + + @pytest.mark.asyncio + async def test_len_counts_items(self): + body = b"tag=python&tag=web&tag=api" + scope = { + "type": "http", + "method": "POST", + "headers": [ + (b"content-type", b"application/x-www-form-urlencoded"), + ], + } + request = Request(scope, make_receive(body)) + + form = await request.form() + assert len(form) == 3 + + +# Conformance test suite using multipart-form-data-conformance + +# Tests where our parser intentionally differs from strict spec for security/practicality +# Our parser sanitizes filenames (strips paths) while the conformance suite expects raw +FILENAME_SANITIZATION_TESTS = { + "026-filename-with-backslash", # We preserve backslashes but they test expects raw + "029-filename-path-traversal", # We strip path components for security +} + +# Tests for optional/lenient features we don't implement +OPTIONAL_TESTS = { + "085-header-folding", # Obsolete header folding feature +} + +# Tests for malformed input where we're lenient instead of erroring +LENIENT_PARSING_TESTS = { + "203-missing-content-disposition", + "204-invalid-content-disposition", +} + + +def load_conformance_test_cases(): + """Load all test cases from multipart-form-data-conformance.""" + tests_dir = 
get_tests_dir() + test_cases = [] + + for category_dir in sorted(tests_dir.iterdir()): + if not category_dir.is_dir(): + continue + for test_dir in sorted(category_dir.iterdir()): + if not test_dir.is_dir(): + continue + test_json = test_dir / "test.json" + headers_json = test_dir / "headers.json" + input_raw = test_dir / "input.raw" + + if not all(f.exists() for f in [test_json, headers_json, input_raw]): + continue + + with open(test_json) as f: + test_spec = json.load(f) + with open(headers_json) as f: + headers = json.load(f) + with open(input_raw, "rb") as f: + body = f.read() + + test_id = test_spec["id"] + + # Add marks for tests we handle differently + marks = [] + if test_id in FILENAME_SANITIZATION_TESTS: + marks.append( + pytest.mark.xfail(reason="Parser sanitizes filenames for security") + ) + elif test_id in OPTIONAL_TESTS: + marks.append( + pytest.mark.xfail(reason="Optional feature not implemented") + ) + elif test_id in LENIENT_PARSING_TESTS: + marks.append( + pytest.mark.xfail(reason="Parser is lenient with malformed input") + ) + + test_cases.append( + pytest.param( + test_spec, + headers, + body, + id=test_id, + marks=marks, + ) + ) + + return test_cases + + +CONFORMANCE_TEST_CASES = load_conformance_test_cases() + + +@pytest.mark.parametrize("test_spec,headers,body", CONFORMANCE_TEST_CASES) +@pytest.mark.asyncio +async def test_conformance(test_spec, headers, body): + """ + Run conformance test cases from multipart-form-data-conformance. 
+ + Each test case specifies: + - headers: HTTP headers including Content-Type with boundary + - body: Raw multipart body bytes + - expected: Expected parse result (valid/invalid, parts list) + """ + scope = { + "type": "http", + "method": "POST", + "headers": [(k.encode(), v.encode()) for k, v in headers.items()], + } + request = Request(scope, make_receive(body)) + + expected = test_spec["expected"] + + if not expected["valid"]: + # Should raise an error for invalid input + with pytest.raises((BadRequest, ValueError)): + await request.form(files=True) + return + + # Parse form data + form = await request.form(files=True) + + # Verify each expected part + for i, expected_part in enumerate(expected["parts"]): + name = expected_part["name"] + + # Get value(s) for this name + values = form.getlist(name) + + # Find the value at the correct index for this name + # (handles multiple values with same name) + same_name_count = sum(1 for p in expected["parts"][:i] if p["name"] == name) + + if same_name_count >= len(values): + pytest.fail( + f"Expected part {name} at index {same_name_count} but only {len(values)} found" + ) + + value = values[same_name_count] + + # Determine expected content + if "body_base64" in expected_part: + expected_content = base64.b64decode(expected_part["body_base64"]) + elif "body_text" in expected_part: + expected_content = expected_part["body_text"].encode("utf-8") + else: + expected_content = None + + # Check for file vs field + # A part is a file if it has a filename OR filename_star + is_file = ( + expected_part.get("filename") is not None + or expected_part.get("filename_star") is not None + ) + + if is_file: + # It's a file + assert hasattr(value, "filename"), f"Expected file for {name}" + + # Check filename - use filename_star if present, else filename + expected_filename = expected_part.get("filename_star") or expected_part.get( + "filename" + ) + if expected_filename: + assert ( + value.filename == expected_filename + ), f"Filename 
mismatch: expected {expected_filename!r}, got {value.filename!r}" + + if expected_part.get("content_type"): + assert value.content_type == expected_part["content_type"] + + content = await value.read() + assert ( + len(content) == expected_part["body_size"] + ), f"Size mismatch: expected {expected_part['body_size']}, got {len(content)}" + if expected_content is not None: + assert content == expected_content + else: + # It's a text field + if hasattr(value, "filename"): + pytest.fail(f"Expected text field for {name}, got file") + + if expected_content is not None: + # For text fields, value is a string + try: + expected_text = expected_content.decode("utf-8") + except UnicodeDecodeError: + expected_text = expected_content.decode("latin-1") + assert ( + value == expected_text + ), f"Value mismatch: expected {expected_text!r}, got {value!r}"