微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python Authlib 是否支持解码 AWS Cognito 访问 / id 令牌?

如何解决Python Authlib 是否支持解码 AWS Cognito 访问 / id 令牌?

我是 authlib 库的新手,想知道如何使用 verify authlib AWS Cognito 访问令牌,而无需安装额外的库依赖项。我在 fastapi 服务器上使用 openidc 身份验证代码流。

遗憾的是,AWS Cognito 不提供令牌自省端点。但是,他们确实记录了 token verification process

最初我查看了 authlib 文档,试图找出如何使用 authlib 来实现这一点。最终,在检查了源代码之后,开始拼凑一个实现。

我的令牌以字符串格式接收,因此要使用 authlib 解码令牌,我目前正在尝试执行以下操作:

import asyncio
import base64
import json
import os

import httpx
from authlib.jose import JsonWebKey,KeySet,jwt


def decode_token(
    client_id: str,issuer: str,keyset: KeySet,token: str,token_use: str
):
    """
    Verify an AWS Cognito token

    Rather than reinvent the wheel and install additional libraries
    try and use what is provided by authlib to try and achieve the
    process documented at Amazon Cognito User Pools : Using Tokens
    Verifying A JWT.

    1. Verify token contains header,payload and signature and can be
    base64 decoded.
    2. Validate the JWT key signature using public key used to encode
    JWT. This can be found by inspecting kid in the token header.
    3. Validate the expiry,client_id,issuer and token_use claims.
    token_use can be access or id. If both access and id tokens are
    used then token_use claim should be access or id.

    :param: client_id is the AWS cognito user pool client_id
    :param: issuer is the AWS issuer url for the user pool
    :param: keyset authlib.jose.Keyset instance
    :param: token is a token issued by AWS cognito
    """

    # Split the token into header and payload
    parts = token.split(".")
    total_parts = len(parts)

    if total_parts != 3:
        msg = (
            "An AWS cognito token should contain"," a header,payload and signature",)
        raise ValueError(msg)

    hdr = parts[0]
    payload = parts[1]
    signature = parts[2]

    # Decode base64 header and payload,Python requires correct padding!
    # Exception raised if jwt sections are not base64 encoded
    hdr_x = base64.urlsafe_b64decode(hdr + "=" * (4 - len(hdr) % 4))
    payload_x = base64.urlsafe_b64decode(payload + "=" * (4 - len(payload) % 4))
    signature_x = base64.urlsafe_b64decode(signature + "=" * (4 - len(signature) % 4))

    # display parts for debugging
    print("Token Header := " + str(hdr_x))
    print("\n\nToken Payload := " + str(payload_x))
    print("\n\nToken Signature := " + str(signature_x))

    # Get the id of the key that the jwt signature was encoded with
    hdr_dict = json.loads(hdr_x.decode("utf-8"))
    token_kid = hdr_dict["kid"]

    # display kid for debugging
    print(f"\n\nToken encoded with kid : {token_kid}")

    # Use authlib keyset to try and find the kid
    key = None
    try:
        key = keyset.find_by_kid(token_kid)
    except ValueError as ve:
        raise ve

    # display key Metadata for debugging
    print(f"\n\nMetadata for key {token_kid} is {key}")

    # Found this utility function after looking at authlib
    # source code. Is it documented anywhere else?
    pem = key.as_pem()

    # Use auth lib to verify token signature against key
    claims = jwt.decode(token,pem)
    claims.validate()

    # Create payload dict
    payload_dict = json.loads(payload_x.decode("utf-8"))

    # Match the client ID against expected
    token_client_id = payload_dict["client_id"]
    if client_id != token_client_id:
        raise ValueError("Token client id mismatched expected value")
    print("i\n\nClient ID validation passed")

    # Match the issuer against expected
    token_issuer_id = payload_dict["iss"]
    if issuer != token_issuer_id:
        raise ValueError("Token iss mismatched expected value")
    print("\n\nIssuer validation passed")

    # Match the token_use claim against expected
    token_token_use = payload_dict["token_use"]
    if token_token_use != "access" and token_token_use != "id":
        raise ValueError(
            f"token_use claim in token contains invalid value : {token_token_use}"
        )

    if token_use != "both":
        if token_use != token_token_use:
            raise ValueError("Token use does not match expected")

    print("\n\nToken_use validation passed")

    print("\n\nAWS Cognito token verification passed")


async def get_keyset(jwks_url: str):
    async with httpx.Asyncclient() as client:
        resp = await client.get(jwks_url)
        resp.raise_for_status()

        return JsonWebKey.import_key_set(resp.text)


async def main():
    client_id = os.environ.get("CLIENT_ID")
    issuer = os.environ.get("ISSUER")
    jwks_url = os.environ.get("JWKS_URL")
    token = os.environ.get("JWT")

    if client_id is None:
        raise ValueError("Client_ID env var not set")

    if issuer is None:
        raise ValueError("ISSUER env var not set")

    if token is None:
        raise ValueError("JWT env var not set")

    if jwks_url is None:
        raise ValueError("JWKS_URL env var not set")

    print(f"Client ID is:\t{client_id}")
    print(f"Issuer is:\t{issuer}")
    print(f"Token is:\t{token}")
    print(f"\n\nJWKS url is:\t{jwks_url}")

    keyset = await get_keyset(jwks_url)
    if keyset is None:
        raise ValueError("Failed to load keyset from {jwks}")

    print(f"\n\nRetrieved keyset:\n\t{keyset.as_json()}")

    decode_token(client_id,issuer,keyset,token,"access")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

有没有使用 authlib 更快的方法来做到这一点?例如,它是否提供了接受令牌字符串和 jwks 端点并自动验证令牌签名和声明的方法

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?