Extract query page rendering into new QueryPage module

Refactor QueryView.get() by moving query execution, CSV streaming,
renderer dispatch, and HTML template rendering into a new
datasette/query_page.py module. Share renderer dispatch logic between
query and table views via dispatch_renderer(). Fix query view passing
view_name="query" instead of "table" to output renderers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Simon Willison 2026-02-15 08:49:41 -08:00
commit 5602f3efa5
4 changed files with 880 additions and 402 deletions

835
datasette/query_page.py Normal file
View file

@ -0,0 +1,835 @@
"""
Public API for displaying interactive pages of SQL query results.
This module provides :class:`QueryPage`, a class-based view (CBV) that
executes a SQL query and renders an interactive page showing the results.
It supports HTML, JSON, and CSV output formats.
This is the shared foundation used by both the ``/db/-/query`` and
``/db/table`` views internally. Plugins can import and use this class
directly, or subclass it for customization.
Simple usage in a plugin::
from datasette import hookimpl
from datasette.query_page import QueryPage
@hookimpl
def register_routes(datasette):
return [
(r"/my-query", my_query_view),
]
async def my_query_view(datasette, request):
page = QueryPage(
datasette,
request,
database="_internal",
sql="select * from catalog_tables",
)
return await page.response()
Subclass for customization::
class MyQueryPage(QueryPage):
async def title(self):
return "My Custom Results"
async def extra_context(self):
return {"custom_key": "value"}
async def my_view(datasette, request):
return await MyQueryPage(
datasette, request,
database="_internal",
sql="select * from catalog_tables",
).response()
"""
import asyncio
import hashlib
import markupsafe
import textwrap
from datasette.database import QueryInterrupted
from datasette.plugins import pm
from datasette.resources import DatabaseResource
from datasette.utils import (
add_cors_headers,
await_me_maybe,
call_with_supported_arguments,
format_bytes,
is_url,
make_slot_function,
named_parameters as derive_named_parameters,
path_with_added_args,
path_with_format,
path_with_removed_args,
to_css_class,
truncate_url,
validate_sql_select,
InvalidSql,
sqlite3,
)
from datasette.utils.asgi import NotFound, Response
from datasette.views.base import DatasetteError, stream_csv
class QueryPage:
"""Render an interactive page displaying SQL query results.
This class encapsulates the logic for executing a SQL query, formatting
the result rows for display, and rendering an HTML page (or JSON/CSV
response) with those results.
It is designed in the style of a class-based view: instantiate with
the necessary parameters, then call :meth:`response` to get back a
:class:`Response`. Override methods on a subclass for customization.
Args:
datasette: The :class:`~datasette.Datasette` instance.
request: The incoming :class:`~datasette.utils.asgi.Request`.
database: Name of the database to query.
sql: The SQL query string.
params: Optional dict of query parameters for named placeholders.
editable: Whether to show an editable SQL editor (default True).
canned_query: Optional canned query dict (used internally).
private: Whether this is a private/restricted resource (default False).
templates: Optional list of Jinja template names to try.
extra_template_context: Optional dict merged into the template context.
"""
def __init__(
self,
datasette,
request,
database,
sql,
params=None,
*,
editable=True,
canned_query=None,
private=False,
templates=None,
extra_template_context=None,
):
self.datasette = datasette
self.request = request
self.database = database
self.sql = sql
self.params = params or {}
self.editable = editable
self.canned_query = canned_query
self.private = private
self._templates = templates
self._extra_template_context = extra_template_context or {}
#: Set to True by :meth:`execute_query` if results were truncated.
self.truncated = False
# ------------------------------------------------------------------
# Override points subclass and override these for customisation
# ------------------------------------------------------------------
async def title(self):
"""Return the page title. Override for custom titles."""
if self.canned_query and self.canned_query.get("title"):
return self.canned_query["title"]
return self.database
async def execute_query(self):
"""Execute the SQL query and return ``(columns, rows, error)``.
*columns* is a list of column name strings.
*rows* is a list of row tuples/dicts.
*error* is a string error message or ``None``.
Override this to customise query execution for example to add
pagination, modify the SQL, or query a different source entirely.
.. note::
Pagination for arbitrary queries requires knowing which column(s)
can serve as a reliable sort key for keyset pagination. Since this
cannot be guessed from arbitrary SQL, a future implementation
could accept sort column hints via query string parameters
(e.g. ``?_sort=id``). The table view already implements keyset
pagination using the table's primary keys.
"""
query_error = None
columns = []
rows = []
if not self.sql:
return columns, rows, query_error
canned_query_write = bool(
self.canned_query and self.canned_query.get("write")
)
if canned_query_write:
# Write queries don't execute on GET
return columns, rows, query_error
extra_args = {}
if self.params.get("_timelimit"):
extra_args["custom_time_limit"] = int(self.params["_timelimit"])
try:
if not self.canned_query:
validate_sql_select(self.sql)
else:
# Canned queries can use magic parameters
from datasette.views.database import MagicParameters
self.params = MagicParameters(
self.sql, self.params, self.request, self.datasette
)
await self.params.execute_params()
results = await self.datasette.execute(
self.database, self.sql, self.params, truncate=True, **extra_args
)
columns = results.columns
rows = results.rows
self.truncated = results.truncated
except QueryInterrupted as ex:
raise DatasetteError(
textwrap.dedent(
"""
<p>SQL query took too long. The time limit is controlled by the
<a href="https://docs.datasette.io/en/stable/settings.html#sql-time-limit-ms">sql_time_limit_ms</a>
configuration option.</p>
<textarea style="width: 90%">{}</textarea>
<script>
let ta = document.querySelector("textarea");
ta.style.height = ta.scrollHeight + "px";
</script>
""".format(
markupsafe.escape(ex.sql)
)
).strip(),
title="SQL Interrupted",
status=400,
message_is_html=True,
)
except sqlite3.DatabaseError as ex:
query_error = str(ex)
except (sqlite3.OperationalError, InvalidSql) as ex:
raise DatasetteError(str(ex), title="Invalid SQL", status=400)
return columns, rows, query_error
async def display_rows(self, rows, columns):
"""Format raw result rows into display-ready values.
Returns a list of lists of display values (strings or Markup).
Override to customise how cell values are rendered.
"""
return await _display_rows(
self.datasette, self.database, self.request, rows, columns
)
async def get_templates(self):
"""Return ordered list of Jinja template names to try.
Override to use a custom template for your page.
"""
if self._templates:
return list(self._templates)
templates = [
"query-{}.html".format(to_css_class(self.database)),
"query.html",
]
if self.canned_query:
templates.insert(
0,
"query-{}-{}.html".format(
to_css_class(self.database),
to_css_class(self.canned_query["name"]),
),
)
return templates
async def extra_context(self):
"""Return a dict of extra template context variables.
Override to inject additional context into the template. This is
the simplest customisation point for adding data to the page.
"""
return {}
async def query_actions(self):
"""Return list of action links for the query action menu.
Override to add custom action links.
"""
links = []
for hook in pm.hook.query_actions(
datasette=self.datasette,
actor=self.request.actor,
database=self.database,
query_name=(
self.canned_query["name"] if self.canned_query else None
),
request=self.request,
sql=self.sql,
params=self.params,
):
extra_links = await await_me_maybe(hook)
if extra_links:
links.extend(extra_links)
return links
# ------------------------------------------------------------------
# Core response method
# ------------------------------------------------------------------
async def response(self):
"""Execute the query and return the appropriate :class:`Response`.
Dispatches to HTML, JSON, CSV, or a plugin renderer based on the
requested format (derived from the URL extension or query params).
Returns:
:class:`~datasette.utils.asgi.Response`
"""
format_ = self.request.url_vars.get("format") or "html"
columns, rows, error = await self.execute_query()
if format_ == "csv":
return await self._csv_response()
if format_ in self.datasette.renderers.keys():
return await self._renderer_response(
format_, columns, rows, error
)
if format_ == "html":
return await self._html_response(columns, rows, error)
raise NotFound("Invalid format: {}".format(format_))
# ------------------------------------------------------------------
# Format-specific response builders
# ------------------------------------------------------------------
async def _csv_response(self):
"""Return a streaming CSV response."""
sql = self.sql
params = self.params
async def fetch_data_for_csv(request, _next=None):
db = self.datasette.get_database(self.database)
results = await db.execute(sql, params, truncate=True)
data = {"rows": results.rows, "columns": results.columns}
return data, None, None
return await stream_csv(
self.datasette, fetch_data_for_csv, self.request, self.database
)
async def _renderer_response(self, format_, columns, rows, error):
"""Dispatch to a plugin output renderer."""
result = call_with_supported_arguments(
self.datasette.renderers[format_][0],
datasette=self.datasette,
columns=columns,
rows=rows,
sql=self.sql,
query_name=(
self.canned_query["name"] if self.canned_query else None
),
database=self.database,
table=None,
request=self.request,
view_name="query",
truncated=self.truncated,
error=error,
# Deprecated but kept for backwards compat:
args=self.request.args,
data={"ok": True, "rows": rows, "columns": columns},
)
if asyncio.iscoroutine(result):
result = await result
if result is None:
raise NotFound("No data")
if isinstance(result, dict):
r = Response(
body=result.get("body"),
status=result.get("status_code") or 200,
content_type=result.get("content_type", "text/plain"),
headers=result.get("headers"),
)
elif isinstance(result, Response):
r = result
else:
raise AssertionError(
"{} should be dict or Response".format(result)
)
if self.datasette.cors:
add_cors_headers(r.headers)
return r
async def _html_response(self, columns, rows, error):
"""Build and return the HTML response."""
datasette = self.datasette
request = self.request
database = self.database
db = datasette.get_database(database)
templates = await self.get_templates()
environment = datasette.get_jinja_environment(request)
template = environment.select_template(templates)
alternate_url_json = datasette.absolute_url(
request,
datasette.urls.path(
path_with_format(request=request, format="json")
),
)
headers = {
"Link": '<{}>; rel="alternate"; type="application/json+datasette"'.format(
alternate_url_json
)
}
metadata = await datasette.get_database_metadata(database)
display_rows = await self.display_rows(rows, columns)
# Named parameters
named_parameters = []
if self.canned_query and self.canned_query.get("params"):
named_parameters = self.canned_query["params"]
if not named_parameters and self.sql:
named_parameters = derive_named_parameters(self.sql)
named_parameter_values = {
p: self.params.get(p) or ""
for p in named_parameters
if not p.startswith("_")
}
# Renderers
renderers = await self._available_renderers(columns, rows)
allow_execute_sql = await datasette.allowed(
action="execute-sql",
resource=DatabaseResource(database=database),
actor=request.actor,
)
# Show/hide SQL controls
canned_query_write = bool(
self.canned_query and self.canned_query.get("write")
)
show_hide_hidden, hide_sql, show_hide_link, show_hide_text = (
self._show_hide_sql_controls()
)
# Edit SQL URL
edit_sql_url = self._edit_sql_url(
allow_execute_sql, named_parameter_values
)
# Tables for autocomplete
from datasette.views.database import (
get_tables,
_table_columns,
)
allowed_tables_page = await datasette.allowed_resources(
"view-table",
request.actor,
parent=database,
include_is_private=True,
limit=1000,
)
allowed_dict = {r.child: r for r in allowed_tables_page.resources}
context = {
"database": database,
"database_color": db.color,
"query": {"sql": self.sql, "params": self.params},
"canned_query": (
self.canned_query["name"] if self.canned_query else None
),
"private": self.private,
"canned_query_write": canned_query_write,
"db_is_immutable": not db.is_mutable,
"error": error,
"hide_sql": hide_sql,
"show_hide_link": datasette.urls.path(show_hide_link),
"show_hide_text": show_hide_text,
"editable": self.editable and not self.canned_query,
"allow_execute_sql": allow_execute_sql,
"tables": await get_tables(
datasette, request, db, allowed_dict
),
"named_parameter_values": named_parameter_values,
"edit_sql_url": edit_sql_url,
"display_rows": display_rows,
"table_columns": (
await _table_columns(datasette, database)
if allow_execute_sql
else {}
),
"columns": columns,
"renderers": renderers,
"url_csv": datasette.urls.path(
path_with_format(
request=request,
format="csv",
extra_qs={"_size": "max"},
)
),
"show_hide_hidden": markupsafe.Markup(show_hide_hidden),
"metadata": self.canned_query or metadata,
"alternate_url_json": alternate_url_json,
"select_templates": [
"{}{}".format(
"*" if t == template.name else "", t
)
for t in templates
],
"top_query": make_slot_function(
"top_query",
datasette,
request,
database=database,
sql=self.sql,
),
"top_canned_query": make_slot_function(
"top_canned_query",
datasette,
request,
database=database,
query_name=(
self.canned_query["name"]
if self.canned_query
else None
),
),
"query_actions": self.query_actions,
}
# Merge extra context from subclass
extra_ctx = await self.extra_context()
if extra_ctx:
context.update(extra_ctx)
# Merge extra context from constructor
if self._extra_template_context:
context.update(self._extra_template_context)
r = Response.html(
await datasette.render_template(
template,
context,
request=request,
view_name="database",
),
headers=headers,
)
if datasette.cors:
add_cors_headers(r.headers)
return r
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _show_hide_sql_controls(self):
"""Compute the show/hide SQL toggle state.
Returns (show_hide_hidden, hide_sql, show_hide_link, show_hide_text).
"""
params = {key: self.request.args.get(key) for key in self.request.args}
show_hide_hidden = ""
if self.canned_query and self.canned_query.get("hide_sql"):
if bool(params.get("_show_sql")):
show_hide_link = path_with_removed_args(
self.request, {"_show_sql"}
)
show_hide_text = "hide"
show_hide_hidden = (
'<input type="hidden" name="_show_sql" value="1">'
)
else:
show_hide_link = path_with_added_args(
self.request, {"_show_sql": 1}
)
show_hide_text = "show"
else:
if bool(params.get("_hide_sql")):
show_hide_link = path_with_removed_args(
self.request, {"_hide_sql"}
)
show_hide_text = "show"
show_hide_hidden = (
'<input type="hidden" name="_hide_sql" value="1">'
)
else:
show_hide_link = path_with_added_args(
self.request, {"_hide_sql": 1}
)
show_hide_text = "hide"
hide_sql = show_hide_text == "show"
return show_hide_hidden, hide_sql, show_hide_link, show_hide_text
def _edit_sql_url(self, allow_execute_sql, named_parameter_values):
"""Build the 'Edit SQL' URL for canned queries, or None."""
if not self.canned_query:
return None
is_validated = False
try:
validate_sql_select(self.sql)
is_validated = True
except InvalidSql:
pass
if allow_execute_sql and is_validated and ":_" not in self.sql:
from urllib.parse import urlencode
return (
self.datasette.urls.database(self.database)
+ "/-/query"
+ "?"
+ urlencode({"sql": self.sql, **named_parameter_values})
)
return None
async def _available_renderers(self, columns, rows):
"""Build dict of {renderer_name: url} for available output formats."""
renderers = {}
for key, (_, can_render) in self.datasette.renderers.items():
it_can_render = call_with_supported_arguments(
can_render,
datasette=self.datasette,
columns=columns or [],
rows=rows or [],
sql=self.sql,
query_name=(
self.canned_query["name"] if self.canned_query else None
),
database=self.database,
table=None,
request=self.request,
view_name="database",
)
it_can_render = await await_me_maybe(it_can_render)
if it_can_render:
renderers[key] = self.datasette.urls.path(
path_with_format(request=self.request, format=key)
)
return renderers
# ------------------------------------------------------------------
# Class-level convenience for use as a route handler
# ------------------------------------------------------------------
@classmethod
def view(cls, datasette, database, sql, **kwargs):
"""Return an async view function suitable for use with register_routes.
Usage::
@hookimpl
def register_routes(datasette):
return [
(r"/my-query", QueryPage.view(
datasette,
database="_internal",
sql="select * from catalog_tables",
)),
]
"""
async def _view(datasette_arg, request):
page = cls(
datasette_arg,
request,
database=database,
sql=sql,
**kwargs,
)
return await page.response()
return _view
# ------------------------------------------------------------------
# Shared utility: format rows for display (used by both query and table views)
# ------------------------------------------------------------------
async def _display_rows(datasette, database, request, rows, columns):
"""Format raw query result rows into display-ready values.
Used by both the query page and the table page for rendering cell
values in HTML. Calls the ``render_cell`` plugin hook for each cell.
Args:
datasette: Datasette instance
database: Database name string
request: Request object
rows: List of row tuples
columns: List of column name strings
Returns:
List of lists of display values (strings or :class:`markupsafe.Markup`)
"""
display_rows = []
truncate_cells = datasette.setting("truncate_cells_html")
for row in rows:
display_row = []
for column, value in zip(columns, row):
display_value = value
# Let plugins have a go
plugin_display_value = None
for candidate in pm.hook.render_cell(
row=row,
value=value,
column=column,
table=None,
database=database,
datasette=datasette,
request=request,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:
plugin_display_value = candidate
break
if plugin_display_value is not None:
display_value = plugin_display_value
else:
if value in ("", None):
display_value = markupsafe.Markup("&nbsp;")
elif is_url(str(display_value).strip()):
display_value = markupsafe.Markup(
'<a href="{url}">{truncated_url}</a>'.format(
url=markupsafe.escape(value.strip()),
truncated_url=markupsafe.escape(
truncate_url(value.strip(), truncate_cells)
),
)
)
elif isinstance(display_value, bytes):
blob_url = path_with_format(
request=request,
format="blob",
extra_qs={
"_blob_column": column,
"_blob_hash": hashlib.sha256(
display_value
).hexdigest(),
},
)
formatted = format_bytes(len(value))
display_value = markupsafe.Markup(
'<a class="blob-download" href="{}"{}>'
"&lt;Binary:&nbsp;{:,}&nbsp;byte{}&gt;</a>".format(
blob_url,
(
' title="{}"'.format(formatted)
if "bytes" not in formatted
else ""
),
len(value),
"" if len(value) == 1 else "s",
)
)
else:
display_value = str(value)
if truncate_cells and len(display_value) > truncate_cells:
display_value = (
display_value[:truncate_cells] + "\u2026"
)
display_row.append(display_value)
display_rows.append(display_row)
return display_rows
# ------------------------------------------------------------------
# Shared format dispatch helper (used by table_view too)
# ------------------------------------------------------------------
async def dispatch_renderer(
datasette,
request,
format_,
columns,
rows,
sql,
*,
database,
table=None,
query_name=None,
truncated=False,
error=None,
view_name="table",
data=None,
):
"""Dispatch a request to a plugin output renderer.
This is the shared code used by both the query view and the table
view for handling non-HTML, non-CSV output formats registered by
plugins via the ``register_output_renderer`` hook.
Args:
datasette: Datasette instance
request: Request object
format_: The format string (e.g. "json")
columns: List of column names
rows: List of result rows
sql: The SQL query string
database: Database name
table: Optional table name
query_name: Optional canned query name
truncated: Whether results were truncated
error: Error message or None
view_name: View name string for plugin hooks
data: Optional full data dict for backwards compat
Returns:
:class:`Response` object
"""
result = call_with_supported_arguments(
datasette.renderers[format_][0],
datasette=datasette,
columns=columns,
rows=rows,
sql=sql,
query_name=query_name,
database=database,
table=table,
request=request,
view_name=view_name,
truncated=truncated,
error=error,
# Deprecated but kept for backwards compat:
args=request.args,
data=data or {"ok": True, "rows": rows, "columns": columns},
)
if asyncio.iscoroutine(result):
result = await result
if result is None:
raise NotFound("No data")
if isinstance(result, dict):
r = Response(
body=result.get("body"),
status=result.get("status_code") or 200,
content_type=result.get("content_type", "text/plain"),
headers=result.get("headers"),
)
elif isinstance(result, Response):
r = result
else:
raise AssertionError("{} should be dict or Response".format(result))
if datasette.cors:
add_cors_headers(r.headers)
return r

View file

@ -509,26 +509,15 @@ class QueryView(View):
async def get(self, request, datasette):
from datasette.app import TableNotFound
from datasette.query_page import QueryPage
await datasette.refresh_schemas()
db = await datasette.resolve_database(request)
database = db.name
# Get all tables/views this actor can see in bulk with private flag
allowed_tables_page = await datasette.allowed_resources(
"view-table",
request.actor,
parent=database,
include_is_private=True,
limit=1000,
)
# Create lookup dict for quick access
allowed_dict = {r.child: r for r in allowed_tables_page.resources}
# Are we a canned query?
canned_query = None
canned_query_write = False
if "table" in request.url_vars:
try:
await datasette.resolve_table(request)
@ -539,7 +528,6 @@ class QueryView(View):
)
if canned_query is None:
raise
canned_query_write = bool(canned_query.get("write"))
private = False
if canned_query:
@ -551,7 +539,6 @@ class QueryView(View):
)
if not visible:
raise Forbidden("You do not have permission to view this query")
else:
await datasette.ensure_permission(
action="execute-sql",
@ -574,305 +561,23 @@ class QueryView(View):
named_parameters = canned_query["params"]
if not named_parameters:
named_parameters = derive_named_parameters(sql)
named_parameter_values = {
named_parameter: params.get(named_parameter) or ""
for named_parameter in named_parameters
if not named_parameter.startswith("_")
}
# Set to blank string if missing from params
for named_parameter in named_parameters:
if named_parameter not in params and not named_parameter.startswith("_"):
params[named_parameter] = ""
extra_args = {}
if params.get("_timelimit"):
extra_args["custom_time_limit"] = int(params["_timelimit"])
format_ = request.url_vars.get("format") or "html"
query_error = None
results = None
rows = []
columns = []
params_for_query = params
if not canned_query_write:
try:
if not canned_query:
# For regular queries we only allow SELECT, plus other rules
validate_sql_select(sql)
else:
# Canned queries can run magic parameters
params_for_query = MagicParameters(sql, params, request, datasette)
await params_for_query.execute_params()
results = await datasette.execute(
database, sql, params_for_query, truncate=True, **extra_args
)
columns = results.columns
rows = results.rows
except QueryInterrupted as ex:
raise DatasetteError(
textwrap.dedent(
"""
<p>SQL query took too long. The time limit is controlled by the
<a href="https://docs.datasette.io/en/stable/settings.html#sql-time-limit-ms">sql_time_limit_ms</a>
configuration option.</p>
<textarea style="width: 90%">{}</textarea>
<script>
let ta = document.querySelector("textarea");
ta.style.height = ta.scrollHeight + "px";
</script>
""".format(
markupsafe.escape(ex.sql)
)
).strip(),
title="SQL Interrupted",
status=400,
message_is_html=True,
)
except sqlite3.DatabaseError as ex:
query_error = str(ex)
results = None
rows = []
columns = []
except (sqlite3.OperationalError, InvalidSql) as ex:
raise DatasetteError(str(ex), title="Invalid SQL", status=400)
except sqlite3.OperationalError as ex:
raise DatasetteError(str(ex))
except DatasetteError:
raise
# Handle formats from plugins
if format_ == "csv":
async def fetch_data_for_csv(request, _next=None):
results = await db.execute(sql, params, truncate=True)
data = {"rows": results.rows, "columns": results.columns}
return data, None, None
return await stream_csv(datasette, fetch_data_for_csv, request, db.name)
elif format_ in datasette.renderers.keys():
# Dispatch request to the correct output format renderer
# (CSV is not handled here due to streaming)
result = call_with_supported_arguments(
datasette.renderers[format_][0],
datasette=datasette,
columns=columns,
rows=rows,
sql=sql,
query_name=canned_query["name"] if canned_query else None,
database=database,
table=None,
request=request,
view_name="table",
truncated=results.truncated if results else False,
error=query_error,
# These will be deprecated in Datasette 1.0:
args=request.args,
data={"ok": True, "rows": rows, "columns": columns},
)
if asyncio.iscoroutine(result):
result = await result
if result is None:
raise NotFound("No data")
if isinstance(result, dict):
r = Response(
body=result.get("body"),
status=result.get("status_code") or 200,
content_type=result.get("content_type", "text/plain"),
headers=result.get("headers"),
)
elif isinstance(result, Response):
r = result
# if status_code is not None:
# # Over-ride the status code
# r.status = status_code
else:
assert False, f"{result} should be dict or Response"
elif format_ == "html":
headers = {}
templates = [f"query-{to_css_class(database)}.html", "query.html"]
if canned_query:
templates.insert(
0,
f"query-{to_css_class(database)}-{to_css_class(canned_query['name'])}.html",
)
environment = datasette.get_jinja_environment(request)
template = environment.select_template(templates)
alternate_url_json = datasette.absolute_url(
request,
datasette.urls.path(path_with_format(request=request, format="json")),
)
data = {}
headers.update(
{
"Link": '<{}>; rel="alternate"; type="application/json+datasette"'.format(
alternate_url_json
)
}
)
metadata = await datasette.get_database_metadata(database)
renderers = {}
for key, (_, can_render) in datasette.renderers.items():
it_can_render = call_with_supported_arguments(
can_render,
datasette=datasette,
columns=data.get("columns") or [],
rows=data.get("rows") or [],
sql=data.get("query", {}).get("sql", None),
query_name=data.get("query_name"),
database=database,
table=data.get("table"),
request=request,
view_name="database",
)
it_can_render = await await_me_maybe(it_can_render)
if it_can_render:
renderers[key] = datasette.urls.path(
path_with_format(request=request, format=key)
)
allow_execute_sql = await datasette.allowed(
action="execute-sql",
resource=DatabaseResource(database=database),
actor=request.actor,
)
show_hide_hidden = ""
if canned_query and canned_query.get("hide_sql"):
if bool(params.get("_show_sql")):
show_hide_link = path_with_removed_args(request, {"_show_sql"})
show_hide_text = "hide"
show_hide_hidden = (
'<input type="hidden" name="_show_sql" value="1">'
)
else:
show_hide_link = path_with_added_args(request, {"_show_sql": 1})
show_hide_text = "show"
else:
if bool(params.get("_hide_sql")):
show_hide_link = path_with_removed_args(request, {"_hide_sql"})
show_hide_text = "show"
show_hide_hidden = (
'<input type="hidden" name="_hide_sql" value="1">'
)
else:
show_hide_link = path_with_added_args(request, {"_hide_sql": 1})
show_hide_text = "hide"
hide_sql = show_hide_text == "show"
# Show 'Edit SQL' button only if:
# - User is allowed to execute SQL
# - SQL is an approved SELECT statement
# - No magic parameters, so no :_ in the SQL string
edit_sql_url = None
is_validated_sql = False
try:
validate_sql_select(sql)
is_validated_sql = True
except InvalidSql:
pass
if allow_execute_sql and is_validated_sql and ":_" not in sql:
edit_sql_url = (
datasette.urls.database(database)
+ "/-/query"
+ "?"
+ urlencode(
{
**{
"sql": sql,
},
**named_parameter_values,
}
)
)
async def query_actions():
query_actions = []
for hook in pm.hook.query_actions(
datasette=datasette,
actor=request.actor,
database=database,
query_name=canned_query["name"] if canned_query else None,
request=request,
sql=sql,
params=params,
):
extra_links = await await_me_maybe(hook)
if extra_links:
query_actions.extend(extra_links)
return query_actions
r = Response.html(
await datasette.render_template(
template,
QueryContext(
database=database,
database_color=db.color,
query={
"sql": sql,
"params": params,
},
canned_query=canned_query["name"] if canned_query else None,
private=private,
canned_query_write=canned_query_write,
db_is_immutable=not db.is_mutable,
error=query_error,
hide_sql=hide_sql,
show_hide_link=datasette.urls.path(show_hide_link),
show_hide_text=show_hide_text,
editable=not canned_query,
allow_execute_sql=allow_execute_sql,
tables=await get_tables(datasette, request, db, allowed_dict),
named_parameter_values=named_parameter_values,
edit_sql_url=edit_sql_url,
display_rows=await display_rows(
datasette, database, request, rows, columns
),
table_columns=(
await _table_columns(datasette, database)
if allow_execute_sql
else {}
),
columns=columns,
renderers=renderers,
url_csv=datasette.urls.path(
path_with_format(
request=request, format="csv", extra_qs={"_size": "max"}
)
),
show_hide_hidden=markupsafe.Markup(show_hide_hidden),
metadata=canned_query or metadata,
alternate_url_json=alternate_url_json,
select_templates=[
f"{'*' if template_name == template.name else ''}{template_name}"
for template_name in templates
],
top_query=make_slot_function(
"top_query", datasette, request, database=database, sql=sql
),
top_canned_query=make_slot_function(
"top_canned_query",
datasette,
request,
database=database,
query_name=canned_query["name"] if canned_query else None,
),
query_actions=query_actions,
),
request=request,
view_name="database",
),
headers=headers,
)
else:
assert False, "Invalid format: {}".format(format_)
if datasette.cors:
add_cors_headers(r.headers)
return r
# Delegate to QueryPage for query execution and rendering
page = QueryPage(
datasette,
request,
database=database,
sql=sql,
params=params,
editable=not canned_query,
canned_query=canned_query,
private=private,
)
return await page.response()
class MagicParameters(dict):
@ -1189,68 +894,11 @@ async def _table_columns(datasette, database_name):
async def display_rows(datasette, database, request, rows, columns):
display_rows = []
truncate_cells = datasette.setting("truncate_cells_html")
for row in rows:
display_row = []
for column, value in zip(columns, row):
display_value = value
# Let the plugins have a go
# pylint: disable=no-member
plugin_display_value = None
for candidate in pm.hook.render_cell(
row=row,
value=value,
column=column,
table=None,
database=database,
datasette=datasette,
request=request,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:
plugin_display_value = candidate
break
if plugin_display_value is not None:
display_value = plugin_display_value
else:
if value in ("", None):
display_value = markupsafe.Markup("&nbsp;")
elif is_url(str(display_value).strip()):
display_value = markupsafe.Markup(
'<a href="{url}">{truncated_url}</a>'.format(
url=markupsafe.escape(value.strip()),
truncated_url=markupsafe.escape(
truncate_url(value.strip(), truncate_cells)
),
)
)
elif isinstance(display_value, bytes):
blob_url = path_with_format(
request=request,
format="blob",
extra_qs={
"_blob_column": column,
"_blob_hash": hashlib.sha256(display_value).hexdigest(),
},
)
formatted = format_bytes(len(value))
display_value = markupsafe.Markup(
'<a class="blob-download" href="{}"{}>&lt;Binary:&nbsp;{:,}&nbsp;byte{}&gt;</a>'.format(
blob_url,
(
' title="{}"'.format(formatted)
if "bytes" not in formatted
else ""
),
len(value),
"" if len(value) == 1 else "s",
)
)
else:
display_value = str(value)
if truncate_cells and len(display_value) > truncate_cells:
display_value = display_value[:truncate_cells] + "\u2026"
display_row.append(display_value)
display_rows.append(display_row)
return display_rows
"""Format raw query result rows for HTML display.
This is a thin wrapper around :func:`datasette.query_page._display_rows`
for backwards compatibility.
"""
from datasette.query_page import _display_rows
return await _display_rows(datasette, database, request, rows, columns)

View file

@ -857,41 +857,23 @@ async def table_view_traced(datasette, request):
elif format_ in datasette.renderers.keys():
# Dispatch request to the correct output format renderer
# (CSV is not handled here due to streaming)
result = call_with_supported_arguments(
datasette.renderers[format_][0],
datasette=datasette,
columns=columns,
rows=rows,
sql=sql,
query_name=None,
from datasette.query_page import dispatch_renderer
r = await dispatch_renderer(
datasette,
request,
format_,
columns,
rows,
sql,
database=resolved.db.name,
table=resolved.table,
request=request,
view_name="table",
query_name=None,
truncated=False,
error=None,
# These will be deprecated in Datasette 1.0:
args=request.args,
view_name="table",
data=data,
)
if asyncio.iscoroutine(result):
result = await result
if result is None:
raise NotFound("No data")
if isinstance(result, dict):
r = Response(
body=result.get("body"),
status=result.get("status_code") or 200,
content_type=result.get("content_type", "text/plain"),
headers=result.get("headers"),
)
elif isinstance(result, Response):
r = result
# if status_code is not None:
# # Over-ride the status code
# r.status = status_code
else:
assert False, f"{result} should be dict or Response"
elif format_ == "html":
headers = {}
templates = [

View file

@ -502,6 +502,19 @@ async def test_hook_register_output_renderer_all_parameters(ds_client):
}
@pytest.mark.asyncio
async def test_hook_register_output_renderer_query_view_name(ds_client):
response = await ds_client.get(
"/fixtures/-/query.testall?sql=select+1"
)
assert response.status_code == 200
body = at_memory_re.sub(" at 0xXXX", response.text)
data = json.loads(body)
assert data["view_name"] == "query"
assert data["table"] is None
assert data["database"] == "fixtures"
@pytest.mark.asyncio
async def test_hook_register_output_renderer_custom_status_code(ds_client):
response = await ds_client.get(