Move create-token command into cli.py, refs #1855

This commit is contained in:
Simon Willison 2022-12-12 20:18:42 -08:00
commit e95b490d88
2 changed files with 123 additions and 129 deletions

View file

@ -7,10 +7,11 @@ from click_default_group import DefaultGroup
import json
import os
import pathlib
from runpy import run_module
import shutil
from subprocess import call
import sys
from runpy import run_module
import time
import webbrowser
from .app import (
OBSOLETE_SETTINGS,
@ -628,6 +629,127 @@ def serve(
uvicorn.run(ds.app(), **uvicorn_kwargs)
@cli.command()
@click.argument("id")
@click.option(
"--secret",
help="Secret used for signing the API tokens",
envvar="DATASETTE_SECRET",
required=True,
)
@click.option(
"-e",
"--expires-after",
help="Token should expire after this many seconds",
type=int,
)
@click.option(
"alls",
"-a",
"--all",
type=str,
metavar="ACTION",
multiple=True,
help="Restrict token to this action",
)
@click.option(
"databases",
"-d",
"--database",
type=(str, str),
metavar="DB ACTION",
multiple=True,
help="Restrict token to this action on this database",
)
@click.option(
"resources",
"-r",
"--resource",
type=(str, str, str),
metavar="DB RESOURCE ACTION",
multiple=True,
help="Restrict token to this action on this database resource (a table, SQL view or named query)",
)
@click.option(
"--debug",
help="Show decoded token",
is_flag=True,
)
@click.option(
"--plugins-dir",
type=click.Path(exists=True, file_okay=False, dir_okay=True),
help="Path to directory containing custom plugins",
)
def create_token(
id, secret, expires_after, alls, databases, resources, debug, plugins_dir
):
"""
Create a signed API token for the specified actor ID
Example:
datasette create-token root --secret mysecret
To only allow create-table:
\b
datasette create-token root --secret mysecret \\
--all create-table
Or to only allow insert-row against a specific table:
\b
datasette create-token root --secret myscret \\
--resource mydb mytable insert-row
Restricted actions can be specified multiple times using
multiple --all, --database, and --resource options.
Add --debug to see a decoded version of the token.
"""
ds = Datasette(secret=secret, plugins_dir=plugins_dir)
# Run ds.invoke_startup() in an event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(ds.invoke_startup())
def fix_action(action):
# Warn if invalid, rename to abbr if possible
permission = ds.permissions.get(action)
if not permission:
# Output red message
click.secho(
f" Unknown permission: {action} ",
fg="red",
err=True,
)
return action
return permission.abbr or action
bits = {"a": id, "token": "dstok", "t": int(time.time())}
if expires_after:
bits["d"] = expires_after
if alls or databases or resources:
bits["_r"] = {}
if alls:
bits["_r"]["a"] = [fix_action(a) for a in alls]
if databases:
bits["_r"]["d"] = {}
for database, action in databases:
bits["_r"]["d"].setdefault(database, []).append(fix_action(action))
if resources:
bits["_r"]["r"] = {}
for database, table, action in resources:
bits["_r"]["r"].setdefault(database, {}).setdefault(table, []).append(
fix_action(action)
)
token = ds.sign(bits, namespace="token")
click.echo("dstok_{}".format(token))
if debug:
click.echo("\nDecoded:\n")
click.echo(json.dumps(ds.unsign(token, namespace="token"), indent=2))
pm.hook.register_commands(cli=cli)

View file

@ -1,9 +1,6 @@
from datasette import hookimpl, Permission
from datasette.utils import actor_matches_allow
import asyncio
import click
import itsdangerous
import json
import time
@ -261,131 +258,6 @@ def actor_from_request(datasette, request):
return actor
@hookimpl
def register_commands(cli):
from datasette.app import Datasette
@cli.command()
@click.argument("id")
@click.option(
"--secret",
help="Secret used for signing the API tokens",
envvar="DATASETTE_SECRET",
required=True,
)
@click.option(
"-e",
"--expires-after",
help="Token should expire after this many seconds",
type=int,
)
@click.option(
"alls",
"-a",
"--all",
type=str,
metavar="ACTION",
multiple=True,
help="Restrict token to this action",
)
@click.option(
"databases",
"-d",
"--database",
type=(str, str),
metavar="DB ACTION",
multiple=True,
help="Restrict token to this action on this database",
)
@click.option(
"resources",
"-r",
"--resource",
type=(str, str, str),
metavar="DB RESOURCE ACTION",
multiple=True,
help="Restrict token to this action on this database resource (a table, SQL view or named query)",
)
@click.option(
"--debug",
help="Show decoded token",
is_flag=True,
)
@click.option(
"--plugins-dir",
type=click.Path(exists=True, file_okay=False, dir_okay=True),
help="Path to directory containing custom plugins",
)
def create_token(
id, secret, expires_after, alls, databases, resources, debug, plugins_dir
):
"""
Create a signed API token for the specified actor ID
Example:
datasette create-token root --secret mysecret
To only allow create-table:
\b
datasette create-token root --secret mysecret \\
--all create-table
Or to only allow insert-row against a specific table:
\b
datasette create-token root --secret myscret \\
--resource mydb mytable insert-row
Restricted actions can be specified multiple times using
multiple --all, --database, and --resource options.
Add --debug to see a decoded version of the token.
"""
ds = Datasette(secret=secret, plugins_dir=plugins_dir)
# Run ds.invoke_startup() in an event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(ds.invoke_startup())
def fix_action(action):
# Warn if invalid, rename to abbr if possible
permission = ds.permissions.get(action)
if not permission:
# Output red message
click.secho(
f" Unknown permission: {action} ",
fg="red",
err=True,
)
return action
return permission.abbr or action
bits = {"a": id, "token": "dstok", "t": int(time.time())}
if expires_after:
bits["d"] = expires_after
if alls or databases or resources:
bits["_r"] = {}
if alls:
bits["_r"]["a"] = [fix_action(a) for a in alls]
if databases:
bits["_r"]["d"] = {}
for database, action in databases:
bits["_r"]["d"].setdefault(database, []).append(fix_action(action))
if resources:
bits["_r"]["r"] = {}
for database, table, action in resources:
bits["_r"]["r"].setdefault(database, {}).setdefault(
table, []
).append(fix_action(action))
token = ds.sign(bits, namespace="token")
click.echo("dstok_{}".format(token))
if debug:
click.echo("\nDecoded:\n")
click.echo(json.dumps(ds.unsign(token, namespace="token"), indent=2))
@hookimpl
def skip_csrf(scope):
# Skip CSRF check for requests with content-type: application/json