Compare commits

...

1 commit

Author SHA1 Message Date
Simon Willison
c46bb1d411 Parallel queries experiment
Refs #1576
2022-02-23 11:11:58 -08:00
3 changed files with 108 additions and 73 deletions

View file

@@ -256,18 +256,26 @@ class Database:
# Try to get counts for each table, $limit timeout for each count # Try to get counts for each table, $limit timeout for each count
counts = {} counts = {}
for table in await self.table_names(): for table in await self.table_names():
try: print(table.lower())
table_count = ( if table.lower() == "knn":
await self.execute( counts[table] = 0
f"select count(*) from [{table}]", else:
custom_time_limit=limit, try:
) table_count = (
).rows[0][0] await self.execute(
counts[table] = table_count f"select count(*) from [{table}]",
# In some cases I saw "SQL Logic Error" here in addition to custom_time_limit=limit,
# QueryInterrupted - so we catch that too: )
except (QueryInterrupted, sqlite3.OperationalError, sqlite3.DatabaseError): ).rows[0][0]
counts[table] = None counts[table] = table_count
# In some cases I saw "SQL Logic Error" here in addition to
# QueryInterrupted - so we catch that too:
except (
QueryInterrupted,
sqlite3.OperationalError,
sqlite3.DatabaseError,
):
counts[table] = None
if not self.is_mutable: if not self.is_mutable:
self._cached_table_counts = counts self._cached_table_counts = counts
return counts return counts
@@ -318,7 +326,7 @@ class Database:
return explicit_label_column return explicit_label_column
column_names = await self.execute_fn(lambda conn: table_columns(conn, table)) column_names = await self.execute_fn(lambda conn: table_columns(conn, table))
# Is there a name or title column? # Is there a name or title column?
name_or_title = [c for c in column_names if c.lower() in ("name", "title")] name_or_title = [c for c in column_names if c.lower() in ("name", "title", "display_name", "displayname")]
if name_or_title: if name_or_title:
return name_or_title[0] return name_or_title[0]
# If a table has two columns, one of which is ID, then label_column is the other one # If a table has two columns, one of which is ID, then label_column is the other one

View file

@@ -7,12 +7,18 @@ class Urls:
self.ds = ds self.ds = ds
def path(self, path, format=None): def path(self, path, format=None):
print(
"Urls.path called with: {} (PrefixedUrlString = {})".format(
path, isinstance(path, PrefixedUrlString)
)
)
if not isinstance(path, PrefixedUrlString): if not isinstance(path, PrefixedUrlString):
if path.startswith("/"): if path.startswith("/"):
path = path[1:] path = path[1:]
path = self.ds.setting("base_url") + path path = self.ds.setting("base_url") + path
if format is not None: if format is not None:
path = path_with_format(path=path, format=format) path = path_with_format(path=path, format=format)
print(" returning", path)
return PrefixedUrlString(path) return PrefixedUrlString(path)
def instance(self, format=None): def instance(self, format=None):

View file

@@ -1,3 +1,4 @@
import asyncio
import urllib import urllib
import itertools import itertools
import json import json
@@ -26,6 +27,7 @@ from datasette.utils import (
) )
from datasette.utils.asgi import BadRequest, NotFound from datasette.utils.asgi import BadRequest, NotFound
from datasette.filters import Filters from datasette.filters import Filters
from datasette import tracer
from .base import DataView, DatasetteError, ureg from .base import DataView, DatasetteError, ureg
from .database import QueryView from .database import QueryView
@@ -616,44 +618,37 @@ class TableView(RowTableShared):
if request.args.get("_timelimit"): if request.args.get("_timelimit"):
extra_args["custom_time_limit"] = int(request.args.get("_timelimit")) extra_args["custom_time_limit"] = int(request.args.get("_timelimit"))
# Execute the main query! async def execute_count():
results = await db.execute(sql, params, truncate=True, **extra_args) # Calculate the total count for this query
filtered_table_rows_count = None
if (
not db.is_mutable
and self.ds.inspect_data
and count_sql == f"select count(*) from {table} "
):
# We can use a previously cached table row count
try:
filtered_table_rows_count = self.ds.inspect_data[database][
"tables"
][table]["count"]
except KeyError:
pass
# Calculate the total count for this query if count_sql and filtered_table_rows_count is None and not nocount:
filtered_table_rows_count = None try:
if ( count_rows = list(await db.execute(count_sql, from_sql_params))
not db.is_mutable filtered_table_rows_count = count_rows[0][0]
and self.ds.inspect_data except QueryInterrupted:
and count_sql == f"select count(*) from {table} " pass
):
# We can use a previously cached table row count
try:
filtered_table_rows_count = self.ds.inspect_data[database]["tables"][
table
]["count"]
except KeyError:
pass
# Otherwise run a select count(*) ... return filtered_table_rows_count
if count_sql and filtered_table_rows_count is None and not nocount:
try:
count_rows = list(await db.execute(count_sql, from_sql_params))
filtered_table_rows_count = count_rows[0][0]
except QueryInterrupted:
pass
# Faceting filtered_table_rows_count = await execute_count()
if not self.ds.setting("allow_facet") and any(
arg.startswith("_facet") for arg in request.args
):
raise BadRequest("_facet= is not allowed")
# pylint: disable=no-member # pylint: disable=no-member
facet_classes = list( facet_classes = list(
itertools.chain.from_iterable(pm.hook.register_facet_classes()) itertools.chain.from_iterable(pm.hook.register_facet_classes())
) )
facet_results = {}
facets_timed_out = []
facet_instances = [] facet_instances = []
for klass in facet_classes: for klass in facet_classes:
facet_instances.append( facet_instances.append(
@@ -669,33 +664,59 @@ class TableView(RowTableShared):
) )
) )
if not nofacet: async def execute_suggested_facets():
for facet in facet_instances: # Calculate suggested facets
( suggested_facets = []
instance_facet_results, if (
instance_facets_timed_out, self.ds.setting("suggest_facets")
) = await facet.facet_results() and self.ds.setting("allow_facet")
for facet_info in instance_facet_results: and not _next
base_key = facet_info["name"] and not nofacet
key = base_key and not nosuggest
i = 1 ):
while key in facet_results: for facet in facet_instances:
i += 1 suggested_facets.extend(await facet.suggest())
key = f"{base_key}_{i}" return suggested_facets
facet_results[key] = facet_info
facets_timed_out.extend(instance_facets_timed_out)
# Calculate suggested facets async def execute_facets():
suggested_facets = [] facet_results = {}
if ( facets_timed_out = []
self.ds.setting("suggest_facets") if not self.ds.setting("allow_facet") and any(
and self.ds.setting("allow_facet") arg.startswith("_facet") for arg in request.args
and not _next ):
and not nofacet raise BadRequest("_facet= is not allowed")
and not nosuggest
): if not nofacet:
for facet in facet_instances: for facet in facet_instances:
suggested_facets.extend(await facet.suggest()) (
instance_facet_results,
instance_facets_timed_out,
) = await facet.facet_results()
for facet_info in instance_facet_results:
base_key = facet_info["name"]
key = base_key
i = 1
while key in facet_results:
i += 1
key = f"{base_key}_{i}"
facet_results[key] = facet_info
facets_timed_out.extend(instance_facets_timed_out)
return facet_results, facets_timed_out
# Execute the main query, facets and facet suggestions in parallel:
with tracer.trace_child_tasks():
(
results,
suggested_facets,
(facet_results, facets_timed_out),
) = await asyncio.gather(
db.execute(sql, params, truncate=True, **extra_args),
execute_suggested_facets(),
execute_facets(),
)
results = await db.execute(sql, params, truncate=True, **extra_args)
# Figure out columns and rows for the query # Figure out columns and rows for the query
columns = [r[0] for r in results.description] columns = [r[0] for r in results.description]