Compare commits


1 commit

Author           SHA1         Message                                     Date
Simon Willison   c46bb1d411   Parallel queries experiment (refs #1576)    2022-02-23 11:11:58 -08:00
3 changed files with 108 additions and 73 deletions

View file

@@ -256,6 +256,10 @@ class Database:
         # Try to get counts for each table, $limit timeout for each count
         counts = {}
         for table in await self.table_names():
+            print(table.lower())
+            if table.lower() == "knn":
+                counts[table] = 0
+            else:
             try:
                 table_count = (
                     await self.execute(
@@ -266,7 +270,11 @@ class Database:
                 counts[table] = table_count
             # In some cases I saw "SQL Logic Error" here in addition to
             # QueryInterrupted - so we catch that too:
-            except (QueryInterrupted, sqlite3.OperationalError, sqlite3.DatabaseError):
+            except (
+                QueryInterrupted,
+                sqlite3.OperationalError,
+                sqlite3.DatabaseError,
+            ):
                 counts[table] = None
         if not self.is_mutable:
             self._cached_table_counts = counts
@@ -318,7 +326,7 @@ class Database:
             return explicit_label_column
         column_names = await self.execute_fn(lambda conn: table_columns(conn, table))
         # Is there a name or title column?
-        name_or_title = [c for c in column_names if c.lower() in ("name", "title")]
+        name_or_title = [c for c in column_names if c.lower() in ("name", "title", "display_name", "displayname")]
         if name_or_title:
             return name_or_title[0]
         # If a table has two columns, one of which is ID, then label_column is the other one
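For context: the table_counts change above runs a short count(*) per table and records None whenever the count is interrupted or raises a SQLite error. Below is a minimal standalone sketch of that pattern using only the stdlib sqlite3 module; Datasette's QueryInterrupted and Database.execute(custom_time_limit=...) are replaced by a progress-handler deadline, and the helper name, time limit and 1000-instruction check interval are illustrative assumptions, not Datasette's implementation:

import sqlite3
import time


def table_counts(conn, time_limit_s=0.05):
    # Count rows in every table, recording None for any count that is
    # interrupted or fails, instead of raising.
    counts = {}
    tables = [
        row[0]
        for row in conn.execute(
            "select name from sqlite_master where type = 'table'"
        )
    ]
    for table in tables:
        start = time.monotonic()

        def abort_if_slow():
            # A non-zero return value tells SQLite to abort the statement,
            # which raises sqlite3.OperationalError in Python.
            return 1 if time.monotonic() - start > time_limit_s else 0

        # Check the deadline every 1000 SQLite virtual machine instructions.
        conn.set_progress_handler(abort_if_slow, 1000)
        try:
            counts[table] = conn.execute(
                f"select count(*) from [{table}]"
            ).fetchone()[0]
        except (sqlite3.OperationalError, sqlite3.DatabaseError):
            counts[table] = None
        finally:
            conn.set_progress_handler(None, 1000)
    return counts


conn = sqlite3.connect(":memory:")
conn.execute("create table t (id integer)")
print(table_counts(conn))  # {'t': 0}

Aborting via the progress handler surfaces as sqlite3.OperationalError, so the same except clause covers both slow and broken tables, in the same spirit as the broadened except in the diff.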

View file

@@ -7,12 +7,18 @@ class Urls:
         self.ds = ds
 
     def path(self, path, format=None):
+        print(
+            "Urls.path called with: {} (PrefixedUrlString = {})".format(
+                path, isinstance(path, PrefixedUrlString)
+            )
+        )
         if not isinstance(path, PrefixedUrlString):
             if path.startswith("/"):
                 path = path[1:]
             path = self.ds.setting("base_url") + path
         if format is not None:
             path = path_with_format(path=path, format=format)
+        print(" returning", path)
         return PrefixedUrlString(path)
 
     def instance(self, format=None):
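For context: the print() calls added to Urls.path are temporary tracing, but they point at why the isinstance check is there at all: PrefixedUrlString is a str subclass that marks a path as already carrying the base_url prefix, so a path that passes through Urls.path() twice cannot be double-prefixed. A rough sketch of that idea, with the base_url setting replaced by a plain constructor argument (illustrative only, not Datasette's actual implementation):

class PrefixedUrlString(str):
    # Marker type: a string that already includes the base_url prefix.
    pass


class Urls:
    def __init__(self, base_url="/"):
        self.base_url = base_url

    def path(self, path):
        # Only prefix plain strings; PrefixedUrlString values pass through
        # untouched, so repeated calls cannot double-prefix a path.
        if not isinstance(path, PrefixedUrlString):
            if path.startswith("/"):
                path = path[1:]
            path = self.base_url + path
        return PrefixedUrlString(path)


urls = Urls(base_url="/prefix/")
once = urls.path("/fixtures")  # "/prefix/fixtures"
twice = urls.path(once)        # still "/prefix/fixtures"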

View file

@@ -1,3 +1,4 @@
+import asyncio
 import urllib
 import itertools
 import json
@@ -26,6 +27,7 @@ from datasette.utils import (
 )
 from datasette.utils.asgi import BadRequest, NotFound
 from datasette.filters import Filters
+from datasette import tracer
 from .base import DataView, DatasetteError, ureg
 from .database import QueryView
@@ -616,9 +618,7 @@ class TableView(RowTableShared):
         if request.args.get("_timelimit"):
             extra_args["custom_time_limit"] = int(request.args.get("_timelimit"))
 
-        # Execute the main query!
-        results = await db.execute(sql, params, truncate=True, **extra_args)
-
+        async def execute_count():
             # Calculate the total count for this query
             filtered_table_rows_count = None
             if (
@@ -628,13 +628,12 @@ class TableView(RowTableShared):
             ):
                 # We can use a previously cached table row count
                 try:
-                    filtered_table_rows_count = self.ds.inspect_data[database]["tables"][
-                        table
-                    ]["count"]
+                    filtered_table_rows_count = self.ds.inspect_data[database][
+                        "tables"
+                    ][table]["count"]
                 except KeyError:
                     pass
 
-            # Otherwise run a select count(*) ...
             if count_sql and filtered_table_rows_count is None and not nocount:
                 try:
                     count_rows = list(await db.execute(count_sql, from_sql_params))
@@ -642,18 +641,14 @@ class TableView(RowTableShared):
                 except QueryInterrupted:
                     pass
 
-        # Faceting
-        if not self.ds.setting("allow_facet") and any(
-            arg.startswith("_facet") for arg in request.args
-        ):
-            raise BadRequest("_facet= is not allowed")
+            return filtered_table_rows_count
+
+        filtered_table_rows_count = await execute_count()
 
         # pylint: disable=no-member
         facet_classes = list(
             itertools.chain.from_iterable(pm.hook.register_facet_classes())
         )
-        facet_results = {}
-        facets_timed_out = []
         facet_instances = []
         for klass in facet_classes:
             facet_instances.append(
@@ -669,6 +664,28 @@ class TableView(RowTableShared):
                 )
             )
 
+        async def execute_suggested_facets():
+            # Calculate suggested facets
+            suggested_facets = []
+            if (
+                self.ds.setting("suggest_facets")
+                and self.ds.setting("allow_facet")
+                and not _next
+                and not nofacet
+                and not nosuggest
+            ):
+                for facet in facet_instances:
+                    suggested_facets.extend(await facet.suggest())
+            return suggested_facets
+
+        async def execute_facets():
+            facet_results = {}
+            facets_timed_out = []
+            if not self.ds.setting("allow_facet") and any(
+                arg.startswith("_facet") for arg in request.args
+            ):
+                raise BadRequest("_facet= is not allowed")
+
             if not nofacet:
                 for facet in facet_instances:
                     (
@@ -685,17 +702,21 @@
                         facet_results[key] = facet_info
                     facets_timed_out.extend(instance_facets_timed_out)
 
-        # Calculate suggested facets
-        suggested_facets = []
-        if (
-            self.ds.setting("suggest_facets")
-            and self.ds.setting("allow_facet")
-            and not _next
-            and not nofacet
-            and not nosuggest
-        ):
-            for facet in facet_instances:
-                suggested_facets.extend(await facet.suggest())
+            return facet_results, facets_timed_out
+
+        # Execute the main query, facets and facet suggestions in parallel:
+        with tracer.trace_child_tasks():
+            (
+                results,
+                suggested_facets,
+                (facet_results, facets_timed_out),
+            ) = await asyncio.gather(
+                db.execute(sql, params, truncate=True, **extra_args),
+                execute_suggested_facets(),
+                execute_facets(),
+            )
+
+        results = await db.execute(sql, params, truncate=True, **extra_args)
 
         # Figure out columns and rows for the query
         columns = [r[0] for r in results.description]
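The final hunk above is the point of the experiment: the main query, the facet queries and the facet suggestions are wrapped in inner async functions and awaited together with asyncio.gather(), inside tracer.trace_child_tasks() so Datasette's tracing can follow SQL executed by the child tasks. Below is a stripped-down sketch of that shape, with asyncio.sleep() standing in for the real SQL and the trace_child_tasks() context manager omitted; every name here is illustrative rather than Datasette's actual code:

import asyncio


async def table_view():
    # Stand-ins for the real work; each of these would normally run SQL.
    async def execute_query():
        await asyncio.sleep(0.01)
        return ["row 1", "row 2"]

    async def execute_suggested_facets():
        await asyncio.sleep(0.01)
        return ["a suggested facet"]

    async def execute_facets():
        await asyncio.sleep(0.01)
        return {"facet": "results"}, []  # (facet_results, facets_timed_out)

    # asyncio.gather() runs all three coroutines concurrently and returns
    # their results in the order the awaitables were passed in.
    (
        results,
        suggested_facets,
        (facet_results, facets_timed_out),
    ) = await asyncio.gather(
        execute_query(), execute_suggested_facets(), execute_facets()
    )
    return results, suggested_facets, facet_results, facets_timed_out


print(asyncio.run(table_view()))

Because gather() returns results in the order its awaitables were passed in, the nested tuple unpacking has to match the two-item value returned by execute_facets(), exactly as in the diff.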