Apply black to everything, enforce via unit tests (#449)

I've run the black code formatting tool against everything:

    black tests datasette setup.py

I also added a new unit test, in tests/test_black.py, which will fail if the code does not
conform to black's exacting standards.

This unit test only runs on Python 3.6 or higher, because black itself doesn't run on 3.5.
This commit is contained in:
Simon Willison 2019-05-03 22:15:14 -04:00 committed by GitHub
commit 35d6ee2790
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
31 changed files with 2758 additions and 2702 deletions

View file

@ -21,27 +21,29 @@ except ImportError:
import sqlite3
# From https://www.sqlite.org/lang_keywords.html
reserved_words = set((
'abort action add after all alter analyze and as asc attach autoincrement '
'before begin between by cascade case cast check collate column commit '
'conflict constraint create cross current_date current_time '
'current_timestamp database default deferrable deferred delete desc detach '
'distinct drop each else end escape except exclusive exists explain fail '
'for foreign from full glob group having if ignore immediate in index '
'indexed initially inner insert instead intersect into is isnull join key '
'left like limit match natural no not notnull null of offset on or order '
'outer plan pragma primary query raise recursive references regexp reindex '
'release rename replace restrict right rollback row savepoint select set '
'table temp temporary then to transaction trigger union unique update using '
'vacuum values view virtual when where with without'
).split())
reserved_words = set(
(
"abort action add after all alter analyze and as asc attach autoincrement "
"before begin between by cascade case cast check collate column commit "
"conflict constraint create cross current_date current_time "
"current_timestamp database default deferrable deferred delete desc detach "
"distinct drop each else end escape except exclusive exists explain fail "
"for foreign from full glob group having if ignore immediate in index "
"indexed initially inner insert instead intersect into is isnull join key "
"left like limit match natural no not notnull null of offset on or order "
"outer plan pragma primary query raise recursive references regexp reindex "
"release rename replace restrict right rollback row savepoint select set "
"table temp temporary then to transaction trigger union unique update using "
"vacuum values view virtual when where with without"
).split()
)
SPATIALITE_DOCKERFILE_EXTRAS = r'''
SPATIALITE_DOCKERFILE_EXTRAS = r"""
RUN apt-get update && \
apt-get install -y python3-dev gcc libsqlite3-mod-spatialite && \
rm -rf /var/lib/apt/lists/*
ENV SQLITE_EXTENSIONS /usr/lib/x86_64-linux-gnu/mod_spatialite.so
'''
"""
class InterruptedError(Exception):
@ -67,27 +69,24 @@ class Results:
def urlsafe_components(token):
"Splits token on commas and URL decodes each component"
return [
urllib.parse.unquote_plus(b) for b in token.split(',')
]
return [urllib.parse.unquote_plus(b) for b in token.split(",")]
def path_from_row_pks(row, pks, use_rowid, quote=True):
""" Generate an optionally URL-quoted unique identifier
for a row from its primary keys."""
if use_rowid:
bits = [row['rowid']]
bits = [row["rowid"]]
else:
bits = [
row[pk]["value"] if isinstance(row[pk], dict) else row[pk]
for pk in pks
row[pk]["value"] if isinstance(row[pk], dict) else row[pk] for pk in pks
]
if quote:
bits = [urllib.parse.quote_plus(str(bit)) for bit in bits]
else:
bits = [str(bit) for bit in bits]
return ','.join(bits)
return ",".join(bits)
def compound_keys_after_sql(pks, start_index=0):
@ -106,16 +105,17 @@ def compound_keys_after_sql(pks, start_index=0):
and_clauses = []
last = pks_left[-1]
rest = pks_left[:-1]
and_clauses = ['{} = :p{}'.format(
escape_sqlite(pk), (i + start_index)
) for i, pk in enumerate(rest)]
and_clauses.append('{} > :p{}'.format(
escape_sqlite(last), (len(rest) + start_index)
))
or_clauses.append('({})'.format(' and '.join(and_clauses)))
and_clauses = [
"{} = :p{}".format(escape_sqlite(pk), (i + start_index))
for i, pk in enumerate(rest)
]
and_clauses.append(
"{} > :p{}".format(escape_sqlite(last), (len(rest) + start_index))
)
or_clauses.append("({})".format(" and ".join(and_clauses)))
pks_left.pop()
or_clauses.reverse()
return '({})'.format('\n or\n'.join(or_clauses))
return "({})".format("\n or\n".join(or_clauses))
class CustomJSONEncoder(json.JSONEncoder):
@ -127,11 +127,11 @@ class CustomJSONEncoder(json.JSONEncoder):
if isinstance(obj, bytes):
# Does it encode to utf8?
try:
return obj.decode('utf8')
return obj.decode("utf8")
except UnicodeDecodeError:
return {
'$base64': True,
'encoded': base64.b64encode(obj).decode('latin1'),
"$base64": True,
"encoded": base64.b64encode(obj).decode("latin1"),
}
return json.JSONEncoder.default(self, obj)
@ -163,20 +163,18 @@ class InvalidSql(Exception):
allowed_sql_res = [
re.compile(r'^select\b'),
re.compile(r'^explain select\b'),
re.compile(r'^explain query plan select\b'),
re.compile(r'^with\b'),
]
disallawed_sql_res = [
(re.compile('pragma'), 'Statement may not contain PRAGMA'),
re.compile(r"^select\b"),
re.compile(r"^explain select\b"),
re.compile(r"^explain query plan select\b"),
re.compile(r"^with\b"),
]
disallawed_sql_res = [(re.compile("pragma"), "Statement may not contain PRAGMA")]
def validate_sql_select(sql):
sql = sql.strip().lower()
if not any(r.match(sql) for r in allowed_sql_res):
raise InvalidSql('Statement must be a SELECT')
raise InvalidSql("Statement must be a SELECT")
for r, msg in disallawed_sql_res:
if r.search(sql):
raise InvalidSql(msg)
@ -184,9 +182,7 @@ def validate_sql_select(sql):
def append_querystring(url, querystring):
op = "&" if ("?" in url) else "?"
return "{}{}{}".format(
url, op, querystring
)
return "{}{}{}".format(url, op, querystring)
def path_with_added_args(request, args, path=None):
@ -198,14 +194,10 @@ def path_with_added_args(request, args, path=None):
for key, value in urllib.parse.parse_qsl(request.query_string):
if key not in args_to_remove:
current.append((key, value))
current.extend([
(key, value)
for key, value in args
if value is not None
])
current.extend([(key, value) for key, value in args if value is not None])
query_string = urllib.parse.urlencode(current)
if query_string:
query_string = '?{}'.format(query_string)
query_string = "?{}".format(query_string)
return path + query_string
@ -220,18 +212,21 @@ def path_with_removed_args(request, args, path=None):
# args can be a dict or a set
current = []
if isinstance(args, set):
def should_remove(key, value):
return key in args
elif isinstance(args, dict):
# Must match key AND value
def should_remove(key, value):
return args.get(key) == value
for key, value in urllib.parse.parse_qsl(query_string):
if not should_remove(key, value):
current.append((key, value))
query_string = urllib.parse.urlencode(current)
if query_string:
query_string = '?{}'.format(query_string)
query_string = "?{}".format(query_string)
return path + query_string
@ -247,54 +242,66 @@ def path_with_replaced_args(request, args, path=None):
current.extend([p for p in args if p[1] is not None])
query_string = urllib.parse.urlencode(current)
if query_string:
query_string = '?{}'.format(query_string)
query_string = "?{}".format(query_string)
return path + query_string
_css_re = re.compile(r'''['"\n\\]''')
_boring_keyword_re = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
_css_re = re.compile(r"""['"\n\\]""")
_boring_keyword_re = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
def escape_css_string(s):
return _css_re.sub(lambda m: '\\{:X}'.format(ord(m.group())), s)
return _css_re.sub(lambda m: "\\{:X}".format(ord(m.group())), s)
def escape_sqlite(s):
if _boring_keyword_re.match(s) and (s.lower() not in reserved_words):
return s
else:
return '[{}]'.format(s)
return "[{}]".format(s)
def make_dockerfile(files, metadata_file, extra_options, branch, template_dir, plugins_dir, static, install, spatialite, version_note):
cmd = ['datasette', 'serve', '--host', '0.0.0.0']
def make_dockerfile(
files,
metadata_file,
extra_options,
branch,
template_dir,
plugins_dir,
static,
install,
spatialite,
version_note,
):
cmd = ["datasette", "serve", "--host", "0.0.0.0"]
cmd.append('", "'.join(files))
cmd.extend(['--cors', '--inspect-file', 'inspect-data.json'])
cmd.extend(["--cors", "--inspect-file", "inspect-data.json"])
if metadata_file:
cmd.extend(['--metadata', '{}'.format(metadata_file)])
cmd.extend(["--metadata", "{}".format(metadata_file)])
if template_dir:
cmd.extend(['--template-dir', 'templates/'])
cmd.extend(["--template-dir", "templates/"])
if plugins_dir:
cmd.extend(['--plugins-dir', 'plugins/'])
cmd.extend(["--plugins-dir", "plugins/"])
if version_note:
cmd.extend(['--version-note', '{}'.format(version_note)])
cmd.extend(["--version-note", "{}".format(version_note)])
if static:
for mount_point, _ in static:
cmd.extend(['--static', '{}:{}'.format(mount_point, mount_point)])
cmd.extend(["--static", "{}:{}".format(mount_point, mount_point)])
if extra_options:
for opt in extra_options.split():
cmd.append('{}'.format(opt))
cmd.append("{}".format(opt))
cmd = [shlex.quote(part) for part in cmd]
# port attribute is a (fixed) env variable and should not be quoted
cmd.extend(['--port', '$PORT'])
cmd = ' '.join(cmd)
cmd.extend(["--port", "$PORT"])
cmd = " ".join(cmd)
if branch:
install = ['https://github.com/simonw/datasette/archive/{}.zip'.format(
branch
)] + list(install)
install = [
"https://github.com/simonw/datasette/archive/{}.zip".format(branch)
] + list(install)
else:
install = ['datasette'] + list(install)
install = ["datasette"] + list(install)
return '''
return """
FROM python:3.6
COPY . /app
WORKDIR /app
@ -303,11 +310,11 @@ RUN pip install -U {install_from}
RUN datasette inspect {files} --inspect-file inspect-data.json
ENV PORT 8001
EXPOSE 8001
CMD {cmd}'''.format(
files=' '.join(files),
CMD {cmd}""".format(
files=" ".join(files),
cmd=cmd,
install_from=' '.join(install),
spatialite_extras=SPATIALITE_DOCKERFILE_EXTRAS if spatialite else '',
install_from=" ".join(install),
spatialite_extras=SPATIALITE_DOCKERFILE_EXTRAS if spatialite else "",
).strip()
@ -324,7 +331,7 @@ def temporary_docker_directory(
install,
spatialite,
version_note,
extra_metadata=None
extra_metadata=None,
):
extra_metadata = extra_metadata or {}
tmp = tempfile.TemporaryDirectory()
@ -332,10 +339,7 @@ def temporary_docker_directory(
datasette_dir = os.path.join(tmp.name, name)
os.mkdir(datasette_dir)
saved_cwd = os.getcwd()
file_paths = [
os.path.join(saved_cwd, file_path)
for file_path in files
]
file_paths = [os.path.join(saved_cwd, file_path) for file_path in files]
file_names = [os.path.split(f)[-1] for f in files]
if metadata:
metadata_content = json.load(metadata)
@ -347,7 +351,7 @@ def temporary_docker_directory(
try:
dockerfile = make_dockerfile(
file_names,
metadata_content and 'metadata.json',
metadata_content and "metadata.json",
extra_options,
branch,
template_dir,
@ -359,24 +363,23 @@ def temporary_docker_directory(
)
os.chdir(datasette_dir)
if metadata_content:
open('metadata.json', 'w').write(json.dumps(metadata_content, indent=2))
open('Dockerfile', 'w').write(dockerfile)
open("metadata.json", "w").write(json.dumps(metadata_content, indent=2))
open("Dockerfile", "w").write(dockerfile)
for path, filename in zip(file_paths, file_names):
link_or_copy(path, os.path.join(datasette_dir, filename))
if template_dir:
link_or_copy_directory(
os.path.join(saved_cwd, template_dir),
os.path.join(datasette_dir, 'templates')
os.path.join(datasette_dir, "templates"),
)
if plugins_dir:
link_or_copy_directory(
os.path.join(saved_cwd, plugins_dir),
os.path.join(datasette_dir, 'plugins')
os.path.join(datasette_dir, "plugins"),
)
for mount_point, path in static:
link_or_copy_directory(
os.path.join(saved_cwd, path),
os.path.join(datasette_dir, mount_point)
os.path.join(saved_cwd, path), os.path.join(datasette_dir, mount_point)
)
yield datasette_dir
finally:
@ -396,7 +399,7 @@ def temporary_heroku_directory(
static,
install,
version_note,
extra_metadata=None
extra_metadata=None,
):
# FIXME: lots of duplicated code from above
@ -404,10 +407,7 @@ def temporary_heroku_directory(
tmp = tempfile.TemporaryDirectory()
saved_cwd = os.getcwd()
file_paths = [
os.path.join(saved_cwd, file_path)
for file_path in files
]
file_paths = [os.path.join(saved_cwd, file_path) for file_path in files]
file_names = [os.path.split(f)[-1] for f in files]
if metadata:
@ -422,53 +422,54 @@ def temporary_heroku_directory(
os.chdir(tmp.name)
if metadata_content:
open('metadata.json', 'w').write(json.dumps(metadata_content, indent=2))
open("metadata.json", "w").write(json.dumps(metadata_content, indent=2))
open('runtime.txt', 'w').write('python-3.6.7')
open("runtime.txt", "w").write("python-3.6.7")
if branch:
install = ['https://github.com/simonw/datasette/archive/{branch}.zip'.format(
branch=branch
)] + list(install)
install = [
"https://github.com/simonw/datasette/archive/{branch}.zip".format(
branch=branch
)
] + list(install)
else:
install = ['datasette'] + list(install)
install = ["datasette"] + list(install)
open('requirements.txt', 'w').write('\n'.join(install))
os.mkdir('bin')
open('bin/post_compile', 'w').write('datasette inspect --inspect-file inspect-data.json')
open("requirements.txt", "w").write("\n".join(install))
os.mkdir("bin")
open("bin/post_compile", "w").write(
"datasette inspect --inspect-file inspect-data.json"
)
extras = []
if template_dir:
link_or_copy_directory(
os.path.join(saved_cwd, template_dir),
os.path.join(tmp.name, 'templates')
os.path.join(tmp.name, "templates"),
)
extras.extend(['--template-dir', 'templates/'])
extras.extend(["--template-dir", "templates/"])
if plugins_dir:
link_or_copy_directory(
os.path.join(saved_cwd, plugins_dir),
os.path.join(tmp.name, 'plugins')
os.path.join(saved_cwd, plugins_dir), os.path.join(tmp.name, "plugins")
)
extras.extend(['--plugins-dir', 'plugins/'])
extras.extend(["--plugins-dir", "plugins/"])
if version_note:
extras.extend(['--version-note', version_note])
extras.extend(["--version-note", version_note])
if metadata_content:
extras.extend(['--metadata', 'metadata.json'])
extras.extend(["--metadata", "metadata.json"])
if extra_options:
extras.extend(extra_options.split())
for mount_point, path in static:
link_or_copy_directory(
os.path.join(saved_cwd, path),
os.path.join(tmp.name, mount_point)
os.path.join(saved_cwd, path), os.path.join(tmp.name, mount_point)
)
extras.extend(['--static', '{}:{}'.format(mount_point, mount_point)])
extras.extend(["--static", "{}:{}".format(mount_point, mount_point)])
quoted_files = " ".join(map(shlex.quote, file_names))
procfile_cmd = 'web: datasette serve --host 0.0.0.0 {quoted_files} --cors --port $PORT --inspect-file inspect-data.json {extras}'.format(
quoted_files=quoted_files,
extras=' '.join(extras),
procfile_cmd = "web: datasette serve --host 0.0.0.0 {quoted_files} --cors --port $PORT --inspect-file inspect-data.json {extras}".format(
quoted_files=quoted_files, extras=" ".join(extras)
)
open('Procfile', 'w').write(procfile_cmd)
open("Procfile", "w").write(procfile_cmd)
for path, filename in zip(file_paths, file_names):
link_or_copy(path, os.path.join(tmp.name, filename))
@ -484,9 +485,7 @@ def detect_primary_keys(conn, table):
" Figure out primary keys for a table. "
table_info_rows = [
row
for row in conn.execute(
'PRAGMA table_info("{}")'.format(table)
).fetchall()
for row in conn.execute('PRAGMA table_info("{}")'.format(table)).fetchall()
if row[-1]
]
table_info_rows.sort(key=lambda row: row[-1])
@ -494,33 +493,26 @@ def detect_primary_keys(conn, table):
def get_outbound_foreign_keys(conn, table):
infos = conn.execute(
'PRAGMA foreign_key_list([{}])'.format(table)
).fetchall()
infos = conn.execute("PRAGMA foreign_key_list([{}])".format(table)).fetchall()
fks = []
for info in infos:
if info is not None:
id, seq, table_name, from_, to_, on_update, on_delete, match = info
fks.append({
'other_table': table_name,
'column': from_,
'other_column': to_
})
fks.append(
{"other_table": table_name, "column": from_, "other_column": to_}
)
return fks
def get_all_foreign_keys(conn):
tables = [r[0] for r in conn.execute('select name from sqlite_master where type="table"')]
tables = [
r[0] for r in conn.execute('select name from sqlite_master where type="table"')
]
table_to_foreign_keys = {}
for table in tables:
table_to_foreign_keys[table] = {
'incoming': [],
'outgoing': [],
}
table_to_foreign_keys[table] = {"incoming": [], "outgoing": []}
for table in tables:
infos = conn.execute(
'PRAGMA foreign_key_list([{}])'.format(table)
).fetchall()
infos = conn.execute("PRAGMA foreign_key_list([{}])".format(table)).fetchall()
for info in infos:
if info is not None:
id, seq, table_name, from_, to_, on_update, on_delete, match = info
@ -528,22 +520,20 @@ def get_all_foreign_keys(conn):
# Weird edge case where something refers to a table that does
# not actually exist
continue
table_to_foreign_keys[table_name]['incoming'].append({
'other_table': table,
'column': to_,
'other_column': from_
})
table_to_foreign_keys[table]['outgoing'].append({
'other_table': table_name,
'column': from_,
'other_column': to_
})
table_to_foreign_keys[table_name]["incoming"].append(
{"other_table": table, "column": to_, "other_column": from_}
)
table_to_foreign_keys[table]["outgoing"].append(
{"other_table": table_name, "column": from_, "other_column": to_}
)
return table_to_foreign_keys
def detect_spatialite(conn):
rows = conn.execute('select 1 from sqlite_master where tbl_name = "geometry_columns"').fetchall()
rows = conn.execute(
'select 1 from sqlite_master where tbl_name = "geometry_columns"'
).fetchall()
return len(rows) > 0
@ -557,7 +547,7 @@ def detect_fts(conn, table):
def detect_fts_sql(table):
return r'''
return r"""
select name from sqlite_master
where rootpage = 0
and (
@ -567,7 +557,9 @@ def detect_fts_sql(table):
and sql like '%VIRTUAL TABLE%USING FTS%'
)
)
'''.format(table=table)
""".format(
table=table
)
def detect_json1(conn=None):
@ -589,51 +581,53 @@ def table_columns(conn, table):
]
filter_column_re = re.compile(r'^_filter_column_\d+$')
filter_column_re = re.compile(r"^_filter_column_\d+$")
def filters_should_redirect(special_args):
redirect_params = []
# Handle _filter_column=foo&_filter_op=exact&_filter_value=...
filter_column = special_args.get('_filter_column')
filter_op = special_args.get('_filter_op') or ''
filter_value = special_args.get('_filter_value') or ''
if '__' in filter_op:
filter_op, filter_value = filter_op.split('__', 1)
filter_column = special_args.get("_filter_column")
filter_op = special_args.get("_filter_op") or ""
filter_value = special_args.get("_filter_value") or ""
if "__" in filter_op:
filter_op, filter_value = filter_op.split("__", 1)
if filter_column:
redirect_params.append(
('{}__{}'.format(filter_column, filter_op), filter_value)
("{}__{}".format(filter_column, filter_op), filter_value)
)
for key in ('_filter_column', '_filter_op', '_filter_value'):
for key in ("_filter_column", "_filter_op", "_filter_value"):
if key in special_args:
redirect_params.append((key, None))
# Now handle _filter_column_1=name&_filter_op_1=contains&_filter_value_1=hello
column_keys = [k for k in special_args if filter_column_re.match(k)]
for column_key in column_keys:
number = column_key.split('_')[-1]
number = column_key.split("_")[-1]
column = special_args[column_key]
op = special_args.get('_filter_op_{}'.format(number)) or 'exact'
value = special_args.get('_filter_value_{}'.format(number)) or ''
if '__' in op:
op, value = op.split('__', 1)
op = special_args.get("_filter_op_{}".format(number)) or "exact"
value = special_args.get("_filter_value_{}".format(number)) or ""
if "__" in op:
op, value = op.split("__", 1)
if column:
redirect_params.append(('{}__{}'.format(column, op), value))
redirect_params.extend([
('_filter_column_{}'.format(number), None),
('_filter_op_{}'.format(number), None),
('_filter_value_{}'.format(number), None),
])
redirect_params.append(("{}__{}".format(column, op), value))
redirect_params.extend(
[
("_filter_column_{}".format(number), None),
("_filter_op_{}".format(number), None),
("_filter_value_{}".format(number), None),
]
)
return redirect_params
whitespace_re = re.compile(r'\s')
whitespace_re = re.compile(r"\s")
def is_url(value):
"Must start with http:// or https:// and contain JUST a URL"
if not isinstance(value, str):
return False
if not value.startswith('http://') and not value.startswith('https://'):
if not value.startswith("http://") and not value.startswith("https://"):
return False
# Any whitespace at all is invalid
if whitespace_re.search(value):
@ -641,8 +635,8 @@ def is_url(value):
return True
css_class_re = re.compile(r'^[a-zA-Z]+[_a-zA-Z0-9-]*$')
css_invalid_chars_re = re.compile(r'[^a-zA-Z0-9_\-]')
css_class_re = re.compile(r"^[a-zA-Z]+[_a-zA-Z0-9-]*$")
css_invalid_chars_re = re.compile(r"[^a-zA-Z0-9_\-]")
def to_css_class(s):
@ -656,16 +650,16 @@ def to_css_class(s):
"""
if css_class_re.match(s):
return s
md5_suffix = hashlib.md5(s.encode('utf8')).hexdigest()[:6]
md5_suffix = hashlib.md5(s.encode("utf8")).hexdigest()[:6]
# Strip leading _, -
s = s.lstrip('_').lstrip('-')
s = s.lstrip("_").lstrip("-")
# Replace any whitespace with hyphens
s = '-'.join(s.split())
s = "-".join(s.split())
# Remove any remaining invalid characters
s = css_invalid_chars_re.sub('', s)
s = css_invalid_chars_re.sub("", s)
# Attach the md5 suffix
bits = [b for b in (s, md5_suffix) if b]
return '-'.join(bits)
return "-".join(bits)
def link_or_copy(src, dst):
@ -689,8 +683,8 @@ def module_from_path(path, name):
# Adapted from http://sayspy.blogspot.com/2011/07/how-to-import-module-from-just-file.html
mod = imp.new_module(name)
mod.__file__ = path
with open(path, 'r') as file:
code = compile(file.read(), path, 'exec', dont_inherit=True)
with open(path, "r") as file:
code = compile(file.read(), path, "exec", dont_inherit=True)
exec(code, mod.__dict__)
return mod
@ -702,37 +696,39 @@ def get_plugins(pm):
static_path = None
templates_path = None
try:
if pkg_resources.resource_isdir(plugin.__name__, 'static'):
static_path = pkg_resources.resource_filename(plugin.__name__, 'static')
if pkg_resources.resource_isdir(plugin.__name__, 'templates'):
templates_path = pkg_resources.resource_filename(plugin.__name__, 'templates')
if pkg_resources.resource_isdir(plugin.__name__, "static"):
static_path = pkg_resources.resource_filename(plugin.__name__, "static")
if pkg_resources.resource_isdir(plugin.__name__, "templates"):
templates_path = pkg_resources.resource_filename(
plugin.__name__, "templates"
)
except (KeyError, ImportError):
# Caused by --plugins_dir= plugins - KeyError/ImportError thrown in Py3.5
pass
plugin_info = {
'name': plugin.__name__,
'static_path': static_path,
'templates_path': templates_path,
"name": plugin.__name__,
"static_path": static_path,
"templates_path": templates_path,
}
distinfo = plugin_to_distinfo.get(plugin)
if distinfo:
plugin_info['version'] = distinfo.version
plugin_info["version"] = distinfo.version
plugins.append(plugin_info)
return plugins
async def resolve_table_and_format(table_and_format, table_exists, allowed_formats=[]):
if '.' in table_and_format:
if "." in table_and_format:
# Check if a table exists with this exact name
it_exists = await table_exists(table_and_format)
if it_exists:
return table_and_format, None
# Check if table ends with a known format
formats = list(allowed_formats) + ['csv', 'jsono']
formats = list(allowed_formats) + ["csv", "jsono"]
for _format in formats:
if table_and_format.endswith(".{}".format(_format)):
table = table_and_format[:-(len(_format) + 1)]
table = table_and_format[: -(len(_format) + 1)]
return table, _format
return table_and_format, None
@ -747,9 +743,7 @@ def path_with_format(request, format, extra_qs=None):
if qs:
extra = urllib.parse.urlencode(sorted(qs.items()))
if request.query_string:
path = "{}?{}&{}".format(
path, request.query_string, extra
)
path = "{}?{}&{}".format(path, request.query_string, extra)
else:
path = "{}?{}".format(path, extra)
elif request.query_string:
@ -777,9 +771,9 @@ class CustomRow(OrderedDict):
def value_as_boolean(value):
if value.lower() not in ('on', 'off', 'true', 'false', '1', '0'):
if value.lower() not in ("on", "off", "true", "false", "1", "0"):
raise ValueAsBooleanError
return value.lower() in ('on', 'true', '1')
return value.lower() in ("on", "true", "1")
class ValueAsBooleanError(ValueError):
@ -799,9 +793,9 @@ class LimitedWriter:
def write(self, bytes):
self.bytes_count += len(bytes)
if self.limit_bytes and (self.bytes_count > self.limit_bytes):
raise WriteLimitExceeded("CSV contains more than {} bytes".format(
self.limit_bytes
))
raise WriteLimitExceeded(
"CSV contains more than {} bytes".format(self.limit_bytes)
)
self.writer.write(bytes)
@ -810,10 +804,7 @@ _infinities = {float("inf"), float("-inf")}
def remove_infinites(row):
if any((c in _infinities) if isinstance(c, float) else 0 for c in row):
return [
None if (isinstance(c, float) and c in _infinities) else c
for c in row
]
return [None if (isinstance(c, float) and c in _infinities) else c for c in row]
return row
@ -824,7 +815,8 @@ class StaticMount(click.ParamType):
if ":" not in value:
self.fail(
'"{}" should be of format mountpoint:directory'.format(value),
param, ctx
param,
ctx,
)
path, dirpath = value.split(":")
if not os.path.exists(dirpath) or not os.path.isdir(dirpath):