datasette/datasette/utils.py

325 lines
10 KiB
Python

from contextlib import contextmanager
import base64
import json
import os
import re
import sqlite3
import tempfile
import time
import urllib
def compound_pks_from_path(path):
return [
urllib.parse.unquote_plus(b) for b in path.split(',')
]
def path_from_row_pks(row, pks, use_rowid):
if use_rowid:
return urllib.parse.quote_plus(str(row['rowid']))
bits = []
for pk in pks:
bits.append(
urllib.parse.quote_plus(str(row[pk]))
)
return ','.join(bits)
class CustomJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, sqlite3.Row):
return tuple(obj)
if isinstance(obj, sqlite3.Cursor):
return list(obj)
if isinstance(obj, bytes):
# Does it encode to utf8?
try:
return obj.decode('utf8')
except UnicodeDecodeError:
return {
'$base64': True,
'encoded': base64.b64encode(obj).decode('latin1'),
}
return json.JSONEncoder.default(self, obj)
@contextmanager
def sqlite_timelimit(conn, ms):
deadline = time.time() + (ms / 1000)
# n is the number of SQLite virtual machine instructions that will be
# executed between each check. It's hard to know what to pick here.
# After some experimentation, I've decided to go with 1000 by default and
# 1 for time limits that are less than 50ms
n = 1000
if ms < 50:
n = 1
def handler():
if time.time() >= deadline:
return 1
conn.set_progress_handler(handler, n)
yield
conn.set_progress_handler(None, n)
class InvalidSql(Exception):
pass
def validate_sql_select(sql):
sql = sql.strip().lower()
if not sql.startswith('select '):
raise InvalidSql('Statement must begin with SELECT')
if 'pragma' in sql:
raise InvalidSql('Statement may not contain PRAGMA')
def path_with_added_args(request, args):
current = {
key: value
for key, value in request.raw_args.items()
if key not in args
}
current.update({
key: value
for key, value in args.items()
if value is not None
})
return request.path + '?' + urllib.parse.urlencode(sorted(current.items()))
def path_with_ext(request, ext):
path = request.path
path += ext
if request.query_string:
path += '?' + request.query_string
return path
_css_re = re.compile(r'''['"\n\\]''')
_boring_table_name_re = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
def escape_css_string(s):
return _css_re.sub(lambda m: '\\{:X}'.format(ord(m.group())), s)
def escape_sqlite_table_name(s):
if _boring_table_name_re.match(s):
return s
else:
return '[{}]'.format(s)
def make_dockerfile(files, metadata_file, extra_options='', branch=None):
cmd = ['"datasette"', '"serve"', '"--host"', '"0.0.0.0"']
cmd.append('"' + '", "'.join(files) + '"')
cmd.extend(['"--cors"', '"--port"', '"8001"', '"--inspect-file"', '"inspect-data.json"'])
if metadata_file:
cmd.extend(['"--metadata"', '"{}"'.format(metadata_file)])
if extra_options:
for opt in extra_options.split():
cmd.append('"{}"'.format(opt))
install_from = 'datasette'
if branch:
install_from = 'https://github.com/simonw/datasette/archive/{}.zip'.format(
branch
)
return '''
FROM python:3
COPY . /app
WORKDIR /app
RUN pip install {install_from}
RUN datasette build {files} --inspect-file inspect-data.json
EXPOSE 8001
CMD [{cmd}]'''.format(
files=' '.join(files),
cmd=', '.join(cmd),
install_from=install_from,
).strip()
@contextmanager
def temporary_docker_directory(files, name, metadata, extra_options, branch=None, extra_metadata=None):
extra_metadata = extra_metadata or {}
tmp = tempfile.TemporaryDirectory()
# We create a datasette folder in there to get a nicer now deploy name
datasette_dir = os.path.join(tmp.name, name)
os.mkdir(datasette_dir)
saved_cwd = os.getcwd()
file_paths = [
os.path.join(saved_cwd, name)
for name in files
]
file_names = [os.path.split(f)[-1] for f in files]
if metadata:
metadata_content = json.load(metadata)
else:
metadata_content = {}
for key, value in extra_metadata.items():
if value:
metadata_content[key] = value
try:
dockerfile = make_dockerfile(file_names, metadata_content and 'metadata.json', extra_options, branch)
os.chdir(datasette_dir)
if metadata_content:
open('metadata.json', 'w').write(json.dumps(metadata_content, indent=2))
open('Dockerfile', 'w').write(dockerfile)
for path, filename in zip(file_paths, file_names):
os.link(path, os.path.join(datasette_dir, filename))
yield
finally:
tmp.cleanup()
os.chdir(saved_cwd)
def get_all_foreign_keys(conn):
tables = [r[0] for r in conn.execute('select name from sqlite_master where type="table"')]
table_to_foreign_keys = {}
for table in tables:
table_to_foreign_keys[table] = {
'incoming': [],
'outgoing': [],
}
for table in tables:
infos = conn.execute(
'PRAGMA foreign_key_list([{}])'.format(table)
).fetchall()
for info in infos:
if info is not None:
id, seq, table_name, from_, to_, on_update, on_delete, match = info
if table_name not in table_to_foreign_keys:
# Weird edge case where something refers to a table that does
# not actually exist
continue
table_to_foreign_keys[table_name]['incoming'].append({
'other_table': table,
'column': to_,
'other_column': from_
})
table_to_foreign_keys[table]['outgoing'].append({
'other_table': table_name,
'column': from_,
'other_column': to_
})
return table_to_foreign_keys
def detect_fts(conn, table, return_sql=False):
"Detect if table has a corresponding FTS virtual table and return it"
rows = conn.execute(detect_fts_sql(table)).fetchall()
if len(rows) == 0:
return None
else:
return rows[0][0]
def detect_fts_sql(table):
return r'''
select name from sqlite_master
where rootpage = 0
and sql like '%VIRTUAL TABLE%USING FTS%content="{}"%';
'''.format(table)
class Filter:
def __init__(self, key, sql_template, human_template, format='{}', numeric=False, no_argument=False):
self.key = key
self.sql_template = sql_template
self.human_template = human_template
self.format = format
self.numeric = numeric
self.no_argument = no_argument
def where_clause(self, column, value, param_counter):
converted = self.format.format(value)
if self.numeric and converted.isdigit():
converted = int(converted)
if self.no_argument:
kwargs = {
'c': column,
}
converted = None
else:
kwargs = {
'c': column,
'p': 'p{}'.format(param_counter),
}
return self.sql_template.format(**kwargs), converted
def human_clause(self, column, value):
if callable(self.human_template):
template = self.human_template(column, value)
else:
template = self.human_template
if self.no_argument:
return template.format(c=column)
else:
return template.format(c=column, v=value)
class Filters:
_filters = [
Filter('exact', '"{c}" = :{p}', lambda c, v: '{c} = {v}' if v.isdigit() else '{c} = "{v}"'),
Filter('contains', '"{c}" like :{p}', '{c} contains "{v}"', format='%{}%'),
Filter('endswith', '"{c}" like :{p}', '{c} ends with "{v}"', format='%{}'),
Filter('startswith', '"{c}" like :{p}', '{c} starts with "{v}"', format='{}%'),
Filter('gt', '"{c}" > :{p}', '{c} > {v}', numeric=True),
Filter('gte', '"{c}" >= :{p}', '{c} \u2265 {v}', numeric=True),
Filter('lt', '"{c}" < :{p}', '{c} < {v}', numeric=True),
Filter('lte', '"{c}" <= :{p}', '{c} \u2264 {v}', numeric=True),
Filter('glob', '"{c}" glob :{p}', '{c} glob "{v}"'),
Filter('like', '"{c}" like :{p}', '{c} like "{v}"'),
Filter('isnull', '"{c}" is null', '{c} is null', no_argument=True),
Filter('notnull', '"{c}" is not null', '{c} is not null', no_argument=True),
Filter('isblank', '("{c}" is null or "{c}" = "")', '{c} is blank', no_argument=True),
Filter('notblank', '("{c}" is not null and "{c}" != "")', '{c} is not blank', no_argument=True),
]
_filters_by_key = {
f.key: f for f in _filters
}
def __init__(self, pairs):
self.pairs = pairs
def human_description(self):
bits = []
for key, value in self.pairs:
if '__' in key:
column, lookup = key.rsplit('__', 1)
else:
column = key
lookup = 'exact'
filter = self._filters_by_key.get(lookup, None)
if filter:
bits.append(filter.human_clause(column, value))
# Comma separated, with an ' and ' at the end
and_bits = []
commas, tail = bits[:-1], bits[-1:]
if commas:
and_bits.append(', '.join(commas))
if tail:
and_bits.append(tail[0])
return ' and '.join(and_bits)
def build_where_clauses(self):
sql_bits = []
params = {}
for i, (key, value) in enumerate(self.pairs):
if '__' in key:
column, lookup = key.rsplit('__', 1)
else:
column = key
lookup = 'exact'
filter = self._filters_by_key.get(lookup, None)
if filter:
sql_bit, param = filter.where_clause(column, value, i)
sql_bits.append(sql_bit)
if param is not None:
param_id = 'p{}'.format(i)
params[param_id] = param
return sql_bits, params
return ' and '.join(sql_bits), params