Apply black to everything

I ran this:

    black datasette tests
This commit is contained in:
Simon Willison 2019-05-03 17:56:52 -04:00
commit 6300d3e269
28 changed files with 2725 additions and 2644 deletions

View file

@ -17,9 +17,7 @@ class TestClient:
def get(self, path, allow_redirects=True):
return self.sanic_test_client.get(
path,
allow_redirects=allow_redirects,
gather_request=False
path, allow_redirects=allow_redirects, gather_request=False
)
@ -79,39 +77,35 @@ def app_client_no_files():
client.ds = ds
yield client
@pytest.fixture(scope="session")
def app_client_with_memory():
yield from make_app_client(memory=True)
@pytest.fixture(scope="session")
def app_client_with_hash():
yield from make_app_client(config={
'hash_urls': True,
}, is_immutable=True)
yield from make_app_client(config={"hash_urls": True}, is_immutable=True)
@pytest.fixture(scope='session')
@pytest.fixture(scope="session")
def app_client_shorter_time_limit():
yield from make_app_client(20)
@pytest.fixture(scope='session')
@pytest.fixture(scope="session")
def app_client_returned_rows_matches_page_size():
yield from make_app_client(max_returned_rows=50)
@pytest.fixture(scope='session')
@pytest.fixture(scope="session")
def app_client_larger_cache_size():
yield from make_app_client(config={
'cache_size_kb': 2500,
})
yield from make_app_client(config={"cache_size_kb": 2500})
@pytest.fixture(scope='session')
@pytest.fixture(scope="session")
def app_client_csv_max_mb_one():
yield from make_app_client(config={
'max_csv_mb': 1,
})
yield from make_app_client(config={"max_csv_mb": 1})
@pytest.fixture(scope="session")
@ -119,7 +113,7 @@ def app_client_with_dot():
yield from make_app_client(filename="fixtures.dot.db")
@pytest.fixture(scope='session')
@pytest.fixture(scope="session")
def app_client_with_cors():
yield from make_app_client(cors=True)
@ -128,7 +122,7 @@ def generate_compound_rows(num):
for a, b, c in itertools.islice(
itertools.product(string.ascii_lowercase, repeat=3), num
):
yield a, b, c, '{}-{}-{}'.format(a, b, c)
yield a, b, c, "{}-{}-{}".format(a, b, c)
def generate_sortable_rows(num):
@ -137,107 +131,81 @@ def generate_sortable_rows(num):
itertools.product(string.ascii_lowercase, repeat=2), num
):
yield {
'pk1': a,
'pk2': b,
'content': '{}-{}'.format(a, b),
'sortable': rand.randint(-100, 100),
'sortable_with_nulls': rand.choice([
None, rand.random(), rand.random()
]),
'sortable_with_nulls_2': rand.choice([
None, rand.random(), rand.random()
]),
'text': rand.choice(['$null', '$blah']),
"pk1": a,
"pk2": b,
"content": "{}-{}".format(a, b),
"sortable": rand.randint(-100, 100),
"sortable_with_nulls": rand.choice([None, rand.random(), rand.random()]),
"sortable_with_nulls_2": rand.choice([None, rand.random(), rand.random()]),
"text": rand.choice(["$null", "$blah"]),
}
METADATA = {
'title': 'Datasette Fixtures',
'description': 'An example SQLite database demonstrating Datasette',
'license': 'Apache License 2.0',
'license_url': 'https://github.com/simonw/datasette/blob/master/LICENSE',
'source': 'tests/fixtures.py',
'source_url': 'https://github.com/simonw/datasette/blob/master/tests/fixtures.py',
'about': 'About Datasette',
'about_url': 'https://github.com/simonw/datasette',
"plugins": {
"name-of-plugin": {
"depth": "root"
}
},
'databases': {
'fixtures': {
'description': 'Test tables description',
"plugins": {
"name-of-plugin": {
"depth": "database"
}
},
'tables': {
'simple_primary_key': {
'description_html': 'Simple <em>primary</em> key',
'title': 'This <em>HTML</em> is escaped',
"title": "Datasette Fixtures",
"description": "An example SQLite database demonstrating Datasette",
"license": "Apache License 2.0",
"license_url": "https://github.com/simonw/datasette/blob/master/LICENSE",
"source": "tests/fixtures.py",
"source_url": "https://github.com/simonw/datasette/blob/master/tests/fixtures.py",
"about": "About Datasette",
"about_url": "https://github.com/simonw/datasette",
"plugins": {"name-of-plugin": {"depth": "root"}},
"databases": {
"fixtures": {
"description": "Test tables description",
"plugins": {"name-of-plugin": {"depth": "database"}},
"tables": {
"simple_primary_key": {
"description_html": "Simple <em>primary</em> key",
"title": "This <em>HTML</em> is escaped",
"plugins": {
"name-of-plugin": {
"depth": "table",
"special": "this-is-simple_primary_key"
"special": "this-is-simple_primary_key",
}
}
},
},
'sortable': {
'sortable_columns': [
'sortable',
'sortable_with_nulls',
'sortable_with_nulls_2',
'text',
"sortable": {
"sortable_columns": [
"sortable",
"sortable_with_nulls",
"sortable_with_nulls_2",
"text",
],
"plugins": {
"name-of-plugin": {
"depth": "table"
}
}
"plugins": {"name-of-plugin": {"depth": "table"}},
},
'no_primary_key': {
'sortable_columns': [],
'hidden': True,
"no_primary_key": {"sortable_columns": [], "hidden": True},
"units": {"units": {"distance": "m", "frequency": "Hz"}},
"primary_key_multiple_columns_explicit_label": {
"label_column": "content2"
},
'units': {
'units': {
'distance': 'm',
'frequency': 'Hz'
}
"simple_view": {"sortable_columns": ["content"]},
"searchable_view_configured_by_metadata": {
"fts_table": "searchable_fts",
"fts_pk": "pk",
},
'primary_key_multiple_columns_explicit_label': {
'label_column': 'content2',
},
'simple_view': {
'sortable_columns': ['content'],
},
'searchable_view_configured_by_metadata': {
'fts_table': 'searchable_fts',
'fts_pk': 'pk'
}
},
'queries': {
'pragma_cache_size': 'PRAGMA cache_size;',
'neighborhood_search': {
'sql': '''
"queries": {
"pragma_cache_size": "PRAGMA cache_size;",
"neighborhood_search": {
"sql": """
select neighborhood, facet_cities.name, state
from facetable
join facet_cities
on facetable.city_id = facet_cities.id
where neighborhood like '%' || :text || '%'
order by neighborhood;
''',
'title': 'Search neighborhoods',
'description_html': '<b>Demonstrating</b> simple like search',
""",
"title": "Search neighborhoods",
"description_html": "<b>Demonstrating</b> simple like search",
},
}
},
}
},
}
},
}
PLUGIN1 = '''
PLUGIN1 = """
from datasette import hookimpl
import base64
import pint
@ -304,9 +272,9 @@ def render_cell(value, column, table, database, datasette):
table=table,
)
})
'''
"""
PLUGIN2 = '''
PLUGIN2 = """
from datasette import hookimpl
import jinja2
import json
@ -349,9 +317,10 @@ def render_cell(value, database):
label=jinja2.escape(data["label"] or "") or "&nbsp;"
)
)
'''
"""
TABLES = '''
TABLES = (
"""
CREATE TABLE simple_primary_key (
id varchar(30) primary key,
content text
@ -581,26 +550,42 @@ CREATE VIEW searchable_view AS
CREATE VIEW searchable_view_configured_by_metadata AS
SELECT * from searchable;
''' + '\n'.join([
'INSERT INTO no_primary_key VALUES ({i}, "a{i}", "b{i}", "c{i}");'.format(i=i + 1)
for i in range(201)
]) + '\n'.join([
'INSERT INTO compound_three_primary_keys VALUES ("{a}", "{b}", "{c}", "{content}");'.format(
a=a, b=b, c=c, content=content
) for a, b, c, content in generate_compound_rows(1001)
]) + '\n'.join([
'''INSERT INTO sortable VALUES (
"""
+ "\n".join(
[
'INSERT INTO no_primary_key VALUES ({i}, "a{i}", "b{i}", "c{i}");'.format(
i=i + 1
)
for i in range(201)
]
)
+ "\n".join(
[
'INSERT INTO compound_three_primary_keys VALUES ("{a}", "{b}", "{c}", "{content}");'.format(
a=a, b=b, c=c, content=content
)
for a, b, c, content in generate_compound_rows(1001)
]
)
+ "\n".join(
[
"""INSERT INTO sortable VALUES (
"{pk1}", "{pk2}", "{content}", {sortable},
{sortable_with_nulls}, {sortable_with_nulls_2}, "{text}");
'''.format(
**row
).replace('None', 'null') for row in generate_sortable_rows(201)
])
TABLE_PARAMETERIZED_SQL = [(
"insert into binary_data (data) values (?);", [b'this is binary data']
)]
""".format(
**row
).replace(
"None", "null"
)
for row in generate_sortable_rows(201)
]
)
)
TABLE_PARAMETERIZED_SQL = [
("insert into binary_data (data) values (?);", [b"this is binary data"])
]
if __name__ == '__main__':
if __name__ == "__main__":
# Can be called with data.db OR data.db metadata.json
db_filename = sys.argv[-1]
metadata_filename = None
@ -615,9 +600,7 @@ if __name__ == '__main__':
conn.execute(sql, params)
print("Test tables written to {}".format(db_filename))
if metadata_filename:
open(metadata_filename, 'w').write(json.dumps(METADATA))
open(metadata_filename, "w").write(json.dumps(METADATA))
print("- metadata written to {}".format(metadata_filename))
else:
print("Usage: {} db_to_write.db [metadata_to_write.json]".format(
sys.argv[0]
))
print("Usage: {} db_to_write.db [metadata_to_write.json]".format(sys.argv[0]))

File diff suppressed because it is too large Load diff

View file

@ -1,22 +1,26 @@
from .fixtures import ( # noqa
from .fixtures import ( # noqa
app_client,
app_client_csv_max_mb_one,
app_client_with_cors
app_client_with_cors,
)
EXPECTED_TABLE_CSV = '''id,content
EXPECTED_TABLE_CSV = """id,content
1,hello
2,world
3,
4,RENDER_CELL_DEMO
'''.replace('\n', '\r\n')
""".replace(
"\n", "\r\n"
)
EXPECTED_CUSTOM_CSV = '''content
EXPECTED_CUSTOM_CSV = """content
hello
world
'''.replace('\n', '\r\n')
""".replace(
"\n", "\r\n"
)
EXPECTED_TABLE_WITH_LABELS_CSV = '''
EXPECTED_TABLE_WITH_LABELS_CSV = """
pk,planet_int,on_earth,state,city_id,city_id_label,neighborhood,tags
1,1,1,CA,1,San Francisco,Mission,"[""tag1"", ""tag2""]"
2,1,1,CA,1,San Francisco,Dogpatch,"[""tag1"", ""tag3""]"
@ -33,45 +37,47 @@ pk,planet_int,on_earth,state,city_id,city_id_label,neighborhood,tags
13,1,1,MI,3,Detroit,Corktown,[]
14,1,1,MI,3,Detroit,Mexicantown,[]
15,2,0,MC,4,Memnonia,Arcadia Planitia,[]
'''.lstrip().replace('\n', '\r\n')
""".lstrip().replace(
"\n", "\r\n"
)
def test_table_csv(app_client):
response = app_client.get('/fixtures/simple_primary_key.csv')
response = app_client.get("/fixtures/simple_primary_key.csv")
assert response.status == 200
assert not response.headers.get("Access-Control-Allow-Origin")
assert 'text/plain; charset=utf-8' == response.headers['Content-Type']
assert "text/plain; charset=utf-8" == response.headers["Content-Type"]
assert EXPECTED_TABLE_CSV == response.text
def test_table_csv_cors_headers(app_client_with_cors):
response = app_client_with_cors.get('/fixtures/simple_primary_key.csv')
response = app_client_with_cors.get("/fixtures/simple_primary_key.csv")
assert response.status == 200
assert "*" == response.headers["Access-Control-Allow-Origin"]
def test_table_csv_with_labels(app_client):
response = app_client.get('/fixtures/facetable.csv?_labels=1')
response = app_client.get("/fixtures/facetable.csv?_labels=1")
assert response.status == 200
assert 'text/plain; charset=utf-8' == response.headers['Content-Type']
assert "text/plain; charset=utf-8" == response.headers["Content-Type"]
assert EXPECTED_TABLE_WITH_LABELS_CSV == response.text
def test_custom_sql_csv(app_client):
response = app_client.get(
'/fixtures.csv?sql=select+content+from+simple_primary_key+limit+2'
"/fixtures.csv?sql=select+content+from+simple_primary_key+limit+2"
)
assert response.status == 200
assert 'text/plain; charset=utf-8' == response.headers['Content-Type']
assert "text/plain; charset=utf-8" == response.headers["Content-Type"]
assert EXPECTED_CUSTOM_CSV == response.text
def test_table_csv_download(app_client):
response = app_client.get('/fixtures/simple_primary_key.csv?_dl=1')
response = app_client.get("/fixtures/simple_primary_key.csv?_dl=1")
assert response.status == 200
assert 'text/csv; charset=utf-8' == response.headers['Content-Type']
assert "text/csv; charset=utf-8" == response.headers["Content-Type"]
expected_disposition = 'attachment; filename="simple_primary_key.csv"'
assert expected_disposition == response.headers['Content-Disposition']
assert expected_disposition == response.headers["Content-Disposition"]
def test_max_csv_mb(app_client_csv_max_mb_one):
@ -88,12 +94,8 @@ def test_max_csv_mb(app_client_csv_max_mb_one):
def test_table_csv_stream(app_client):
# Without _stream should return header + 100 rows:
response = app_client.get(
"/fixtures/compound_three_primary_keys.csv?_size=max"
)
response = app_client.get("/fixtures/compound_three_primary_keys.csv?_size=max")
assert 101 == len([b for b in response.body.split(b"\r\n") if b])
# With _stream=1 should return header + 1001 rows
response = app_client.get(
"/fixtures/compound_three_primary_keys.csv?_stream=1"
)
response = app_client.get("/fixtures/compound_three_primary_keys.csv?_stream=1")
assert 1002 == len([b for b in response.body.split(b"\r\n") if b])

View file

@ -9,13 +9,13 @@ from pathlib import Path
import pytest
import re
docs_path = Path(__file__).parent.parent / 'docs'
label_re = re.compile(r'\.\. _([^\s:]+):')
docs_path = Path(__file__).parent.parent / "docs"
label_re = re.compile(r"\.\. _([^\s:]+):")
def get_headings(filename, underline="-"):
content = (docs_path / filename).open().read()
heading_re = re.compile(r'(\w+)(\([^)]*\))?\n\{}+\n'.format(underline))
heading_re = re.compile(r"(\w+)(\([^)]*\))?\n\{}+\n".format(underline))
return set(h[0] for h in heading_re.findall(content))
@ -24,38 +24,37 @@ def get_labels(filename):
return set(label_re.findall(content))
@pytest.mark.parametrize('config', app.CONFIG_OPTIONS)
@pytest.mark.parametrize("config", app.CONFIG_OPTIONS)
def test_config_options_are_documented(config):
assert config.name in get_headings("config.rst")
@pytest.mark.parametrize("name,filename", (
("serve", "datasette-serve-help.txt"),
("package", "datasette-package-help.txt"),
("publish now", "datasette-publish-now-help.txt"),
("publish heroku", "datasette-publish-heroku-help.txt"),
("publish cloudrun", "datasette-publish-cloudrun-help.txt"),
))
@pytest.mark.parametrize(
"name,filename",
(
("serve", "datasette-serve-help.txt"),
("package", "datasette-package-help.txt"),
("publish now", "datasette-publish-now-help.txt"),
("publish heroku", "datasette-publish-heroku-help.txt"),
("publish cloudrun", "datasette-publish-cloudrun-help.txt"),
),
)
def test_help_includes(name, filename):
expected = open(str(docs_path / filename)).read()
runner = CliRunner()
result = runner.invoke(cli, name.split() + ["--help"], terminal_width=88)
actual = "$ datasette {} --help\n\n{}".format(
name, result.output
)
actual = "$ datasette {} --help\n\n{}".format(name, result.output)
# actual has "Usage: cli package [OPTIONS] FILES"
# because it doesn't know that cli will be aliased to datasette
expected = expected.replace("Usage: datasette", "Usage: cli")
assert expected == actual
@pytest.mark.parametrize('plugin', [
name for name in dir(app.pm.hook) if not name.startswith('_')
])
@pytest.mark.parametrize(
"plugin", [name for name in dir(app.pm.hook) if not name.startswith("_")]
)
def test_plugin_hooks_are_documented(plugin):
headings = [
s.split("(")[0] for s in get_headings("plugins.rst", "~")
]
headings = [s.split("(")[0] for s in get_headings("plugins.rst", "~")]
assert plugin in headings

View file

@ -2,102 +2,57 @@ from datasette.filters import Filters
import pytest
@pytest.mark.parametrize('args,expected_where,expected_params', [
(
@pytest.mark.parametrize(
"args,expected_where,expected_params",
[
((("name_english__contains", "foo"),), ['"name_english" like :p0'], ["%foo%"]),
(
('name_english__contains', 'foo'),
(("foo", "bar"), ("bar__contains", "baz")),
['"bar" like :p0', '"foo" = :p1'],
["%baz%", "bar"],
),
['"name_english" like :p0'],
['%foo%']
),
(
(
('foo', 'bar'),
('bar__contains', 'baz'),
(("foo__startswith", "bar"), ("bar__endswith", "baz")),
['"bar" like :p0', '"foo" like :p1'],
["%baz", "bar%"],
),
['"bar" like :p0', '"foo" = :p1'],
['%baz%', 'bar']
),
(
(
('foo__startswith', 'bar'),
('bar__endswith', 'baz'),
(("foo__lt", "1"), ("bar__gt", "2"), ("baz__gte", "3"), ("bax__lte", "4")),
['"bar" > :p0', '"bax" <= :p1', '"baz" >= :p2', '"foo" < :p3'],
[2, 4, 3, 1],
),
['"bar" like :p0', '"foo" like :p1'],
['%baz', 'bar%']
),
(
(
('foo__lt', '1'),
('bar__gt', '2'),
('baz__gte', '3'),
('bax__lte', '4'),
(("foo__like", "2%2"), ("zax__glob", "3*")),
['"foo" like :p0', '"zax" glob :p1'],
["2%2", "3*"],
),
['"bar" > :p0', '"bax" <= :p1', '"baz" >= :p2', '"foo" < :p3'],
[2, 4, 3, 1]
),
(
# Multiple like arguments:
(
('foo__like', '2%2'),
('zax__glob', '3*'),
(("foo__like", "2%2"), ("foo__like", "3%3")),
['"foo" like :p0', '"foo" like :p1'],
["2%2", "3%3"],
),
['"foo" like :p0', '"zax" glob :p1'],
['2%2', '3*']
),
# Multiple like arguments:
(
(
('foo__like', '2%2'),
('foo__like', '3%3'),
(("foo__isnull", "1"), ("baz__isnull", "1"), ("bar__gt", "10")),
['"bar" > :p0', '"baz" is null', '"foo" is null'],
[10],
),
['"foo" like :p0', '"foo" like :p1'],
['2%2', '3%3']
),
(
((("foo__in", "1,2,3"),), ["foo in (:p0, :p1, :p2)"], ["1", "2", "3"]),
# date
((("foo__date", "1988-01-01"),), ["date(foo) = :p0"], ["1988-01-01"]),
# JSON array variants of __in (useful for unexpected characters)
((("foo__in", "[1,2,3]"),), ["foo in (:p0, :p1, :p2)"], [1, 2, 3]),
(
('foo__isnull', '1'),
('baz__isnull', '1'),
('bar__gt', '10'),
(("foo__in", '["dog,cat", "cat[dog]"]'),),
["foo in (:p0, :p1)"],
["dog,cat", "cat[dog]"],
),
['"bar" > :p0', '"baz" is null', '"foo" is null'],
[10]
),
(
(
('foo__in', '1,2,3'),
),
['foo in (:p0, :p1, :p2)'],
["1", "2", "3"]
),
# date
(
(
("foo__date", "1988-01-01"),
),
["date(foo) = :p0"],
["1988-01-01"]
),
# JSON array variants of __in (useful for unexpected characters)
(
(
('foo__in', '[1,2,3]'),
),
['foo in (:p0, :p1, :p2)'],
[1, 2, 3]
),
(
(
('foo__in', '["dog,cat", "cat[dog]"]'),
),
['foo in (:p0, :p1)'],
["dog,cat", "cat[dog]"]
),
])
],
)
def test_build_where(args, expected_where, expected_params):
f = Filters(sorted(args))
sql_bits, actual_params = f.build_where_clauses("table")
assert expected_where == sql_bits
assert {
'p{}'.format(i): param
for i, param in enumerate(expected_params)
"p{}".format(i): param for i, param in enumerate(expected_params)
} == actual_params

File diff suppressed because it is too large Load diff

View file

@ -5,7 +5,7 @@ import pytest
import tempfile
TABLES = '''
TABLES = """
CREATE TABLE "election_results" (
"county" INTEGER,
"party" INTEGER,
@ -32,13 +32,13 @@ CREATE TABLE "office" (
"id" INTEGER PRIMARY KEY ,
"name" TEXT
);
'''
"""
@pytest.fixture(scope='session')
@pytest.fixture(scope="session")
def ds_instance():
with tempfile.TemporaryDirectory() as tmpdir:
filepath = os.path.join(tmpdir, 'fixtures.db')
filepath = os.path.join(tmpdir, "fixtures.db")
conn = sqlite3.connect(filepath)
conn.executescript(TABLES)
yield Datasette([filepath])
@ -46,58 +46,47 @@ def ds_instance():
def test_inspect_hidden_tables(ds_instance):
info = ds_instance.inspect()
tables = info['fixtures']['tables']
tables = info["fixtures"]["tables"]
expected_hidden = (
'election_results_fts',
'election_results_fts_content',
'election_results_fts_docsize',
'election_results_fts_segdir',
'election_results_fts_segments',
'election_results_fts_stat',
)
expected_visible = (
'election_results',
'county',
'party',
'office',
"election_results_fts",
"election_results_fts_content",
"election_results_fts_docsize",
"election_results_fts_segdir",
"election_results_fts_segments",
"election_results_fts_stat",
)
expected_visible = ("election_results", "county", "party", "office")
assert sorted(expected_hidden) == sorted(
[table for table in tables if tables[table]['hidden']]
[table for table in tables if tables[table]["hidden"]]
)
assert sorted(expected_visible) == sorted(
[table for table in tables if not tables[table]['hidden']]
[table for table in tables if not tables[table]["hidden"]]
)
def test_inspect_foreign_keys(ds_instance):
info = ds_instance.inspect()
tables = info['fixtures']['tables']
for table_name in ('county', 'party', 'office'):
assert 0 == tables[table_name]['count']
foreign_keys = tables[table_name]['foreign_keys']
assert [] == foreign_keys['outgoing']
assert [{
'column': 'id',
'other_column': table_name,
'other_table': 'election_results'
}] == foreign_keys['incoming']
tables = info["fixtures"]["tables"]
for table_name in ("county", "party", "office"):
assert 0 == tables[table_name]["count"]
foreign_keys = tables[table_name]["foreign_keys"]
assert [] == foreign_keys["outgoing"]
assert [
{
"column": "id",
"other_column": table_name,
"other_table": "election_results",
}
] == foreign_keys["incoming"]
election_results = tables['election_results']
assert 0 == election_results['count']
assert sorted([{
'column': 'county',
'other_column': 'id',
'other_table': 'county'
}, {
'column': 'party',
'other_column': 'id',
'other_table': 'party'
}, {
'column': 'office',
'other_column': 'id',
'other_table': 'office'
}], key=lambda d: d['column']) == sorted(
election_results['foreign_keys']['outgoing'],
key=lambda d: d['column']
)
assert [] == election_results['foreign_keys']['incoming']
election_results = tables["election_results"]
assert 0 == election_results["count"]
assert sorted(
[
{"column": "county", "other_column": "id", "other_table": "county"},
{"column": "party", "other_column": "id", "other_table": "party"},
{"column": "office", "other_column": "id", "other_table": "office"},
],
key=lambda d: d["column"],
) == sorted(election_results["foreign_keys"]["outgoing"], key=lambda d: d["column"])
assert [] == election_results["foreign_keys"]["incoming"]

View file

@ -1,7 +1,5 @@
from bs4 import BeautifulSoup as Soup
from .fixtures import ( # noqa
app_client,
)
from .fixtures import app_client # noqa
import base64
import json
import re
@ -13,41 +11,26 @@ def test_plugins_dir_plugin(app_client):
response = app_client.get(
"/fixtures.json?sql=select+convert_units(100%2C+'m'%2C+'ft')"
)
assert pytest.approx(328.0839) == response.json['rows'][0][0]
assert pytest.approx(328.0839) == response.json["rows"][0][0]
@pytest.mark.parametrize(
"path,expected_decoded_object",
[
(
"/",
{
"template": "index.html",
"database": None,
"table": None,
},
),
("/", {"template": "index.html", "database": None, "table": None}),
(
"/fixtures/",
{
"template": "database.html",
"database": "fixtures",
"table": None,
},
{"template": "database.html", "database": "fixtures", "table": None},
),
(
"/fixtures/sortable",
{
"template": "table.html",
"database": "fixtures",
"table": "sortable",
},
{"template": "table.html", "database": "fixtures", "table": "sortable"},
),
],
)
def test_plugin_extra_css_urls(app_client, path, expected_decoded_object):
response = app_client.get(path)
links = Soup(response.body, 'html.parser').findAll('link')
links = Soup(response.body, "html.parser").findAll("link")
special_href = [
l for l in links if l.attrs["href"].endswith("/extra-css-urls-demo.css")
][0]["href"]
@ -59,47 +42,43 @@ def test_plugin_extra_css_urls(app_client, path, expected_decoded_object):
def test_plugin_extra_js_urls(app_client):
response = app_client.get('/')
scripts = Soup(response.body, 'html.parser').findAll('script')
response = app_client.get("/")
scripts = Soup(response.body, "html.parser").findAll("script")
assert [
s for s in scripts
if s.attrs == {
'integrity': 'SRIHASH',
'crossorigin': 'anonymous',
'src': 'https://example.com/jquery.js'
s
for s in scripts
if s.attrs
== {
"integrity": "SRIHASH",
"crossorigin": "anonymous",
"src": "https://example.com/jquery.js",
}
]
def test_plugins_with_duplicate_js_urls(app_client):
# If two plugins both require jQuery, jQuery should be loaded only once
response = app_client.get(
"/fixtures"
)
response = app_client.get("/fixtures")
# This test is a little tricky, as if the user has any other plugins in
# their current virtual environment those may affect what comes back too.
# What matters is that https://example.com/jquery.js is only there once
# and it comes before plugin1.js and plugin2.js which could be in either
# order
scripts = Soup(response.body, 'html.parser').findAll('script')
srcs = [s['src'] for s in scripts if s.get('src')]
scripts = Soup(response.body, "html.parser").findAll("script")
srcs = [s["src"] for s in scripts if s.get("src")]
# No duplicates allowed:
assert len(srcs) == len(set(srcs))
# jquery.js loaded once:
assert 1 == srcs.count('https://example.com/jquery.js')
assert 1 == srcs.count("https://example.com/jquery.js")
# plugin1.js and plugin2.js are both there:
assert 1 == srcs.count('https://example.com/plugin1.js')
assert 1 == srcs.count('https://example.com/plugin2.js')
assert 1 == srcs.count("https://example.com/plugin1.js")
assert 1 == srcs.count("https://example.com/plugin2.js")
# jquery comes before them both
assert srcs.index(
'https://example.com/jquery.js'
) < srcs.index(
'https://example.com/plugin1.js'
assert srcs.index("https://example.com/jquery.js") < srcs.index(
"https://example.com/plugin1.js"
)
assert srcs.index(
'https://example.com/jquery.js'
) < srcs.index(
'https://example.com/plugin2.js'
assert srcs.index("https://example.com/jquery.js") < srcs.index(
"https://example.com/plugin2.js"
)
@ -107,13 +86,9 @@ def test_plugins_render_cell_link_from_json(app_client):
sql = """
select '{"href": "http://example.com/", "label":"Example"}'
""".strip()
path = "/fixtures?" + urllib.parse.urlencode({
"sql": sql,
})
path = "/fixtures?" + urllib.parse.urlencode({"sql": sql})
response = app_client.get(path)
td = Soup(
response.body, "html.parser"
).find("table").find("tbody").find("td")
td = Soup(response.body, "html.parser").find("table").find("tbody").find("td")
a = td.find("a")
assert a is not None, str(a)
assert a.attrs["href"] == "http://example.com/"
@ -129,10 +104,7 @@ def test_plugins_render_cell_demo(app_client):
"column": "content",
"table": "simple_primary_key",
"database": "fixtures",
"config": {
"depth": "table",
"special": "this-is-simple_primary_key"
}
"config": {"depth": "table", "special": "this-is-simple_primary_key"},
} == json.loads(td.string)

View file

@ -35,7 +35,14 @@ def test_publish_cloudrun(mock_call, mock_output, mock_which):
result = runner.invoke(cli.cli, ["publish", "cloudrun", "test.db"])
assert 0 == result.exit_code
tag = "gcr.io/{}/datasette".format(mock_output.return_value)
mock_call.assert_has_calls([
mock.call("gcloud builds submit --tag {}".format(tag), shell=True),
mock.call("gcloud beta run deploy --allow-unauthenticated --image {}".format(tag), shell=True)])
mock_call.assert_has_calls(
[
mock.call("gcloud builds submit --tag {}".format(tag), shell=True),
mock.call(
"gcloud beta run deploy --allow-unauthenticated --image {}".format(
tag
),
shell=True,
),
]
)

View file

@ -57,7 +57,9 @@ def test_publish_heroku(mock_call, mock_check_output, mock_which):
open("test.db", "w").write("data")
result = runner.invoke(cli.cli, ["publish", "heroku", "test.db"])
assert 0 == result.exit_code, result.output
mock_call.assert_called_once_with(["heroku", "builds:create", "-a", "f", "--include-vcs-ignore"])
mock_call.assert_called_once_with(
["heroku", "builds:create", "-a", "f", "--include-vcs-ignore"]
)
@mock.patch("shutil.which")

View file

@ -13,72 +13,78 @@ import tempfile
from unittest.mock import patch
@pytest.mark.parametrize('path,expected', [
('foo', ['foo']),
('foo,bar', ['foo', 'bar']),
('123,433,112', ['123', '433', '112']),
('123%2C433,112', ['123,433', '112']),
('123%2F433%2F112', ['123/433/112']),
])
@pytest.mark.parametrize(
"path,expected",
[
("foo", ["foo"]),
("foo,bar", ["foo", "bar"]),
("123,433,112", ["123", "433", "112"]),
("123%2C433,112", ["123,433", "112"]),
("123%2F433%2F112", ["123/433/112"]),
],
)
def test_urlsafe_components(path, expected):
assert expected == utils.urlsafe_components(path)
@pytest.mark.parametrize('path,added_args,expected', [
('/foo', {'bar': 1}, '/foo?bar=1'),
('/foo?bar=1', {'baz': 2}, '/foo?bar=1&baz=2'),
('/foo?bar=1&bar=2', {'baz': 3}, '/foo?bar=1&bar=2&baz=3'),
('/foo?bar=1', {'bar': None}, '/foo'),
# Test order is preserved
('/?_facet=prim_state&_facet=area_name', (
('prim_state', 'GA'),
), '/?_facet=prim_state&_facet=area_name&prim_state=GA'),
('/?_facet=state&_facet=city&state=MI', (
('city', 'Detroit'),
), '/?_facet=state&_facet=city&state=MI&city=Detroit'),
('/?_facet=state&_facet=city', (
('_facet', 'planet_int'),
), '/?_facet=state&_facet=city&_facet=planet_int'),
])
@pytest.mark.parametrize(
"path,added_args,expected",
[
("/foo", {"bar": 1}, "/foo?bar=1"),
("/foo?bar=1", {"baz": 2}, "/foo?bar=1&baz=2"),
("/foo?bar=1&bar=2", {"baz": 3}, "/foo?bar=1&bar=2&baz=3"),
("/foo?bar=1", {"bar": None}, "/foo"),
# Test order is preserved
(
"/?_facet=prim_state&_facet=area_name",
(("prim_state", "GA"),),
"/?_facet=prim_state&_facet=area_name&prim_state=GA",
),
(
"/?_facet=state&_facet=city&state=MI",
(("city", "Detroit"),),
"/?_facet=state&_facet=city&state=MI&city=Detroit",
),
(
"/?_facet=state&_facet=city",
(("_facet", "planet_int"),),
"/?_facet=state&_facet=city&_facet=planet_int",
),
],
)
def test_path_with_added_args(path, added_args, expected):
request = Request(
path.encode('utf8'),
{}, '1.1', 'GET', None
)
request = Request(path.encode("utf8"), {}, "1.1", "GET", None)
actual = utils.path_with_added_args(request, added_args)
assert expected == actual
@pytest.mark.parametrize('path,args,expected', [
('/foo?bar=1', {'bar'}, '/foo'),
('/foo?bar=1&baz=2', {'bar'}, '/foo?baz=2'),
('/foo?bar=1&bar=2&bar=3', {'bar': '2'}, '/foo?bar=1&bar=3'),
])
@pytest.mark.parametrize(
"path,args,expected",
[
("/foo?bar=1", {"bar"}, "/foo"),
("/foo?bar=1&baz=2", {"bar"}, "/foo?baz=2"),
("/foo?bar=1&bar=2&bar=3", {"bar": "2"}, "/foo?bar=1&bar=3"),
],
)
def test_path_with_removed_args(path, args, expected):
request = Request(
path.encode('utf8'),
{}, '1.1', 'GET', None
)
request = Request(path.encode("utf8"), {}, "1.1", "GET", None)
actual = utils.path_with_removed_args(request, args)
assert expected == actual
# Run the test again but this time use the path= argument
request = Request(
"/".encode('utf8'),
{}, '1.1', 'GET', None
)
request = Request("/".encode("utf8"), {}, "1.1", "GET", None)
actual = utils.path_with_removed_args(request, args, path=path)
assert expected == actual
@pytest.mark.parametrize('path,args,expected', [
('/foo?bar=1', {'bar': 2}, '/foo?bar=2'),
('/foo?bar=1&baz=2', {'bar': None}, '/foo?baz=2'),
])
@pytest.mark.parametrize(
"path,args,expected",
[
("/foo?bar=1", {"bar": 2}, "/foo?bar=2"),
("/foo?bar=1&baz=2", {"bar": None}, "/foo?baz=2"),
],
)
def test_path_with_replaced_args(path, args, expected):
request = Request(
path.encode('utf8'),
{}, '1.1', 'GET', None
)
request = Request(path.encode("utf8"), {}, "1.1", "GET", None)
actual = utils.path_with_replaced_args(request, args)
assert expected == actual
@ -93,17 +99,8 @@ def test_path_with_replaced_args(path, args, expected):
utils.CustomRow(
["searchable_id", "tag"],
[
(
"searchable_id",
{"value": 1, "label": "1"},
),
(
"tag",
{
"value": "feline",
"label": "feline",
},
),
("searchable_id", {"value": 1, "label": "1"}),
("tag", {"value": "feline", "label": "feline"}),
],
),
["searchable_id", "tag"],
@ -116,47 +113,54 @@ def test_path_from_row_pks(row, pks, expected_path):
assert expected_path == actual_path
@pytest.mark.parametrize('obj,expected', [
({
'Description': 'Soft drinks',
'Picture': b"\x15\x1c\x02\xc7\xad\x05\xfe",
'CategoryID': 1,
}, """
@pytest.mark.parametrize(
"obj,expected",
[
(
{
"Description": "Soft drinks",
"Picture": b"\x15\x1c\x02\xc7\xad\x05\xfe",
"CategoryID": 1,
},
"""
{"CategoryID": 1, "Description": "Soft drinks", "Picture": {"$base64": true, "encoded": "FRwCx60F/g=="}}
""".strip()),
])
""".strip(),
)
],
)
def test_custom_json_encoder(obj, expected):
actual = json.dumps(
obj,
cls=utils.CustomJSONEncoder,
sort_keys=True
)
actual = json.dumps(obj, cls=utils.CustomJSONEncoder, sort_keys=True)
assert expected == actual
@pytest.mark.parametrize('bad_sql', [
'update blah;',
'PRAGMA case_sensitive_like = true'
"SELECT * FROM pragma_index_info('idx52')",
])
@pytest.mark.parametrize(
"bad_sql",
[
"update blah;",
"PRAGMA case_sensitive_like = true" "SELECT * FROM pragma_index_info('idx52')",
],
)
def test_validate_sql_select_bad(bad_sql):
with pytest.raises(utils.InvalidSql):
utils.validate_sql_select(bad_sql)
@pytest.mark.parametrize('good_sql', [
'select count(*) from airports',
'select foo from bar',
'select 1 + 1',
'SELECT\nblah FROM foo',
'WITH RECURSIVE cnt(x) AS (SELECT 1 UNION ALL SELECT x+1 FROM cnt LIMIT 10) SELECT x FROM cnt;'
])
@pytest.mark.parametrize(
"good_sql",
[
"select count(*) from airports",
"select foo from bar",
"select 1 + 1",
"SELECT\nblah FROM foo",
"WITH RECURSIVE cnt(x) AS (SELECT 1 UNION ALL SELECT x+1 FROM cnt LIMIT 10) SELECT x FROM cnt;",
],
)
def test_validate_sql_select_good(good_sql):
utils.validate_sql_select(good_sql)
def test_detect_fts():
sql = '''
sql = """
CREATE TABLE "Dumb_Table" (
"TreeID" INTEGER,
"qSpecies" TEXT
@ -173,34 +177,40 @@ def test_detect_fts():
CREATE VIEW Test_View AS SELECT * FROM Dumb_Table;
CREATE VIRTUAL TABLE "Street_Tree_List_fts" USING FTS4 ("qAddress", "qCaretaker", "qSpecies", content="Street_Tree_List");
CREATE VIRTUAL TABLE r USING rtree(a, b, c);
'''
conn = utils.sqlite3.connect(':memory:')
"""
conn = utils.sqlite3.connect(":memory:")
conn.executescript(sql)
assert None is utils.detect_fts(conn, 'Dumb_Table')
assert None is utils.detect_fts(conn, 'Test_View')
assert None is utils.detect_fts(conn, 'r')
assert 'Street_Tree_List_fts' == utils.detect_fts(conn, 'Street_Tree_List')
assert None is utils.detect_fts(conn, "Dumb_Table")
assert None is utils.detect_fts(conn, "Test_View")
assert None is utils.detect_fts(conn, "r")
assert "Street_Tree_List_fts" == utils.detect_fts(conn, "Street_Tree_List")
@pytest.mark.parametrize('url,expected', [
('http://www.google.com/', True),
('https://example.com/', True),
('www.google.com', False),
('http://www.google.com/ is a search engine', False),
])
@pytest.mark.parametrize(
"url,expected",
[
("http://www.google.com/", True),
("https://example.com/", True),
("www.google.com", False),
("http://www.google.com/ is a search engine", False),
],
)
def test_is_url(url, expected):
assert expected == utils.is_url(url)
@pytest.mark.parametrize('s,expected', [
('simple', 'simple'),
('MixedCase', 'MixedCase'),
('-no-leading-hyphens', 'no-leading-hyphens-65bea6'),
('_no-leading-underscores', 'no-leading-underscores-b921bc'),
('no spaces', 'no-spaces-7088d7'),
('-', '336d5e'),
('no $ characters', 'no--characters-59e024'),
])
@pytest.mark.parametrize(
"s,expected",
[
("simple", "simple"),
("MixedCase", "MixedCase"),
("-no-leading-hyphens", "no-leading-hyphens-65bea6"),
("_no-leading-underscores", "no-leading-underscores-b921bc"),
("no spaces", "no-spaces-7088d7"),
("-", "336d5e"),
("no $ characters", "no--characters-59e024"),
],
)
def test_to_css_class(s, expected):
assert expected == utils.to_css_class(s)
@ -208,11 +218,11 @@ def test_to_css_class(s, expected):
def test_temporary_docker_directory_uses_hard_link():
with tempfile.TemporaryDirectory() as td:
os.chdir(td)
open('hello', 'w').write('world')
open("hello", "w").write("world")
# Default usage of this should use symlink
with utils.temporary_docker_directory(
files=['hello'],
name='t',
files=["hello"],
name="t",
metadata=None,
extra_options=None,
branch=None,
@ -223,23 +233,23 @@ def test_temporary_docker_directory_uses_hard_link():
spatialite=False,
version_note=None,
) as temp_docker:
hello = os.path.join(temp_docker, 'hello')
assert 'world' == open(hello).read()
hello = os.path.join(temp_docker, "hello")
assert "world" == open(hello).read()
# It should be a hard link
assert 2 == os.stat(hello).st_nlink
@patch('os.link')
@patch("os.link")
def test_temporary_docker_directory_uses_copy_if_hard_link_fails(mock_link):
# Copy instead if os.link raises OSError (normally due to different device)
mock_link.side_effect = OSError
with tempfile.TemporaryDirectory() as td:
os.chdir(td)
open('hello', 'w').write('world')
open("hello", "w").write("world")
# Default usage of this should use symlink
with utils.temporary_docker_directory(
files=['hello'],
name='t',
files=["hello"],
name="t",
metadata=None,
extra_options=None,
branch=None,
@ -250,49 +260,53 @@ def test_temporary_docker_directory_uses_copy_if_hard_link_fails(mock_link):
spatialite=False,
version_note=None,
) as temp_docker:
hello = os.path.join(temp_docker, 'hello')
assert 'world' == open(hello).read()
hello = os.path.join(temp_docker, "hello")
assert "world" == open(hello).read()
# It should be a copy, not a hard link
assert 1 == os.stat(hello).st_nlink
def test_temporary_docker_directory_quotes_args():
with tempfile.TemporaryDirectory() as td:
with tempfile.TemporaryDirectory() as td:
os.chdir(td)
open('hello', 'w').write('world')
open("hello", "w").write("world")
with utils.temporary_docker_directory(
files=['hello'],
name='t',
files=["hello"],
name="t",
metadata=None,
extra_options='--$HOME',
extra_options="--$HOME",
branch=None,
template_dir=None,
plugins_dir=None,
static=[],
install=[],
spatialite=False,
version_note='$PWD',
version_note="$PWD",
) as temp_docker:
df = os.path.join(temp_docker, 'Dockerfile')
df = os.path.join(temp_docker, "Dockerfile")
df_contents = open(df).read()
assert "'$PWD'" in df_contents
assert "'--$HOME'" in df_contents
def test_compound_keys_after_sql():
assert '((a > :p0))' == utils.compound_keys_after_sql(['a'])
assert '''
assert "((a > :p0))" == utils.compound_keys_after_sql(["a"])
assert """
((a > :p0)
or
(a = :p0 and b > :p1))
'''.strip() == utils.compound_keys_after_sql(['a', 'b'])
assert '''
""".strip() == utils.compound_keys_after_sql(
["a", "b"]
)
assert """
((a > :p0)
or
(a = :p0 and b > :p1)
or
(a = :p0 and b = :p1 and c > :p2))
'''.strip() == utils.compound_keys_after_sql(['a', 'b', 'c'])
""".strip() == utils.compound_keys_after_sql(
["a", "b", "c"]
)
async def table_exists(table):
@ -314,7 +328,7 @@ async def test_resolve_table_and_format(
table_and_format, expected_table, expected_format
):
actual_table, actual_format = await utils.resolve_table_and_format(
table_and_format, table_exists, ['json']
table_and_format, table_exists, ["json"]
)
assert expected_table == actual_table
assert expected_format == actual_format
@ -322,9 +336,11 @@ async def test_resolve_table_and_format(
def test_table_columns():
conn = sqlite3.connect(":memory:")
conn.executescript("""
conn.executescript(
"""
create table places (id integer primary key, name text, bob integer)
""")
"""
)
assert ["id", "name", "bob"] == utils.table_columns(conn, "places")
@ -347,10 +363,7 @@ def test_table_columns():
],
)
def test_path_with_format(path, format, extra_qs, expected):
request = Request(
path.encode('utf8'),
{}, '1.1', 'GET', None
)
request = Request(path.encode("utf8"), {}, "1.1", "GET", None)
actual = utils.path_with_format(request, format, extra_qs)
assert expected == actual
@ -358,13 +371,13 @@ def test_path_with_format(path, format, extra_qs, expected):
@pytest.mark.parametrize(
"bytes,expected",
[
(120, '120 bytes'),
(1024, '1.0 KB'),
(1024 * 1024, '1.0 MB'),
(1024 * 1024 * 1024, '1.0 GB'),
(1024 * 1024 * 1024 * 1.3, '1.3 GB'),
(1024 * 1024 * 1024 * 1024, '1.0 TB'),
]
(120, "120 bytes"),
(1024, "1.0 KB"),
(1024 * 1024, "1.0 MB"),
(1024 * 1024 * 1024, "1.0 GB"),
(1024 * 1024 * 1024 * 1.3, "1.3 GB"),
(1024 * 1024 * 1024 * 1024, "1.0 TB"),
],
)
def test_format_bytes(bytes, expected):
assert expected == utils.format_bytes(bytes)