mirror of
https://github.com/simonw/datasette.git
synced 2025-12-10 16:51:24 +01:00
datasette.create_token() method, closes #1951
This commit is contained in:
parent
d4cc1374f4
commit
fdf7c27b54
3 changed files with 113 additions and 31 deletions
|
|
@ -1,5 +1,5 @@
|
|||
import asyncio
|
||||
from typing import Sequence, Union, Tuple, Optional
|
||||
from typing import Sequence, Union, Tuple, Optional, Dict, Iterable
|
||||
import asgi_csrf
|
||||
import collections
|
||||
import datetime
|
||||
|
|
@ -16,6 +16,7 @@ import re
|
|||
import secrets
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import urllib.parse
|
||||
from concurrent import futures
|
||||
from pathlib import Path
|
||||
|
|
@ -465,6 +466,45 @@ class Datasette:
|
|||
def unsign(self, signed, namespace="default"):
|
||||
return URLSafeSerializer(self._secret, namespace).loads(signed)
|
||||
|
||||
def create_token(
|
||||
self,
|
||||
actor_id: str,
|
||||
*,
|
||||
expires_after: Optional[int] = None,
|
||||
restrict_all: Optional[Iterable[str]] = None,
|
||||
restrict_database: Optional[Dict[str, Iterable[str]]] = None,
|
||||
restrict_resource: Optional[Dict[str, Dict[str, Iterable[str]]]] = None,
|
||||
):
|
||||
token = {"a": actor_id, "token": "dstok", "t": int(time.time())}
|
||||
if expires_after:
|
||||
token["d"] = expires_after
|
||||
|
||||
def abbreviate_action(action):
|
||||
# rename to abbr if possible
|
||||
permission = self.permissions.get(action)
|
||||
if not permission:
|
||||
return action
|
||||
return permission.abbr or action
|
||||
|
||||
if expires_after:
|
||||
token["d"] = expires_after
|
||||
if restrict_all or restrict_database or restrict_resource:
|
||||
token["_r"] = {}
|
||||
if restrict_all:
|
||||
token["_r"]["a"] = [abbreviate_action(a) for a in restrict_all]
|
||||
if restrict_database:
|
||||
token["_r"]["d"] = {}
|
||||
for database, actions in restrict_database.items():
|
||||
token["_r"]["d"][database] = [abbreviate_action(a) for a in actions]
|
||||
if restrict_resource:
|
||||
token["_r"]["r"] = {}
|
||||
for database, resources in restrict_resource.items():
|
||||
for resource, actions in resources.items():
|
||||
token["_r"]["r"].setdefault(database, {})[resource] = [
|
||||
abbreviate_action(a) for a in actions
|
||||
]
|
||||
return "dstok_{}".format(self.sign(token, namespace="token"))
|
||||
|
||||
def get_database(self, name=None, route=None):
|
||||
if route is not None:
|
||||
matches = [db for db in self.databases.values() if db.route == route]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue