Skip to content

Commit

Permalink
Merge pull request #58 from omnivector-solutions/tucker/refactored-pe…
Browse files Browse the repository at this point in the history
…rmission-mapping

Refactored permission mapping in the TokenDecoder
  • Loading branch information
dusktreader authored Oct 14, 2024
2 parents 29f9b08 + 59b58ab commit 0e5dc9e
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 94 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

## v2.1.0 - 2024-10-13

- Refactored pemission claim mapping

## v2.0.3 - 2024-10-10

- Made audience optional in cli
Expand Down
3 changes: 2 additions & 1 deletion armasec/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from armasec.armasec import Armasec
from armasec.openid_config_loader import OpenidConfigLoader
from armasec.token_decoder import TokenDecoder
from armasec.token_decoder import TokenDecoder, extract_keycloak_permissions
from armasec.token_manager import TokenManager
from armasec.token_payload import TokenPayload
from armasec.token_security import TokenSecurity
Expand All @@ -12,4 +12,5 @@
"TokenPayload",
"TokenDecoder",
"OpenidConfigLoader",
"extract_keycloak_permissions",
]
9 changes: 5 additions & 4 deletions armasec/schemas/armasec_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
This module provides a pydantic schema describing Armasec's configuration parameters.
"""

from typing import Any, Dict, List, Optional, Set, Union
from typing import Any, Dict, List, Optional, Set, Union, Callable

import snick
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -46,12 +46,13 @@ class DomainConfig(BaseModel):
),
)
)
payload_claim_mapping: Optional[Dict[str, Any]] = Field(
permission_extractor: Optional[Callable[[Dict[str, Any]], List[str]]] = Field(
None,
description=snick.unwrap(
"""
Optional mappings that are applied to map claims to top-level properties of
TokenPayload. See docs for `TokenDecoder` for more info.
Optional function that may be used to extract permissions from the decoded token
dictionary when the permissions are not a top-level claim in the token.
See docs for `TokenDecoder` for more info.
"""
),
)
95 changes: 68 additions & 27 deletions armasec/token_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from functools import partial
from typing import Callable

import jmespath
import buzz
from jose import jwt

from armasec.exceptions import AuthenticationError, PayloadMappingError
Expand All @@ -30,7 +28,7 @@ def __init__(
algorithm: str = "RS256",
debug_logger: Callable[..., None] | None = None,
decode_options_override: dict | None = None,
payload_claim_mapping: dict | None = None,
permission_extractor: Callable[[dict], list[str]] | None = None,
):
"""
Initializes a TokenDecoder.
Expand All @@ -44,36 +42,46 @@ def __init__(
decode_options_override: Options that can override the default behavior of the jwt
decode method. For example, one can ignore token expiration by
setting this to `{ "verify_exp": False }`
payload_claim_mapping: Optional mappings that are applied to map claims to top-level
attribute of TokenPayload using a dict format of:
permission_extractor: Optional function that may be used to extract permissions from
the decoded token dictionary when the permissions are not a
top-level claim in the token. If not provided, permissions will
be assumed to be a top-level claim in the token.
```
{
"top_level_attribute": "decoded.token.JMESPath"
}
```
The values _must_ be a valid JMESPath.
Consider this example:
Consider the example token:
```
{
"permissions": "resource_access.default.roles"
"exp": 1728627701,
"iat": 1728626801,
"jti": "24fdb7ef-d773-4e6b-982a-b8126dd58af7",
"sub": "dfa64115-40b5-46ab-924c-c376e73f631d",
"azp": "my-client",
"resource_access": {
"my-client": {
"roles": [
"read:stuff"
]
},
},
}
```
The above example would result in a TokenPayload like:
In this example, the permissions are found at
`resource_access.my-client.roles`. To produce a TokenPayload
with the permissions set as expected, you could supply a
permission extractor like this:
```
TokenPayload(permissions=token["resource_access"]["default"]["roles"])
def my_extractor(decoded_token: dict) -> list[str]:
resource_key = decoded_token["azp"]
return decoded_token["resource_access"][resource_key]["roles"]
```
Raises a 500 if the path does not match
"""
self.algorithm = algorithm
self.jwks = jwks
self.debug_logger = debug_logger if debug_logger else noop
self.decode_options_override = decode_options_override if decode_options_override else {}
self.payload_claim_mapping = payload_claim_mapping if payload_claim_mapping else {}
self.permission_extractor = permission_extractor

def get_decode_key(self, token: str) -> dict:
"""
Expand Down Expand Up @@ -128,18 +136,15 @@ def decode(self, token: str, **claims) -> TokenPayload:
self.debug_logger(f"Raw payload dictionary is {payload_dict}")

with PayloadMappingError.handle_errors(
"Failed to map decoded token to payload",
"Failed to map decoded token to TokenPayload",
do_except=partial(log_error, self.debug_logger),
):
for payload_key, token_jmespath in self.payload_claim_mapping.items():
mapped_value = jmespath.search(token_jmespath, payload_dict)
buzz.require_condition(
mapped_value is not None,
f"No matching values found for claim mapping {token_jmespath} -> {payload_key}",
raise_exc_class=KeyError,
if self.permission_extractor is not None:
self.debug_logger("Attempting to extract permissions.")
payload_dict["permissions"] = self.permission_extractor(payload_dict)
self.debug_logger(
f"Payload dictionary with extracted permissions is {payload_dict}"
)
payload_dict[payload_key] = mapped_value
self.debug_logger(f"Mapped payload dictionary is {payload_dict}")

self.debug_logger("Attempting to convert to TokenPayload")
token_payload = TokenPayload(
Expand All @@ -148,3 +153,39 @@ def decode(self, token: str, **claims) -> TokenPayload:
)
self.debug_logger(f"Built token_payload as {token_payload}")
return token_payload


def extract_keycloak_permissions(decoded_token: dict) -> list[str]:
"""
Provide a permission extractor for Keycloak.
By default, Keycloak packages the roles for a given client
nested within the "resource_access" claim. In order to extract
those roles into the expected permissions in the TokenPayload,
this permission_extractor can be used.
Here is an example decoded token from Keycloak (with some stuff
removed to improve readability):
```
{
"exp": 1728627701,
"iat": 1728626801,
"jti": "24fdb7ef-d773-4e6b-982a-b8126dd58af7",
"sub": "dfa64115-40b5-46ab-924c-c376e73f631d",
"azp": "my-client",
"resource_access": {
"my-client": {
"roles": [
"read:stuff"
]
},
},
}
```
This extractor would extract the roles `["read:stuff"]` as the
permissions for the TokenPayload returned by the TokenDecoder.
"""
resource_key = decoded_token["azp"]
return decoded_token["resource_access"][resource_key]["roles"]
2 changes: 1 addition & 1 deletion armasec/token_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def _load_manager(self, domain_config: DomainConfig) -> TokenManager:
loader.jwks,
domain_config.algorithm,
debug_logger=self.debug_logger,
payload_claim_mapping=domain_config.payload_claim_mapping,
permission_extractor=domain_config.permission_extractor,
)
return TokenManager(
loader.config,
Expand Down
12 changes: 6 additions & 6 deletions docs/source/tutorials/getting_started_with_keycloak.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ and this user does not have any roles mapped to it!
## Start up the example app

```python title="example.py" linenums="1"
from armasec import Armasec
from armasec import Armasec, extract_keycloak_permissions
from fastapi import FastAPI, Depends


Expand All @@ -211,7 +211,7 @@ armasec = Armasec(
domain="localhost:8080/realms/master",
audience="http://keycloak.local",
use_https=False,
payload_claim_mapping=dict(permissions="resource_access.armasec_tutorial.roles"),
permission_extractor=extract_keycloak_permissions,
debug_logger=print,
debug_exceptions=True,
)
Expand All @@ -224,10 +224,10 @@ async def check_access():
Note in this example that the `use_https` flag must be set to false to allow a local
server using unsecured HTTP.

Also not that we need to add a `payload_claim_mapping` because Keycloak does not provide
a permissions claim at the top level. This mapping copies the roles found at
`resource_access.armasec_tutorial.roles` to a top-level attribute of the token payload
called permissions.
Also not that we need to tell Armasec to use `extract_keycloak_permissions()` because
Keycloak does not provide a permissions claim at the top level. This extractor function
extracts the roles from `resource_access.armasec_tutorial.roles` so that they can
be used as the "permissions" in the decoded token payload.

Copy the `example.py` app to a local source file called "example.py".

Expand Down
26 changes: 2 additions & 24 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 3 additions & 12 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "armasec"
version = "2.0.3"
version = "2.1.0"
description = "Injectable FastAPI auth via OIDC"
authors = ["Omnivector Engineering Team <info@omnivector.solutions>"]
license = "MIT"
Expand All @@ -27,6 +27,7 @@ pydantic = "^2.7"
httpx = "^0"
snick = "^1.3"
py-buzz = "^4.1"
pluggy = "^1.4.0"

# These must be included as a main dependency for the pytest extension to work out of the box
respx = "^0"
Expand All @@ -39,13 +40,10 @@ loguru = {version = "^0.5.3", optional = true}
rich = {version = "^13.5.2", optional = true}
pendulum = {version = "^3.0.0", optional = true}
pyperclip = {version = "^1.8.2", optional = true}
jmespath = "^1.0.1"
pluggy = "^1.4.0"

[tool.poetry.extras]
cli = ["typer", "loguru", "rich", "pendulum", "pyperclip"]


[tool.poetry.group.dev.dependencies]
ipython = ">=7,<9"
asgi-lifespan = "^1.0.1"
Expand All @@ -63,8 +61,6 @@ pygments = "^2.16.1"
plummet = {extras = ["time-machine"], version = "^1.2.1"}
pytest-mock = "^3.11.1"
ruff = "^0.3"
types-jmespath = "^1.0.2.7"


[tool.poetry.scripts]
armasec = {callable = "armasec_cli.main:app", extras = ["cli"]}
Expand All @@ -81,12 +77,7 @@ testpaths = ["tests"]
asyncio_mode = "auto"

[[tool.mypy.overrides]]
module = [
"jose",
"buzz",
"snick",
"auto_name_enum",
]
module = ["jose"]
ignore_missing_imports = true

[tool.ruff]
Expand Down
Loading

0 comments on commit 0e5dc9e

Please sign in to comment.