mirror of
https://github.com/simonw/datasette.git
synced 2025-12-10 16:51:24 +01:00
Expecting SQLite columns to all be valid utf8 doesn't work, because we are deailing with all kinds of databases. Instead, we now use the 'replace' encoding mode to replace any non-UTF8 characters with a [X] character.
319 lines
9.1 KiB
Python
319 lines
9.1 KiB
Python
from sanic import Sanic
|
|
from sanic import response
|
|
from sanic.exceptions import NotFound
|
|
from sanic.views import HTTPMethodView
|
|
from sanic_jinja2 import SanicJinja2
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from functools import wraps
|
|
import urllib.parse
|
|
import json
|
|
import base64
|
|
import hashlib
|
|
import sys
|
|
import time
|
|
|
|
app_root = Path(__file__).parent
|
|
|
|
BUILD_METADATA = 'build-metadata.json'
|
|
DB_GLOBS = ('*.db', '*.sqlite', '*.sqlite3')
|
|
HASH_BLOCK_SIZE = 1024 * 1024
|
|
|
|
conns = {}
|
|
|
|
|
|
app = Sanic(__name__)
|
|
jinja = SanicJinja2(app)
|
|
|
|
|
|
def get_conn(name):
|
|
if name not in conns:
|
|
info = ensure_build_metadata()[name]
|
|
conns[name] = sqlite3.connect(
|
|
'file:{}?immutable=1'.format(info['file']),
|
|
uri=True
|
|
)
|
|
conns[name].row_factory = sqlite3.Row
|
|
conns[name].text_factory = lambda x: str(x, 'utf-8', 'replace')
|
|
return conns[name]
|
|
|
|
|
|
def ensure_build_metadata(regenerate=False):
|
|
build_metadata = app_root / BUILD_METADATA
|
|
if build_metadata.exists() and not regenerate:
|
|
json.loads(build_metadata.read_text())
|
|
metadata = {}
|
|
for glob in DB_GLOBS:
|
|
for path in app_root.glob(glob):
|
|
name = path.stem
|
|
if name in metadata:
|
|
raise Exception('Multiple files with same stem %s' % name)
|
|
# Calculate hash, efficiently
|
|
m = hashlib.sha256()
|
|
with path.open('rb') as fp:
|
|
while True:
|
|
data = fp.read(HASH_BLOCK_SIZE)
|
|
if not data:
|
|
break
|
|
m.update(data)
|
|
# List tables and their row counts
|
|
tables = {}
|
|
with sqlite3.connect('file:{}?immutable=1'.format(path.name), uri=True) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
table_names = [
|
|
r['name']
|
|
for r in conn.execute('select * from sqlite_master where type="table"')
|
|
]
|
|
for table in table_names:
|
|
tables[table] = conn.execute('select count(*) from "{}"'.format(table)).fetchone()[0]
|
|
|
|
metadata[name] = {
|
|
'hash': m.hexdigest(),
|
|
'file': path.name,
|
|
'tables': tables,
|
|
}
|
|
build_metadata.write_text(json.dumps(metadata, indent=4))
|
|
return metadata
|
|
|
|
|
|
class BaseView(HTTPMethodView):
|
|
template = None
|
|
|
|
def redirect(self, request, path):
|
|
if request.query_string:
|
|
path = '{}?{}'.format(
|
|
path, request.query_string
|
|
)
|
|
r = response.redirect(path)
|
|
r.headers['Link'] = '<{}>; rel=preload'.format(path)
|
|
return r
|
|
|
|
async def get(self, request, db_name, **kwargs):
|
|
name, hash, should_redirect = resolve_db_name(db_name, **kwargs)
|
|
if should_redirect:
|
|
return self.redirect(request, should_redirect)
|
|
try:
|
|
as_json = kwargs.pop('as_json')
|
|
except KeyError:
|
|
as_json = False
|
|
extra_template_data = {}
|
|
start = time.time()
|
|
try:
|
|
data, extra_template_data = self.data(
|
|
request, name, hash, **kwargs
|
|
)
|
|
except sqlite3.OperationalError as e:
|
|
data = {
|
|
'ok': False,
|
|
'error': str(e),
|
|
}
|
|
end = time.time()
|
|
data['took_ms'] = (end - start) * 1000
|
|
if as_json:
|
|
# Special case for .jsono extension
|
|
if as_json == '.jsono':
|
|
columns = data.get('columns')
|
|
rows = data.get('rows')
|
|
if rows and columns:
|
|
data['rows'] = [
|
|
dict(zip(columns, row))
|
|
for row in rows
|
|
]
|
|
r = response.HTTPResponse(
|
|
json.dumps(
|
|
data, cls=CustomJSONEncoder
|
|
),
|
|
content_type='application/json',
|
|
headers={
|
|
'Access-Control-Allow-Origin': '*'
|
|
}
|
|
)
|
|
else:
|
|
context = {**data, **dict(
|
|
extra_template_data()
|
|
if callable(extra_template_data)
|
|
else extra_template_data
|
|
)}
|
|
r = jinja.render(
|
|
self.template,
|
|
request,
|
|
**context,
|
|
)
|
|
# Set far-future cache expiry
|
|
r.headers['Cache-Control'] = 'max-age={}'.format(
|
|
365 * 24 * 60 * 60
|
|
)
|
|
return r
|
|
|
|
|
|
@app.route('/')
|
|
async def index(request, sql=None):
|
|
databases = ensure_build_metadata(True)
|
|
return jinja.render(
|
|
'index.html',
|
|
request,
|
|
databases=databases,
|
|
)
|
|
|
|
|
|
@app.route('/favicon.ico')
|
|
async def favicon(request):
|
|
return response.text('')
|
|
|
|
|
|
class DatabaseView(BaseView):
|
|
template = 'database.html'
|
|
|
|
def data(self, request, name, hash):
|
|
conn = get_conn(name)
|
|
sql = request.args.get('sql') or 'select * from sqlite_master'
|
|
rows = conn.execute(sql)
|
|
columns = [r[0] for r in rows.description]
|
|
return {
|
|
'database': name,
|
|
'rows': rows,
|
|
'columns': columns,
|
|
}, {
|
|
'database_hash': hash,
|
|
}
|
|
|
|
|
|
class TableView(BaseView):
|
|
template = 'table.html'
|
|
|
|
def data(self, request, name, hash, table):
|
|
conn = get_conn(name)
|
|
rows = conn.execute('select * from {} limit 20'.format(table))
|
|
columns = [r[0] for r in rows.description]
|
|
pks = pks_for_table(conn, table)
|
|
rows = list(rows)
|
|
return {
|
|
'database': name,
|
|
'table': table,
|
|
'rows': rows,
|
|
'columns': columns,
|
|
'primary_keys': pks,
|
|
}, lambda: {
|
|
'database_hash': hash,
|
|
'row_link': lambda row: path_from_row_pks(row, pks),
|
|
}
|
|
|
|
|
|
class RowView(BaseView):
|
|
template = 'table.html'
|
|
|
|
def data(self, request, name, hash, table, pk_path):
|
|
conn = get_conn(name)
|
|
pk_values = compound_pks_from_path(pk_path)
|
|
pks = pks_for_table(conn, table)
|
|
wheres = [
|
|
'{}=?'.format(pk)
|
|
for pk in pks
|
|
]
|
|
sql = 'select * from "{}" where {}'.format(
|
|
table, ' AND '.join(wheres)
|
|
)
|
|
rows = conn.execute(sql, pk_values)
|
|
columns = [r[0] for r in rows.description]
|
|
pks = pks_for_table(conn, table)
|
|
rows = list(rows)
|
|
if not rows:
|
|
raise NotFound('Record not found: {}'.format(pk_values))
|
|
return {
|
|
'database': name,
|
|
'table': table,
|
|
'rows': rows,
|
|
'columns': columns,
|
|
'primary_keys': pks,
|
|
}, {
|
|
'database_hash': hash,
|
|
'row_link': None,
|
|
}
|
|
|
|
|
|
app.add_route(DatabaseView.as_view(), '/<db_name:[^/]+?><as_json:(.jsono?)?$>')
|
|
app.add_route(TableView.as_view(), '/<db_name:[^/]+>/<table:[^/]+?><as_json:(.jsono?)?$>')
|
|
app.add_route(RowView.as_view(), '/<db_name:[^/]+>/<table:[^/]+?>/<pk_path:[^/]+?><as_json:(.jsono?)?$>')
|
|
|
|
|
|
def resolve_db_name(db_name, **kwargs):
|
|
databases = ensure_build_metadata()
|
|
hash = None
|
|
name = None
|
|
if '-' in db_name:
|
|
# Might be name-and-hash, or might just be
|
|
# a name with a hyphen in it
|
|
name, hash = db_name.rsplit('-', 1)
|
|
if name not in databases:
|
|
# Try the whole name
|
|
name = db_name
|
|
hash = None
|
|
else:
|
|
name = db_name
|
|
# Verify the hash
|
|
try:
|
|
info = databases[name]
|
|
except KeyError:
|
|
raise NotFound('Database not found: {}'.format(name))
|
|
expected = info['hash'][:7]
|
|
if expected != hash:
|
|
should_redirect = '/{}-{}'.format(
|
|
name, expected,
|
|
)
|
|
if 'table' in kwargs:
|
|
should_redirect += '/' + kwargs['table']
|
|
return name, expected, should_redirect
|
|
return name, expected, None
|
|
|
|
|
|
def compound_pks_from_path(path):
|
|
return [
|
|
urllib.parse.unquote_plus(b) for b in path.split(',')
|
|
]
|
|
|
|
|
|
def pks_for_table(conn, table):
|
|
rows = [
|
|
row for row in conn.execute(
|
|
'PRAGMA table_info("{}")'.format(table)
|
|
).fetchall()
|
|
if row[-1]
|
|
]
|
|
rows.sort(key=lambda row: row[-1])
|
|
return [str(r[1]) for r in rows]
|
|
|
|
|
|
def path_from_row_pks(row, pks):
|
|
if not pks:
|
|
return ''
|
|
bits = []
|
|
for pk in pks:
|
|
bits.append(
|
|
urllib.parse.quote_plus(str(row[pk]))
|
|
)
|
|
return ','.join(bits)
|
|
|
|
|
|
class CustomJSONEncoder(json.JSONEncoder):
|
|
def default(self, obj):
|
|
if isinstance(obj, sqlite3.Row):
|
|
return tuple(obj)
|
|
if isinstance(obj, sqlite3.Cursor):
|
|
return list(obj)
|
|
if isinstance(obj, bytes):
|
|
# Does it encode to utf8?
|
|
try:
|
|
return obj.decode('utf8')
|
|
except UnicodeDecodeError:
|
|
return {
|
|
'$base64': True,
|
|
'encoded': base64.b64encode(obj).decode('latin1'),
|
|
}
|
|
return json.JSONEncoder.default(self, obj)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if '--build' in sys.argv:
|
|
ensure_build_metadata(True)
|
|
else:
|
|
app.run(host="0.0.0.0", port=8006)
|