Use f-strings in place of .format()

Code transformed like so:

    pip install flynt
    flynt .
    black .
This commit is contained in:
Simon Willison 2020-11-15 15:24:22 -08:00 committed by GitHub
commit 30e64c8d3b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 213 additions and 277 deletions

View file

@ -115,13 +115,10 @@ def compound_keys_after_sql(pks, start_index=0):
last = pks_left[-1]
rest = pks_left[:-1]
and_clauses = [
"{} = :p{}".format(escape_sqlite(pk), (i + start_index))
for i, pk in enumerate(rest)
f"{escape_sqlite(pk)} = :p{i + start_index}" for i, pk in enumerate(rest)
]
and_clauses.append(
"{} > :p{}".format(escape_sqlite(last), (len(rest) + start_index))
)
or_clauses.append("({})".format(" and ".join(and_clauses)))
and_clauses.append(f"{escape_sqlite(last)} > :p{len(rest) + start_index}")
or_clauses.append(f"({' and '.join(and_clauses)})")
pks_left.pop()
or_clauses.reverse()
return "({})".format("\n or\n".join(or_clauses))
@ -195,7 +192,7 @@ allowed_pragmas = (
)
disallawed_sql_res = [
(
re.compile("pragma(?!_({}))".format("|".join(allowed_pragmas))),
re.compile(f"pragma(?!_({'|'.join(allowed_pragmas)}))"),
"Statement may not contain PRAGMA",
)
]
@ -215,7 +212,7 @@ def validate_sql_select(sql):
def append_querystring(url, querystring):
op = "&" if ("?" in url) else "?"
return "{}{}{}".format(url, op, querystring)
return f"{url}{op}{querystring}"
def path_with_added_args(request, args, path=None):
@ -230,7 +227,7 @@ def path_with_added_args(request, args, path=None):
current.extend([(key, value) for key, value in args if value is not None])
query_string = urllib.parse.urlencode(current)
if query_string:
query_string = "?{}".format(query_string)
query_string = f"?{query_string}"
return path + query_string
@ -259,7 +256,7 @@ def path_with_removed_args(request, args, path=None):
current.append((key, value))
query_string = urllib.parse.urlencode(current)
if query_string:
query_string = "?{}".format(query_string)
query_string = f"?{query_string}"
return path + query_string
@ -275,7 +272,7 @@ def path_with_replaced_args(request, args, path=None):
current.extend([p for p in args if p[1] is not None])
query_string = urllib.parse.urlencode(current)
if query_string:
query_string = "?{}".format(query_string)
query_string = f"?{query_string}"
return path + query_string
@ -285,7 +282,7 @@ _boring_keyword_re = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
def escape_css_string(s):
return _css_re.sub(
lambda m: "\\" + ("{:X}".format(ord(m.group())).zfill(6)),
lambda m: "\\" + (f"{ord(m.group()):X}".zfill(6)),
s.replace("\r\n", "\n"),
)
@ -294,7 +291,7 @@ def escape_sqlite(s):
if _boring_keyword_re.match(s) and (s.lower() not in reserved_words):
return s
else:
return "[{}]".format(s)
return f"[{s}]"
def make_dockerfile(
@ -319,27 +316,27 @@ def make_dockerfile(
cmd.extend(["-i", filename])
cmd.extend(["--cors", "--inspect-file", "inspect-data.json"])
if metadata_file:
cmd.extend(["--metadata", "{}".format(metadata_file)])
cmd.extend(["--metadata", f"{metadata_file}"])
if template_dir:
cmd.extend(["--template-dir", "templates/"])
if plugins_dir:
cmd.extend(["--plugins-dir", "plugins/"])
if version_note:
cmd.extend(["--version-note", "{}".format(version_note)])
cmd.extend(["--version-note", f"{version_note}"])
if static:
for mount_point, _ in static:
cmd.extend(["--static", "{}:{}".format(mount_point, mount_point)])
cmd.extend(["--static", f"{mount_point}:{mount_point}"])
if extra_options:
for opt in extra_options.split():
cmd.append("{}".format(opt))
cmd.append(f"{opt}")
cmd = [shlex.quote(part) for part in cmd]
# port attribute is a (fixed) env variable and should not be quoted
cmd.extend(["--port", "$PORT"])
cmd = " ".join(cmd)
if branch:
install = [
"https://github.com/simonw/datasette/archive/{}.zip".format(branch)
] + list(install)
install = [f"https://github.com/simonw/datasette/archive/{branch}.zip"] + list(
install
)
else:
install = ["datasette"] + list(install)
@ -449,7 +446,7 @@ def detect_primary_keys(conn, table):
" Figure out primary keys for a table. "
table_info_rows = [
row
for row in conn.execute('PRAGMA table_info("{}")'.format(table)).fetchall()
for row in conn.execute(f'PRAGMA table_info("{table}")').fetchall()
if row[-1]
]
table_info_rows.sort(key=lambda row: row[-1])
@ -457,7 +454,7 @@ def detect_primary_keys(conn, table):
def get_outbound_foreign_keys(conn, table):
infos = conn.execute("PRAGMA foreign_key_list([{}])".format(table)).fetchall()
infos = conn.execute(f"PRAGMA foreign_key_list([{table}])").fetchall()
fks = []
for info in infos:
if info is not None:
@ -476,7 +473,7 @@ def get_all_foreign_keys(conn):
for table in tables:
table_to_foreign_keys[table] = {"incoming": [], "outgoing": []}
for table in tables:
infos = conn.execute("PRAGMA foreign_key_list([{}])".format(table)).fetchall()
infos = conn.execute(f"PRAGMA foreign_key_list([{table}])").fetchall()
for info in infos:
if info is not None:
id, seq, table_name, from_, to_, on_update, on_delete, match = info
@ -544,9 +541,7 @@ def table_columns(conn, table):
def table_column_details(conn, table):
return [
Column(*r)
for r in conn.execute(
"PRAGMA table_info({});".format(escape_sqlite(table))
).fetchall()
for r in conn.execute(f"PRAGMA table_info({escape_sqlite(table)});").fetchall()
]
@ -562,9 +557,7 @@ def filters_should_redirect(special_args):
if "__" in filter_op:
filter_op, filter_value = filter_op.split("__", 1)
if filter_column:
redirect_params.append(
("{}__{}".format(filter_column, filter_op), filter_value)
)
redirect_params.append((f"{filter_column}__{filter_op}", filter_value))
for key in ("_filter_column", "_filter_op", "_filter_value"):
if key in special_args:
redirect_params.append((key, None))
@ -573,17 +566,17 @@ def filters_should_redirect(special_args):
for column_key in column_keys:
number = column_key.split("_")[-1]
column = special_args[column_key]
op = special_args.get("_filter_op_{}".format(number)) or "exact"
value = special_args.get("_filter_value_{}".format(number)) or ""
op = special_args.get(f"_filter_op_{number}") or "exact"
value = special_args.get(f"_filter_value_{number}") or ""
if "__" in op:
op, value = op.split("__", 1)
if column:
redirect_params.append(("{}__{}".format(column, op), value))
redirect_params.append((f"{column}__{op}", value))
redirect_params.extend(
[
("_filter_column_{}".format(number), None),
("_filter_op_{}".format(number), None),
("_filter_value_{}".format(number), None),
(f"_filter_column_{number}", None),
(f"_filter_op_{number}", None),
(f"_filter_value_{number}", None),
]
)
return redirect_params
@ -672,7 +665,7 @@ async def resolve_table_and_format(
# Check if table ends with a known format
formats = list(allowed_formats) + ["csv", "jsono"]
for _format in formats:
if table_and_format.endswith(".{}".format(_format)):
if table_and_format.endswith(f".{_format}"):
table = table_and_format[: -(len(_format) + 1)]
return table, _format
return table_and_format, None
@ -683,20 +676,20 @@ def path_with_format(
):
qs = extra_qs or {}
path = request.path if request else path
if replace_format and path.endswith(".{}".format(replace_format)):
if replace_format and path.endswith(f".{replace_format}"):
path = path[: -(1 + len(replace_format))]
if "." in path:
qs["_format"] = format
else:
path = "{}.{}".format(path, format)
path = f"{path}.{format}"
if qs:
extra = urllib.parse.urlencode(sorted(qs.items()))
if request and request.query_string:
path = "{}?{}&{}".format(path, request.query_string, extra)
path = f"{path}?{request.query_string}&{extra}"
else:
path = "{}?{}".format(path, extra)
path = f"{path}?{extra}"
elif request and request.query_string:
path = "{}?{}".format(path, request.query_string)
path = f"{path}?{request.query_string}"
return path
@ -742,9 +735,7 @@ class LimitedWriter:
async def write(self, bytes):
self.bytes_count += len(bytes)
if self.limit_bytes and (self.bytes_count > self.limit_bytes):
raise WriteLimitExceeded(
"CSV contains more than {} bytes".format(self.limit_bytes)
)
raise WriteLimitExceeded(f"CSV contains more than {self.limit_bytes} bytes")
await self.writer.write(bytes)
@ -763,14 +754,14 @@ class StaticMount(click.ParamType):
def convert(self, value, param, ctx):
if ":" not in value:
self.fail(
'"{}" should be of format mountpoint:directory'.format(value),
f'"{value}" should be of format mountpoint:directory',
param,
ctx,
)
path, dirpath = value.split(":", 1)
dirpath = os.path.abspath(dirpath)
if not os.path.exists(dirpath) or not os.path.isdir(dirpath):
self.fail("%s is not a valid directory path" % value, param, ctx)
self.fail(f"{value} is not a valid directory path", param, ctx)
return path, dirpath
@ -781,9 +772,9 @@ def format_bytes(bytes):
break
current = current / 1024
if unit == "bytes":
return "{} {}".format(int(current), unit)
return f"{int(current)} {unit}"
else:
return "{:.1f} {}".format(current, unit)
return f"{current:.1f} {unit}"
_escape_fts_re = re.compile(r'\s+|(".*?")')
@ -820,7 +811,7 @@ class MultiParams:
self._data = new_data
def __repr__(self):
return "<MultiParams: {}>".format(self._data)
return f"<MultiParams: {self._data}>"
def __contains__(self, key):
return key in self._data
@ -867,7 +858,7 @@ def check_connection(conn):
for table in tables:
try:
conn.execute(
"PRAGMA table_info({});".format(escape_sqlite(table)),
f"PRAGMA table_info({escape_sqlite(table)});",
)
except sqlite3.OperationalError as e:
if e.args[0] == "no such module: VirtualSpatialIndex":

View file

@ -260,7 +260,7 @@ async def asgi_send_file(
):
headers = headers or {}
if filename:
headers["content-disposition"] = 'attachment; filename="{}"'.format(filename)
headers["content-disposition"] = f'attachment; filename="{filename}"'
first = True
headers["content-length"] = str((await aiofiles.os.stat(str(filepath))).st_size)
async with aiofiles.open(str(filepath), mode="rb") as fp:

View file

@ -32,7 +32,7 @@ class TestResponse:
return any(
h
for h in self.httpx_response.headers.get_list("set-cookie")
if h.startswith('{}="";'.format(cookie))
if h.startswith(f'{cookie}="";')
)
@property
@ -125,9 +125,7 @@ class TestClient:
if allow_redirects and response.status in (301, 302):
assert (
redirect_count < self.max_redirects
), "Redirected {} times, max_redirects={}".format(
redirect_count, self.max_redirects
)
), f"Redirected {redirect_count} times, max_redirects={self.max_redirects}"
location = response.headers["Location"]
return await self._request(
location, allow_redirects=True, redirect_count=redirect_count + 1