Compare commits

...

1 commit

Author SHA1 Message Date
Simon Willison
c46bb1d411 Parallel queries experiment
Refs #1576
2022-02-23 11:11:58 -08:00
3 changed files with 108 additions and 73 deletions

View file

@ -256,18 +256,26 @@ class Database:
# Try to get counts for each table, $limit timeout for each count
counts = {}
for table in await self.table_names():
try:
table_count = (
await self.execute(
f"select count(*) from [{table}]",
custom_time_limit=limit,
)
).rows[0][0]
counts[table] = table_count
# In some cases I saw "SQL Logic Error" here in addition to
# QueryInterrupted - so we catch that too:
except (QueryInterrupted, sqlite3.OperationalError, sqlite3.DatabaseError):
counts[table] = None
print(table.lower())
if table.lower() == "knn":
counts[table] = 0
else:
try:
table_count = (
await self.execute(
f"select count(*) from [{table}]",
custom_time_limit=limit,
)
).rows[0][0]
counts[table] = table_count
# In some cases I saw "SQL Logic Error" here in addition to
# QueryInterrupted - so we catch that too:
except (
QueryInterrupted,
sqlite3.OperationalError,
sqlite3.DatabaseError,
):
counts[table] = None
if not self.is_mutable:
self._cached_table_counts = counts
return counts
@ -318,7 +326,7 @@ class Database:
return explicit_label_column
column_names = await self.execute_fn(lambda conn: table_columns(conn, table))
# Is there a name or title column?
name_or_title = [c for c in column_names if c.lower() in ("name", "title")]
name_or_title = [c for c in column_names if c.lower() in ("name", "title", "display_name", "displayname")]
if name_or_title:
return name_or_title[0]
# If a table has two columns, one of which is ID, then label_column is the other one

View file

@ -7,12 +7,18 @@ class Urls:
self.ds = ds
def path(self, path, format=None):
    """Return ``path`` as a ``PrefixedUrlString`` under the ``base_url`` setting.

    A ``path`` that is already a ``PrefixedUrlString`` is not prefixed a
    second time. Otherwise a single leading "/" is dropped and the value of
    ``self.ds.setting("base_url")`` is prepended.

    If ``format`` is not None the path is passed through
    ``path_with_format(path=..., format=...)`` before being wrapped.
    """
    if not isinstance(path, PrefixedUrlString):
        # Drop the leading slash so it is not doubled up with base_url
        # (presumably base_url ends with "/" - confirm against the setting)
        if path.startswith("/"):
            path = path[1:]
        path = self.ds.setting("base_url") + path
    if format is not None:
        path = path_with_format(path=path, format=format)
    return PrefixedUrlString(path)
def instance(self, format=None):

View file

@ -1,3 +1,4 @@
import asyncio
import urllib
import itertools
import json
@ -26,6 +27,7 @@ from datasette.utils import (
)
from datasette.utils.asgi import BadRequest, NotFound
from datasette.filters import Filters
from datasette import tracer
from .base import DataView, DatasetteError, ureg
from .database import QueryView
@ -616,44 +618,37 @@ class TableView(RowTableShared):
if request.args.get("_timelimit"):
extra_args["custom_time_limit"] = int(request.args.get("_timelimit"))
# Execute the main query!
results = await db.execute(sql, params, truncate=True, **extra_args)
async def execute_count():
# Calculate the total count for this query
filtered_table_rows_count = None
if (
not db.is_mutable
and self.ds.inspect_data
and count_sql == f"select count(*) from {table} "
):
# We can use a previously cached table row count
try:
filtered_table_rows_count = self.ds.inspect_data[database][
"tables"
][table]["count"]
except KeyError:
pass
# Calculate the total count for this query
filtered_table_rows_count = None
if (
not db.is_mutable
and self.ds.inspect_data
and count_sql == f"select count(*) from {table} "
):
# We can use a previously cached table row count
try:
filtered_table_rows_count = self.ds.inspect_data[database]["tables"][
table
]["count"]
except KeyError:
pass
if count_sql and filtered_table_rows_count is None and not nocount:
try:
count_rows = list(await db.execute(count_sql, from_sql_params))
filtered_table_rows_count = count_rows[0][0]
except QueryInterrupted:
pass
# Otherwise run a select count(*) ...
if count_sql and filtered_table_rows_count is None and not nocount:
try:
count_rows = list(await db.execute(count_sql, from_sql_params))
filtered_table_rows_count = count_rows[0][0]
except QueryInterrupted:
pass
return filtered_table_rows_count
# Faceting
if not self.ds.setting("allow_facet") and any(
arg.startswith("_facet") for arg in request.args
):
raise BadRequest("_facet= is not allowed")
filtered_table_rows_count = await execute_count()
# pylint: disable=no-member
facet_classes = list(
itertools.chain.from_iterable(pm.hook.register_facet_classes())
)
facet_results = {}
facets_timed_out = []
facet_instances = []
for klass in facet_classes:
facet_instances.append(
@ -669,33 +664,59 @@ class TableView(RowTableShared):
)
)
if not nofacet:
for facet in facet_instances:
(
instance_facet_results,
instance_facets_timed_out,
) = await facet.facet_results()
for facet_info in instance_facet_results:
base_key = facet_info["name"]
key = base_key
i = 1
while key in facet_results:
i += 1
key = f"{base_key}_{i}"
facet_results[key] = facet_info
facets_timed_out.extend(instance_facets_timed_out)
async def execute_suggested_facets():
    """Collect facet suggestions from every instance in ``facet_instances``.

    Returns an empty list unless the ``suggest_facets`` and ``allow_facet``
    settings are both truthy and none of ``_next``, ``nofacet`` or
    ``nosuggest`` is set.
    """
    # Same short-circuit order as a single chained condition: settings are
    # consulted first, then the request-derived flags.
    suggestions_wanted = (
        self.ds.setting("suggest_facets")
        and self.ds.setting("allow_facet")
        and not _next
        and not nofacet
        and not nosuggest
    )
    collected = []
    if not suggestions_wanted:
        return collected
    for facet_instance in facet_instances:
        collected.extend(await facet_instance.suggest())
    return collected
# Calculate suggested facets
suggested_facets = []
if (
self.ds.setting("suggest_facets")
and self.ds.setting("allow_facet")
and not _next
and not nofacet
and not nosuggest
):
for facet in facet_instances:
suggested_facets.extend(await facet.suggest())
async def execute_facets():
    """Run every facet in ``facet_instances`` and merge their results.

    Returns a ``(facet_results, facets_timed_out)`` tuple: a dict of facet
    info keyed by facet name (repeated names get ``_2``, ``_3``, ...
    suffixes) and a list of the facets each instance reported as timed out.

    Raises ``BadRequest`` if the ``allow_facet`` setting is off and any
    request argument starts with ``_facet``.
    """
    merged = {}
    timed_out = []
    # Only inspect the request arguments when faceting is disabled
    if not self.ds.setting("allow_facet"):
        if any(name.startswith("_facet") for name in request.args):
            raise BadRequest("_facet= is not allowed")
    if nofacet:
        return merged, timed_out
    for facet_instance in facet_instances:
        found, interrupted = await facet_instance.facet_results()
        for info in found:
            name = info["name"]
            # Find the first unused key: name, name_2, name_3, ...
            unique_key = name
            suffix = 1
            while unique_key in merged:
                suffix += 1
                unique_key = f"{name}_{suffix}"
            merged[unique_key] = info
        timed_out.extend(interrupted)
    return merged, timed_out
# Execute the main query, facets and facet suggestions in parallel:
with tracer.trace_child_tasks():
(
results,
suggested_facets,
(facet_results, facets_timed_out),
) = await asyncio.gather(
db.execute(sql, params, truncate=True, **extra_args),
execute_suggested_facets(),
execute_facets(),
)
results = await db.execute(sql, params, truncate=True, **extra_args)
# Figure out columns and rows for the query
columns = [r[0] for r in results.description]