2017-11-10 10:48:16 -08:00
"""
Tests for various datasette helper functions .
"""
2017-11-10 11:25:54 -08:00
from datasette import utils
2019-06-23 20:13:09 -07:00
from datasette . utils . asgi import Request
2019-04-15 14:51:20 -07:00
from datasette . filters import Filters
2017-12-08 08:06:24 -08:00
import json
import os
2020-02-15 09:56:48 -08:00
import pathlib
2017-10-23 22:54:58 -07:00
import pytest
2019-04-06 18:58:51 -07:00
import sqlite3
2017-12-08 08:06:24 -08:00
import tempfile
from unittest . mock import patch
2017-10-23 22:54:58 -07:00
2019-05-03 22:15:14 -04:00
@pytest.mark.parametrize (
" path,expected " ,
[
( " foo " , [ " foo " ] ) ,
( " foo,bar " , [ " foo " , " bar " ] ) ,
( " 123,433,112 " , [ " 123 " , " 433 " , " 112 " ] ) ,
( " 123 % 2C433,112 " , [ " 123,433 " , " 112 " ] ) ,
( " 123 %2F 433 %2F 112 " , [ " 123/433/112 " ] ) ,
] ,
)
2018-04-08 17:06:10 -07:00
def test_urlsafe_components ( path , expected ) :
assert expected == utils . urlsafe_components ( path )
2017-10-23 22:54:58 -07:00
2019-05-03 22:15:14 -04:00
@pytest.mark.parametrize (
" path,added_args,expected " ,
[
( " /foo " , { " bar " : 1 } , " /foo?bar=1 " ) ,
( " /foo?bar=1 " , { " baz " : 2 } , " /foo?bar=1&baz=2 " ) ,
( " /foo?bar=1&bar=2 " , { " baz " : 3 } , " /foo?bar=1&bar=2&baz=3 " ) ,
( " /foo?bar=1 " , { " bar " : None } , " /foo " ) ,
# Test order is preserved
(
" /?_facet=prim_state&_facet=area_name " ,
( ( " prim_state " , " GA " ) , ) ,
" /?_facet=prim_state&_facet=area_name&prim_state=GA " ,
) ,
(
" /?_facet=state&_facet=city&state=MI " ,
( ( " city " , " Detroit " ) , ) ,
" /?_facet=state&_facet=city&state=MI&city=Detroit " ,
) ,
(
" /?_facet=state&_facet=city " ,
( ( " _facet " , " planet_int " ) , ) ,
" /?_facet=state&_facet=city&_facet=planet_int " ,
) ,
] ,
)
2018-05-12 18:35:25 -03:00
def test_path_with_added_args ( path , added_args , expected ) :
2019-06-23 20:13:09 -07:00
request = Request . fake ( path )
2018-05-12 18:35:25 -03:00
actual = utils . path_with_added_args ( request , added_args )
assert expected == actual
2019-05-03 22:15:14 -04:00
@pytest.mark.parametrize (
" path,args,expected " ,
[
( " /foo?bar=1 " , { " bar " } , " /foo " ) ,
( " /foo?bar=1&baz=2 " , { " bar " } , " /foo?baz=2 " ) ,
( " /foo?bar=1&bar=2&bar=3 " , { " bar " : " 2 " } , " /foo?bar=1&bar=3 " ) ,
] ,
)
2018-05-14 17:42:10 -03:00
def test_path_with_removed_args ( path , args , expected ) :
2019-06-23 20:13:09 -07:00
request = Request . fake ( path )
2018-05-14 17:42:10 -03:00
actual = utils . path_with_removed_args ( request , args )
assert expected == actual
2019-03-17 15:55:04 -07:00
# Run the test again but this time use the path= argument
2019-06-23 20:13:09 -07:00
request = Request . fake ( " / " )
2019-03-17 15:55:04 -07:00
actual = utils . path_with_removed_args ( request , args , path = path )
assert expected == actual
2018-05-14 17:42:10 -03:00
2019-05-03 22:15:14 -04:00
@pytest.mark.parametrize (
" path,args,expected " ,
[
( " /foo?bar=1 " , { " bar " : 2 } , " /foo?bar=2 " ) ,
( " /foo?bar=1&baz=2 " , { " bar " : None } , " /foo?baz=2 " ) ,
] ,
)
2018-05-15 06:34:45 -03:00
def test_path_with_replaced_args ( path , args , expected ) :
2019-06-23 20:13:09 -07:00
request = Request . fake ( path )
2018-05-15 06:34:45 -03:00
actual = utils . path_with_replaced_args ( request , args )
assert expected == actual
2018-06-21 07:56:28 -07:00
@pytest.mark.parametrize (
" row,pks,expected_path " ,
[
( { " A " : " foo " , " B " : " bar " } , [ " A " , " B " ] , " foo,bar " ) ,
( { " A " : " f,o " , " B " : " bar " } , [ " A " , " B " ] , " f % 2Co,bar " ) ,
( { " A " : 123 } , [ " A " ] , " 123 " ) ,
(
utils . CustomRow (
[ " searchable_id " , " tag " ] ,
[
2019-05-03 22:15:14 -04:00
( " searchable_id " , { " value " : 1 , " label " : " 1 " } ) ,
( " tag " , { " value " : " feline " , " label " : " feline " } ) ,
2018-06-21 07:56:28 -07:00
] ,
) ,
[ " searchable_id " , " tag " ] ,
" 1,feline " ,
) ,
] ,
)
2017-10-23 22:54:58 -07:00
def test_path_from_row_pks ( row , pks , expected_path ) :
2017-11-10 11:25:54 -08:00
actual_path = utils . path_from_row_pks ( row , pks , False )
2017-10-23 22:54:58 -07:00
assert expected_path == actual_path
2017-10-24 07:58:41 -07:00
2019-05-03 22:15:14 -04:00
@pytest.mark.parametrize (
" obj,expected " ,
[
(
{
" Description " : " Soft drinks " ,
" Picture " : b " \x15 \x1c \x02 \xc7 \xad \x05 \xfe " ,
" CategoryID " : 1 ,
} ,
"""
2017-10-24 07:58:41 -07:00
{ " CategoryID " : 1 , " Description " : " Soft drinks " , " Picture " : { " $base64 " : true , " encoded " : " FRwCx60F/g== " } }
2019-05-03 22:15:14 -04:00
""" .strip(),
)
] ,
)
2017-10-24 07:58:41 -07:00
def test_custom_json_encoder ( obj , expected ) :
2019-05-03 22:15:14 -04:00
actual = json . dumps ( obj , cls = utils . CustomJSONEncoder , sort_keys = True )
2017-10-24 07:58:41 -07:00
assert expected == actual
2017-10-24 17:06:23 -07:00
2019-05-03 22:15:14 -04:00
@pytest.mark.parametrize (
" bad_sql " ,
[
" update blah; " ,
2020-02-04 20:13:24 -06:00
" -- sql comment to skip \n update blah; " ,
" update blah set some_column= ' # Hello there \n \n * This is a list \n * of items \n -- \n [And a link](https://github.com/simonw/datasette-render-markdown). ' \n as demo_markdown " ,
2020-05-06 10:18:31 -07:00
" PRAGMA case_sensitive_like = true " ,
" SELECT * FROM pragma_not_on_allow_list( ' idx52 ' ) " ,
2019-05-03 22:15:14 -04:00
] ,
)
2017-11-04 19:49:18 -07:00
def test_validate_sql_select_bad ( bad_sql ) :
2017-11-10 11:25:54 -08:00
with pytest . raises ( utils . InvalidSql ) :
utils . validate_sql_select ( bad_sql )
2017-11-04 19:49:18 -07:00
2019-05-03 22:15:14 -04:00
@pytest.mark.parametrize (
" good_sql " ,
[
" select count(*) from airports " ,
" select foo from bar " ,
2020-02-04 20:13:24 -06:00
" --sql comment to skip \n select foo from bar " ,
" select ' # Hello there \n \n * This is a list \n * of items \n -- \n [And a link](https://github.com/simonw/datasette-render-markdown). ' \n as demo_markdown " ,
2019-05-03 22:15:14 -04:00
" select 1 + 1 " ,
2019-10-06 10:23:58 -07:00
" explain select 1 + 1 " ,
" explain query plan select 1 + 1 " ,
2019-05-03 22:15:14 -04:00
" SELECT \n blah FROM foo " ,
" WITH RECURSIVE cnt(x) AS (SELECT 1 UNION ALL SELECT x+1 FROM cnt LIMIT 10) SELECT x FROM cnt; " ,
2019-10-06 10:23:58 -07:00
" explain WITH RECURSIVE cnt(x) AS (SELECT 1 UNION ALL SELECT x+1 FROM cnt LIMIT 10) SELECT x FROM cnt; " ,
" explain query plan WITH RECURSIVE cnt(x) AS (SELECT 1 UNION ALL SELECT x+1 FROM cnt LIMIT 10) SELECT x FROM cnt; " ,
2020-05-06 10:18:31 -07:00
" SELECT * FROM pragma_index_info( ' idx52 ' ) " ,
" select * from pragma_table_xinfo( ' table ' ) " ,
2019-05-03 22:15:14 -04:00
] ,
)
2017-11-04 19:49:18 -07:00
def test_validate_sql_select_good ( good_sql ) :
2017-11-10 11:25:54 -08:00
utils . validate_sql_select ( good_sql )
2017-11-19 08:59:26 -08:00
2019-09-02 17:32:27 -07:00
@pytest.mark.parametrize ( " open_quote,close_quote " , [ ( ' " ' , ' " ' ) , ( " [ " , " ] " ) ] )
def test_detect_fts ( open_quote , close_quote ) :
2019-05-03 22:15:14 -04:00
sql = """
2017-11-19 08:59:26 -08:00
CREATE TABLE " Dumb_Table " (
" TreeID " INTEGER ,
" qSpecies " TEXT
) ;
CREATE TABLE " Street_Tree_List " (
" TreeID " INTEGER ,
" qSpecies " TEXT ,
" qAddress " TEXT ,
" SiteOrder " INTEGER ,
" qSiteInfo " TEXT ,
" PlantType " TEXT ,
" qCaretaker " TEXT
) ;
2017-11-24 14:51:00 -08:00
CREATE VIEW Test_View AS SELECT * FROM Dumb_Table ;
2019-09-02 17:32:27 -07:00
CREATE VIRTUAL TABLE { open } Street_Tree_List_fts { close } USING FTS4 ( " qAddress " , " qCaretaker " , " qSpecies " , content = { open } Street_Tree_List { close } ) ;
2017-12-06 20:54:25 -08:00
CREATE VIRTUAL TABLE r USING rtree ( a , b , c ) ;
2019-09-02 17:32:27 -07:00
""" .format(
open = open_quote , close = close_quote
)
2019-05-03 22:15:14 -04:00
conn = utils . sqlite3 . connect ( " :memory: " )
2017-11-19 08:59:26 -08:00
conn . executescript ( sql )
2019-05-03 22:15:14 -04:00
assert None is utils . detect_fts ( conn , " Dumb_Table " )
assert None is utils . detect_fts ( conn , " Test_View " )
assert None is utils . detect_fts ( conn , " r " )
assert " Street_Tree_List_fts " == utils . detect_fts ( conn , " Street_Tree_List " )
@pytest.mark.parametrize (
" url,expected " ,
[
( " http://www.google.com/ " , True ) ,
( " https://example.com/ " , True ) ,
( " www.google.com " , False ) ,
( " http://www.google.com/ is a search engine " , False ) ,
] ,
)
2017-11-29 09:05:24 -08:00
def test_is_url ( url , expected ) :
assert expected == utils . is_url ( url )
2017-11-29 23:09:54 -08:00
2019-05-03 22:15:14 -04:00
@pytest.mark.parametrize (
" s,expected " ,
[
( " simple " , " simple " ) ,
( " MixedCase " , " MixedCase " ) ,
( " -no-leading-hyphens " , " no-leading-hyphens-65bea6 " ) ,
( " _no-leading-underscores " , " no-leading-underscores-b921bc " ) ,
( " no spaces " , " no-spaces-7088d7 " ) ,
( " - " , " 336d5e " ) ,
( " no $ characters " , " no--characters-59e024 " ) ,
] ,
)
2017-11-29 23:09:54 -08:00
def test_to_css_class ( s , expected ) :
assert expected == utils . to_css_class ( s )
2017-12-08 08:06:24 -08:00
def test_temporary_docker_directory_uses_hard_link ( ) :
with tempfile . TemporaryDirectory ( ) as td :
os . chdir ( td )
2019-05-03 22:15:14 -04:00
open ( " hello " , " w " ) . write ( " world " )
2017-12-08 08:06:24 -08:00
# Default usage of this should use symlink
with utils . temporary_docker_directory (
2019-05-03 22:15:14 -04:00
files = [ " hello " ] ,
name = " t " ,
2017-12-08 08:06:24 -08:00
metadata = None ,
2017-12-09 10:38:04 -08:00
extra_options = None ,
branch = None ,
template_dir = None ,
2018-04-15 22:22:01 -07:00
plugins_dir = None ,
2017-12-09 10:38:04 -08:00
static = [ ] ,
2018-04-18 07:48:34 -07:00
install = [ ] ,
2018-05-31 07:47:22 -07:00
spatialite = False ,
2018-06-17 13:14:55 -07:00
version_note = None ,
2017-12-08 08:06:24 -08:00
) as temp_docker :
2019-05-03 22:15:14 -04:00
hello = os . path . join ( temp_docker , " hello " )
assert " world " == open ( hello ) . read ( )
2017-12-08 08:06:24 -08:00
# It should be a hard link
assert 2 == os . stat ( hello ) . st_nlink
2019-05-03 22:15:14 -04:00
@patch ( " os.link " )
2017-12-08 08:06:24 -08:00
def test_temporary_docker_directory_uses_copy_if_hard_link_fails ( mock_link ) :
# Copy instead if os.link raises OSError (normally due to different device)
mock_link . side_effect = OSError
with tempfile . TemporaryDirectory ( ) as td :
os . chdir ( td )
2019-05-03 22:15:14 -04:00
open ( " hello " , " w " ) . write ( " world " )
2017-12-08 08:06:24 -08:00
# Default usage of this should use symlink
with utils . temporary_docker_directory (
2019-05-03 22:15:14 -04:00
files = [ " hello " ] ,
name = " t " ,
2017-12-08 08:06:24 -08:00
metadata = None ,
2017-12-09 10:38:04 -08:00
extra_options = None ,
branch = None ,
template_dir = None ,
2018-04-15 22:22:01 -07:00
plugins_dir = None ,
2017-12-09 10:38:04 -08:00
static = [ ] ,
2018-04-18 07:48:34 -07:00
install = [ ] ,
2018-05-31 07:47:22 -07:00
spatialite = False ,
2018-06-17 13:14:55 -07:00
version_note = None ,
2017-12-08 08:06:24 -08:00
) as temp_docker :
2019-05-03 22:15:14 -04:00
hello = os . path . join ( temp_docker , " hello " )
assert " world " == open ( hello ) . read ( )
2017-12-08 08:06:24 -08:00
# It should be a copy, not a hard link
assert 1 == os . stat ( hello ) . st_nlink
2018-03-29 22:10:09 -07:00
2019-05-03 15:59:01 +02:00
def test_temporary_docker_directory_quotes_args ( ) :
2019-05-03 22:15:14 -04:00
with tempfile . TemporaryDirectory ( ) as td :
2019-05-03 15:59:01 +02:00
os . chdir ( td )
2019-05-03 22:15:14 -04:00
open ( " hello " , " w " ) . write ( " world " )
2019-05-03 15:59:01 +02:00
with utils . temporary_docker_directory (
2019-05-03 22:15:14 -04:00
files = [ " hello " ] ,
name = " t " ,
2019-05-03 15:59:01 +02:00
metadata = None ,
2019-05-03 22:15:14 -04:00
extra_options = " --$HOME " ,
2019-05-03 15:59:01 +02:00
branch = None ,
template_dir = None ,
plugins_dir = None ,
static = [ ] ,
install = [ ] ,
spatialite = False ,
2019-05-03 22:15:14 -04:00
version_note = " $PWD " ,
2019-05-03 15:59:01 +02:00
) as temp_docker :
2019-05-03 22:15:14 -04:00
df = os . path . join ( temp_docker , " Dockerfile " )
2019-05-03 15:59:01 +02:00
df_contents = open ( df ) . read ( )
assert " ' $PWD ' " in df_contents
assert " ' --$HOME ' " in df_contents
2018-03-29 22:10:09 -07:00
def test_compound_keys_after_sql ( ) :
2019-05-03 22:15:14 -04:00
assert " ((a > :p0)) " == utils . compound_keys_after_sql ( [ " a " ] )
assert """
2018-04-03 06:39:50 -07:00
( ( a > : p0 )
2018-03-29 22:10:09 -07:00
or
2018-04-03 06:39:50 -07:00
( a = : p0 and b > : p1 ) )
2019-05-03 22:15:14 -04:00
""" .strip() == utils.compound_keys_after_sql(
[ " a " , " b " ]
)
assert """
2018-04-03 06:39:50 -07:00
( ( a > : p0 )
2018-03-29 22:10:09 -07:00
or
2018-04-03 06:39:50 -07:00
( a = : p0 and b > : p1 )
2018-03-29 22:10:09 -07:00
or
2018-04-03 06:39:50 -07:00
( a = : p0 and b = : p1 and c > : p2 ) )
2019-05-03 22:15:14 -04:00
""" .strip() == utils.compound_keys_after_sql(
[ " a " , " b " , " c " ]
)
2018-06-14 23:51:23 -07:00
2019-03-31 11:02:22 -07:00
async def table_exists ( table ) :
2018-06-14 23:51:23 -07:00
return table == " exists.csv "
2019-03-31 11:02:22 -07:00
@pytest.mark.asyncio
2018-06-14 23:51:23 -07:00
@pytest.mark.parametrize (
" table_and_format,expected_table,expected_format " ,
[
( " blah " , " blah " , None ) ,
( " blah.csv " , " blah " , " csv " ) ,
( " blah.json " , " blah " , " json " ) ,
( " blah.baz " , " blah.baz " , None ) ,
( " exists.csv " , " exists.csv " , None ) ,
] ,
)
2019-03-31 11:02:22 -07:00
async def test_resolve_table_and_format (
2018-06-14 23:51:23 -07:00
table_and_format , expected_table , expected_format
) :
2019-03-31 11:02:22 -07:00
actual_table , actual_format = await utils . resolve_table_and_format (
2019-05-03 22:15:14 -04:00
table_and_format , table_exists , [ " json " ]
2018-06-14 23:51:23 -07:00
)
assert expected_table == actual_table
assert expected_format == actual_format
2019-04-06 18:58:51 -07:00
def test_table_columns ( ) :
conn = sqlite3 . connect ( " :memory: " )
2019-05-03 22:15:14 -04:00
conn . executescript (
"""
2019-04-06 18:58:51 -07:00
create table places ( id integer primary key , name text , bob integer )
2019-05-03 22:15:14 -04:00
"""
)
2019-04-06 18:58:51 -07:00
assert [ " id " , " name " , " bob " ] == utils . table_columns ( conn , " places " )
2018-06-14 23:51:23 -07:00
@pytest.mark.parametrize (
" path,format,extra_qs,expected " ,
[
( " /foo?sql=select+1 " , " csv " , { } , " /foo.csv?sql=select+1 " ) ,
( " /foo?sql=select+1 " , " json " , { } , " /foo.json?sql=select+1 " ) ,
( " /foo/bar " , " json " , { } , " /foo/bar.json " ) ,
( " /foo/bar " , " csv " , { } , " /foo/bar.csv " ) ,
( " /foo/bar.csv " , " json " , { } , " /foo/bar.csv?_format=json " ) ,
( " /foo/bar " , " csv " , { " _dl " : 1 } , " /foo/bar.csv?_dl=1 " ) ,
( " /foo/b.csv " , " json " , { " _dl " : 1 } , " /foo/b.csv?_dl=1&_format=json " ) ,
(
" /sf-trees/Street_Tree_List?_search=cherry&_size=1000 " ,
" csv " ,
{ " _dl " : 1 } ,
" /sf-trees/Street_Tree_List.csv?_search=cherry&_size=1000&_dl=1 " ,
) ,
] ,
)
def test_path_with_format ( path , format , extra_qs , expected ) :
2019-06-23 20:13:09 -07:00
request = Request . fake ( path )
2018-06-14 23:51:23 -07:00
actual = utils . path_with_format ( request , format , extra_qs )
assert expected == actual
2019-02-05 20:53:44 -08:00
@pytest.mark.parametrize (
" bytes,expected " ,
[
2019-05-03 22:15:14 -04:00
( 120 , " 120 bytes " ) ,
( 1024 , " 1.0 KB " ) ,
( 1024 * 1024 , " 1.0 MB " ) ,
( 1024 * 1024 * 1024 , " 1.0 GB " ) ,
( 1024 * 1024 * 1024 * 1.3 , " 1.3 GB " ) ,
( 1024 * 1024 * 1024 * 1024 , " 1.0 TB " ) ,
] ,
2019-02-05 20:53:44 -08:00
)
def test_format_bytes ( bytes , expected ) :
assert expected == utils . format_bytes ( bytes )
2019-12-29 18:48:13 +00:00
@pytest.mark.parametrize (
" query,expected " ,
[
( " dog " , ' " dog " ' ) ,
( " cat, " , ' " cat, " ' ) ,
( " cat dog " , ' " cat " " dog " ' ) ,
# If a phrase is already double quoted, leave it so
( ' " cat dog " ' , ' " cat dog " ' ) ,
( ' " cat dog " fish ' , ' " cat dog " " fish " ' ) ,
# Sensibly handle unbalanced double quotes
( ' cat " ' , ' " cat " ' ) ,
( ' " cat dog " " fish ' , ' " cat dog " " fish " ' ) ,
] ,
)
def test_escape_fts ( query , expected ) :
assert expected == utils . escape_fts ( query )
2020-02-15 09:56:48 -08:00
def test_check_connection_spatialite_raises ( ) :
path = str ( pathlib . Path ( __file__ ) . parent / " spatialite.db " )
conn = sqlite3 . connect ( path )
with pytest . raises ( utils . SpatialiteConnectionProblem ) :
utils . check_connection ( conn )
def test_check_connection_passes ( ) :
conn = sqlite3 . connect ( " :memory: " )
utils . check_connection ( conn )
2020-03-16 19:47:37 -07:00
@pytest.mark.asyncio
async def test_request_post_vars ( ) :
scope = {
" http_version " : " 1.1 " ,
" method " : " POST " ,
" path " : " / " ,
" raw_path " : b " / " ,
" query_string " : b " " ,
" scheme " : " http " ,
" type " : " http " ,
" headers " : [ [ b " content-type " , b " application/x-www-form-urlencoded " ] ] ,
}
async def receive ( ) :
return { " type " : " http.request " , " body " : b " foo=bar&baz=1 " , " more_body " : False }
request = Request ( scope , receive )
assert { " foo " : " bar " , " baz " : " 1 " } == await request . post_vars ( )
2020-05-27 12:25:52 -07:00
2020-05-29 15:51:30 -07:00
def test_request_args ( ) :
request = Request . fake ( " /foo?multi=1&multi=2&single=3 " )
assert " 1 " == request . args . get ( " multi " )
assert " 3 " == request . args . get ( " single " )
2020-05-29 16:18:01 -07:00
assert " 1 " == request . args [ " multi " ]
assert " 3 " == request . args [ " single " ]
2020-05-29 15:51:30 -07:00
assert [ " 1 " , " 2 " ] == request . args . getlist ( " multi " )
assert [ ] == request . args . getlist ( " missing " )
2020-05-29 16:18:01 -07:00
assert " multi " in request . args
assert " single " in request . args
assert " missing " not in request . args
expected = [ " multi " , " single " ]
assert expected == list ( request . args . keys ( ) )
for i , key in enumerate ( request . args ) :
assert expected [ i ] == key
assert 2 == len ( request . args )
2020-05-29 15:51:30 -07:00
with pytest . raises ( KeyError ) :
request . args [ " missing " ]
2020-05-27 12:25:52 -07:00
def test_call_with_supported_arguments ( ) :
def foo ( a , b ) :
return " {} + {} " . format ( a , b )
assert " 1+2 " == utils . call_with_supported_arguments ( foo , a = 1 , b = 2 )
assert " 1+2 " == utils . call_with_supported_arguments ( foo , a = 1 , b = 2 , c = 3 )
with pytest . raises ( TypeError ) :
utils . call_with_supported_arguments ( foo , a = 1 )