mirror of
https://github.com/simonw/datasette.git
synced 2025-12-10 16:51:24 +01:00
I've run the black code formatting tool against everything:
black tests datasette setup.py
I also added a new unit test, in tests/test_black.py, which will fail if the code does not
conform to black's exacting standards.
This unit test only runs on Python 3.6 or higher, because black itself doesn't run on 3.5.
252 lines
7.9 KiB
Python
252 lines
7.9 KiB
Python
import json
|
|
import numbers
|
|
|
|
from .utils import detect_json1, escape_sqlite
|
|
|
|
|
|
class Filter:
    """Base class for a single column lookup.

    The class attributes give the defaults every filter exposes; both
    clause-building methods must be overridden, since the versions defined
    here do nothing except raise ``NotImplementedError``.
    """

    # Identifier for this filter (None on the base class)
    key = None
    # Label shown for this filter (None on the base class)
    display = None
    # Whether the filter works without a value
    no_argument = False

    def human_clause(self, column, value):
        """Describe the filter applied to *column*/*value*; must be overridden."""
        raise NotImplementedError()

    def where_clause(self, table, column, value, param_counter):
        """Build the SQL for *column*/*value*; must be overridden."""
        raise NotImplementedError()
|
|
|
|
|
|
class TemplatedFilter(Filter):
    """Filter whose SQL and human-readable text come from format templates.

    ``sql_template`` is formatted with ``c`` (column), and, unless
    ``no_argument`` is set, ``p`` (parameter name) and ``t`` (table).
    ``human_template`` is either a format string or a callable
    ``(column, value) -> format string``; it is formatted with ``c`` and,
    unless ``no_argument`` is set, ``v`` (value).
    """

    def __init__(
        self,
        key,
        display,
        sql_template,
        human_template,
        format="{}",
        numeric=False,
        no_argument=False,
    ):
        """Store the filter configuration.

        ``format`` wraps the incoming value before it becomes the SQL
        parameter (e.g. ``"%{}%"`` for a ``like`` pattern).  ``numeric``
        asks for all-decimal-digit values to be passed as ``int``.
        """
        self.key = key
        self.display = display
        self.sql_template = sql_template
        self.human_template = human_template
        self.format = format
        self.numeric = numeric
        self.no_argument = no_argument

    def where_clause(self, table, column, value, param_counter):
        """Return ``(sql_fragment, parameter_value)`` for this filter.

        The parameter value is ``None`` for ``no_argument`` filters, an
        ``int`` for numeric filters given a string of decimal digits, and
        otherwise the value formatted through ``self.format``.
        """
        if self.no_argument:
            # No placeholder in the template, so nothing to bind.
            return self.sql_template.format(c=column), None

        converted = self.format.format(value)
        # isdecimal(), not isdigit(): isdigit() is also true for characters
        # such as superscript digits ("²") that int() cannot parse, which
        # made int() raise ValueError on those inputs.
        if self.numeric and converted.isdecimal():
            converted = int(converted)

        kwargs = {"c": column, "p": "p{}".format(param_counter), "t": table}
        return self.sql_template.format(**kwargs), converted

    def human_clause(self, column, value):
        """Return the human-readable description of this filter."""
        if callable(self.human_template):
            # The callable picks a template based on the column and value.
            template = self.human_template(column, value)
        else:
            template = self.human_template
        if self.no_argument:
            return template.format(c=column)
        return template.format(c=column, v=value)
|
|
|
|
|
|
class InFilter(Filter):
    """``in`` lookup: match a column against a list of candidate values.

    The value is either a JSON array (when it starts with ``[``) or a
    comma-separated string.
    """

    key = "in"
    display = "in"

    def __init__(self):
        pass

    def split_value(self, value):
        """Turn the raw value into a list of candidate values."""
        if not value.startswith("["):
            # Plain "a, b, c" form: split on commas and trim each piece.
            return [piece.strip() for piece in value.split(",")]
        return json.loads(value)

    def where_clause(self, table, column, value, param_counter):
        """Return ``(sql_fragment, values)``, one named parameter per value.

        Parameters are numbered consecutively starting at *param_counter*.
        """
        members = self.split_value(value)
        placeholders = ", ".join(
            ":p{}".format(param_counter + offset) for offset in range(len(members))
        )
        fragment = "{} in ({})".format(escape_sqlite(column), placeholders)
        return fragment, members

    def human_clause(self, column, value):
        """Return ``"<column> in <JSON list of values>"``."""
        members = self.split_value(value)
        return "{} in {}".format(column, json.dumps(members))
|
|
|
|
|
|
class Filters:
    """Turn ``(key, value)`` pairs into SQL where-clauses and descriptions.

    Each key is either a bare column name (treated as an ``exact`` lookup)
    or ``column__lookup``, where ``lookup`` is the ``key`` of one of the
    registered filters in ``_filters``.
    """

    _filters = (
        [
            # key, display, sql_template, human_template, format=, numeric=, no_argument=
            TemplatedFilter(
                "exact",
                "=",
                '"{c}" = :{p}',
                lambda c, v: "{c} = {v}" if v.isdigit() else '{c} = "{v}"',
            ),
            TemplatedFilter(
                "not",
                "!=",
                '"{c}" != :{p}',
                lambda c, v: "{c} != {v}" if v.isdigit() else '{c} != "{v}"',
            ),
            TemplatedFilter(
                "contains",
                "contains",
                '"{c}" like :{p}',
                '{c} contains "{v}"',
                format="%{}%",
            ),
            TemplatedFilter(
                "endswith",
                "ends with",
                '"{c}" like :{p}',
                '{c} ends with "{v}"',
                format="%{}",
            ),
            TemplatedFilter(
                "startswith",
                "starts with",
                '"{c}" like :{p}',
                '{c} starts with "{v}"',
                format="{}%",
            ),
            TemplatedFilter("gt", ">", '"{c}" > :{p}', "{c} > {v}", numeric=True),
            TemplatedFilter(
                "gte", "\u2265", '"{c}" >= :{p}', "{c} \u2265 {v}", numeric=True
            ),
            TemplatedFilter("lt", "<", '"{c}" < :{p}', "{c} < {v}", numeric=True),
            TemplatedFilter(
                "lte", "\u2264", '"{c}" <= :{p}', "{c} \u2264 {v}", numeric=True
            ),
            TemplatedFilter("like", "like", '"{c}" like :{p}', '{c} like "{v}"'),
            TemplatedFilter("glob", "glob", '"{c}" glob :{p}', '{c} glob "{v}"'),
            InFilter(),
        ]
        + (
            # Only registered when detect_json1() is truthy.
            # NOTE(review): leading whitespace inside the SQL literal below is
            # not significant to SQLite -- confirm it matches the checked-in file.
            [
                TemplatedFilter(
                    "arraycontains",
                    "array contains",
                    """rowid in (
            select {t}.rowid from {t}, json_each({t}.{c}) j
            where j.value = :{p}
        )""",
                    '{c} contains "{v}"',
                )
            ]
            if detect_json1()
            else []
        )
        + [
            TemplatedFilter("date", "date", "date({c}) = :{p}", '"{c}" is on date {v}'),
            TemplatedFilter(
                "isnull", "is null", '"{c}" is null', "{c} is null", no_argument=True
            ),
            TemplatedFilter(
                "notnull",
                "is not null",
                '"{c}" is not null',
                "{c} is not null",
                no_argument=True,
            ),
            TemplatedFilter(
                "isblank",
                "is blank",
                '("{c}" is null or "{c}" = "")',
                "{c} is blank",
                no_argument=True,
            ),
            TemplatedFilter(
                "notblank",
                "is not blank",
                '("{c}" is not null and "{c}" != "")',
                "{c} is not blank",
                no_argument=True,
            ),
        ]
    )
    # Lookup table from filter key (the part after "__") to filter object.
    _filters_by_key = {f.key: f for f in _filters}

    def __init__(self, pairs, units=None, ureg=None):
        """Store the selections and optional unit-conversion configuration.

        pairs -- iterable of ``(key, value)`` tuples (see ``selections``).
        units -- mapping of column name to that column's unit; defaults to
                 an empty mapping (no conversion).
        ureg  -- callable used to parse unit strings; it is only called for
                 columns present in ``units``.  Presumably a pint
                 ``UnitRegistry`` -- TODO confirm against callers.
        """
        self.pairs = pairs
        # A ``None`` sentinel replaces the old ``units={}`` default so that
        # instances never share one mutable dict object.
        self.units = {} if units is None else units
        self.ureg = ureg

    def lookups(self):
        """Yield ``(key, display, no_argument)`` triples for every filter."""
        for registered in self._filters:
            yield registered.key, registered.display, registered.no_argument

    def human_description_en(self, extra=None):
        """Return an English ``"where a, b and c"`` summary of the selections.

        ``extra`` is an optional iterable of descriptions placed before the
        ones derived from the selections.  Selections whose lookup is not
        registered are skipped.  Returns ``""`` when there is nothing to say.
        """
        bits = []
        if extra:
            bits.extend(extra)
        for column, lookup, value in self.selections():
            lookup_filter = self._filters_by_key.get(lookup, None)
            if lookup_filter:
                bits.append(lookup_filter.human_clause(column, value))
        # Comma separated, with an ' and ' at the end
        and_bits = []
        commas, tail = bits[:-1], bits[-1:]
        if commas:
            and_bits.append(", ".join(commas))
        if tail:
            and_bits.append(tail[0])
        s = " and ".join(and_bits)
        if not s:
            return ""
        return "where {}".format(s)

    def selections(self):
        """Yield ``(column, lookup, value)`` tuples for every pair.

        A key is split on its last ``"__"``; keys without one use the
        ``"exact"`` lookup.
        """
        for key, value in self.pairs:
            if "__" in key:
                column, lookup = key.rsplit("__", 1)
            else:
                column = key
                lookup = "exact"
            yield column, lookup, value

    def has_selections(self):
        """Return True when at least one pair was supplied."""
        return bool(self.pairs)

    def convert_unit(self, column, value):
        """If the user has provided a unit in the query, convert it into the column unit, if present."""
        if column not in self.units:
            return value

        # Try to interpret the value as a unit
        value = self.ureg(value)
        if isinstance(value, numbers.Number):
            # It's just a bare number, assume it's the column unit
            return value

        column_unit = self.ureg(self.units[column])
        return value.to(column_unit).magnitude

    def build_where_clauses(self, table):
        """Return ``(sql_bits, params)`` for all selections.

        ``sql_bits`` is a list of SQL fragments, one per selection with a
        registered lookup (others are skipped).  ``params`` maps parameter
        names ``p0``, ``p1``, ... to the values to bind.
        """
        sql_bits = []
        params = {}
        # Next free parameter number; also passed to each filter so the
        # placeholders it emits line up with the names assigned below.
        i = 0
        for column, lookup, value in self.selections():
            lookup_filter = self._filters_by_key.get(lookup, None)
            if lookup_filter:
                sql_bit, param = lookup_filter.where_clause(
                    table, column, self.convert_unit(column, value), i
                )
                sql_bits.append(sql_bit)
                if param is not None:
                    # A filter may return one value or a list of values.
                    if not isinstance(param, list):
                        param = [param]
                    for individual_param in param:
                        param_id = "p{}".format(i)
                        params[param_id] = individual_param
                        i += 1
        return sql_bits, params
|