mirror of
https://github.com/simonw/datasette.git
synced 2025-12-10 16:51:24 +01:00
Execute some TableView queries in parallel
Use ?_noparallel=1 to opt out (undocumented, useful for benchmark comparisons) Refs #1723, #1715
This commit is contained in:
parent
8a0c38f0b8
commit
942411ef94
1 changed files with 67 additions and 26 deletions
|
|
@ -1,3 +1,4 @@
|
||||||
|
import asyncio
|
||||||
import itertools
|
import itertools
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
|
@ -5,6 +6,7 @@ import markupsafe
|
||||||
|
|
||||||
from datasette.plugins import pm
|
from datasette.plugins import pm
|
||||||
from datasette.database import QueryInterrupted
|
from datasette.database import QueryInterrupted
|
||||||
|
from datasette import tracer
|
||||||
from datasette.utils import (
|
from datasette.utils import (
|
||||||
await_me_maybe,
|
await_me_maybe,
|
||||||
CustomRow,
|
CustomRow,
|
||||||
|
|
@ -150,6 +152,16 @@ class TableView(DataView):
|
||||||
default_labels=False,
|
default_labels=False,
|
||||||
_next=None,
|
_next=None,
|
||||||
_size=None,
|
_size=None,
|
||||||
|
):
|
||||||
|
with tracer.trace_child_tasks():
|
||||||
|
return await self._data_traced(request, default_labels, _next, _size)
|
||||||
|
|
||||||
|
async def _data_traced(
|
||||||
|
self,
|
||||||
|
request,
|
||||||
|
default_labels=False,
|
||||||
|
_next=None,
|
||||||
|
_size=None,
|
||||||
):
|
):
|
||||||
database_route = tilde_decode(request.url_vars["database"])
|
database_route = tilde_decode(request.url_vars["database"])
|
||||||
table_name = tilde_decode(request.url_vars["table"])
|
table_name = tilde_decode(request.url_vars["table"])
|
||||||
|
|
@ -159,6 +171,20 @@ class TableView(DataView):
|
||||||
raise NotFound("Database not found: {}".format(database_route))
|
raise NotFound("Database not found: {}".format(database_route))
|
||||||
database_name = db.name
|
database_name = db.name
|
||||||
|
|
||||||
|
# For performance profiling purposes, ?_noparallel=1 turns off asyncio.gather
|
||||||
|
async def _gather_parallel(*args):
|
||||||
|
return await asyncio.gather(*args)
|
||||||
|
|
||||||
|
async def _gather_sequential(*args):
|
||||||
|
results = []
|
||||||
|
for fn in args:
|
||||||
|
results.append(await fn)
|
||||||
|
return results
|
||||||
|
|
||||||
|
gather = (
|
||||||
|
_gather_sequential if request.args.get("_noparallel") else _gather_parallel
|
||||||
|
)
|
||||||
|
|
||||||
# If this is a canned query, not a table, then dispatch to QueryView instead
|
# If this is a canned query, not a table, then dispatch to QueryView instead
|
||||||
canned_query = await self.ds.get_canned_query(
|
canned_query = await self.ds.get_canned_query(
|
||||||
database_name, table_name, request.actor
|
database_name, table_name, request.actor
|
||||||
|
|
@ -174,8 +200,12 @@ class TableView(DataView):
|
||||||
write=bool(canned_query.get("write")),
|
write=bool(canned_query.get("write")),
|
||||||
)
|
)
|
||||||
|
|
||||||
is_view = bool(await db.get_view_definition(table_name))
|
is_view, table_exists = map(
|
||||||
table_exists = bool(await db.table_exists(table_name))
|
bool,
|
||||||
|
await gather(
|
||||||
|
db.get_view_definition(table_name), db.table_exists(table_name)
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
# If table or view not found, return 404
|
# If table or view not found, return 404
|
||||||
if not is_view and not table_exists:
|
if not is_view and not table_exists:
|
||||||
|
|
@ -497,33 +527,44 @@ class TableView(DataView):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if not nofacet:
|
async def execute_facets():
|
||||||
for facet in facet_instances:
|
if not nofacet:
|
||||||
(
|
# Run them in parallel
|
||||||
|
facet_awaitables = [facet.facet_results() for facet in facet_instances]
|
||||||
|
facet_awaitable_results = await gather(*facet_awaitables)
|
||||||
|
for (
|
||||||
instance_facet_results,
|
instance_facet_results,
|
||||||
instance_facets_timed_out,
|
instance_facets_timed_out,
|
||||||
) = await facet.facet_results()
|
) in facet_awaitable_results:
|
||||||
for facet_info in instance_facet_results:
|
for facet_info in instance_facet_results:
|
||||||
base_key = facet_info["name"]
|
base_key = facet_info["name"]
|
||||||
key = base_key
|
key = base_key
|
||||||
i = 1
|
i = 1
|
||||||
while key in facet_results:
|
while key in facet_results:
|
||||||
i += 1
|
i += 1
|
||||||
key = f"{base_key}_{i}"
|
key = f"{base_key}_{i}"
|
||||||
facet_results[key] = facet_info
|
facet_results[key] = facet_info
|
||||||
facets_timed_out.extend(instance_facets_timed_out)
|
facets_timed_out.extend(instance_facets_timed_out)
|
||||||
|
|
||||||
# Calculate suggested facets
|
|
||||||
suggested_facets = []
|
suggested_facets = []
|
||||||
if (
|
|
||||||
self.ds.setting("suggest_facets")
|
async def execute_suggested_facets():
|
||||||
and self.ds.setting("allow_facet")
|
# Calculate suggested facets
|
||||||
and not _next
|
if (
|
||||||
and not nofacet
|
self.ds.setting("suggest_facets")
|
||||||
and not nosuggest
|
and self.ds.setting("allow_facet")
|
||||||
):
|
and not _next
|
||||||
for facet in facet_instances:
|
and not nofacet
|
||||||
suggested_facets.extend(await facet.suggest())
|
and not nosuggest
|
||||||
|
):
|
||||||
|
# Run them in parallel
|
||||||
|
facet_suggest_awaitables = [
|
||||||
|
facet.suggest() for facet in facet_instances
|
||||||
|
]
|
||||||
|
for suggest_result in await gather(*facet_suggest_awaitables):
|
||||||
|
suggested_facets.extend(suggest_result)
|
||||||
|
|
||||||
|
await gather(execute_facets(), execute_suggested_facets())
|
||||||
|
|
||||||
# Figure out columns and rows for the query
|
# Figure out columns and rows for the query
|
||||||
columns = [r[0] for r in results.description]
|
columns = [r[0] for r in results.description]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue