Python Authlib 是否支持解码并验证 AWS Cognito 的访问令牌 / ID 令牌?
我是 authlib 库的新手,想知道如何在不安装额外依赖库的情况下,使用 authlib 验证 AWS Cognito 访问令牌。我在 FastAPI 服务器上使用 OpenID Connect 授权码流程(authorization code flow)。
遗憾的是,AWS Cognito 不提供令牌自省(token introspection)端点。不过,他们确实在文档中说明了令牌验证流程(token verification process)。
最初我查看了 authlib 文档,试图找出如何使用 authlib 来实现这一点。最终,在检查了源代码之后,开始拼凑一个实现。
我的令牌以字符串格式接收,因此要使用 authlib 解码令牌,我目前正在尝试执行以下操作:
import asyncio
import base64
import json
import os
import httpx
from authlib.jose import JsonWebKey,KeySet,jwt
def _b64url_decode(segment: str) -> bytes:
    """Decode one base64url JWT segment, restoring the padding JWTs omit."""
    # -len % 4 yields 0..3, i.e. exactly the number of "=" needed.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def decode_token(
    client_id: str, issuer: str, keyset: KeySet, token: str, token_use: str
):
    """
    Verify an AWS Cognito token.

    Rather than reinvent the wheel and install additional libraries,
    use what is provided by authlib to follow the process documented at
    "Amazon Cognito User Pools: Using Tokens - Verifying a JWT":

    1. Verify the token contains a header, payload and signature and that
       each can be base64url decoded.
    2. Validate the JWT signature using the public key identified by the
       ``kid`` in the token header.
    3. Validate the expiry, token_use, client id and issuer claims.

    :param client_id: the AWS Cognito user pool app client id
    :param issuer: the AWS issuer url for the user pool
    :param keyset: authlib.jose.KeySet instance holding the pool's keys
    :param token: a token issued by AWS Cognito, as a compact JWT string
    :param token_use: expected token kind: "access", "id", or "both" to
        accept either kind
    :returns: the claims object produced by ``jwt.decode`` after validation
    :raises ValueError: if the token is malformed, the key cannot be found,
        or a claim does not match the expected value
    :raises KeyError: if the header has no ``kid`` or the payload lacks a
        claim that is being checked
    """
    # Split the token into header, payload and signature
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError(
            "An AWS cognito token should contain a header,payload and signature"
        )
    hdr, payload, signature = parts

    # Exception raised if jwt sections are not base64 encoded
    hdr_x = _b64url_decode(hdr)
    payload_x = _b64url_decode(payload)
    signature_x = _b64url_decode(signature)

    # display parts for debugging
    print("Token Header := " + str(hdr_x))
    print("\n\nToken Payload := " + str(payload_x))
    print("\n\nToken Signature := " + str(signature_x))

    # Get the id of the key that the jwt signature was encoded with
    hdr_dict = json.loads(hdr_x.decode("utf-8"))
    token_kid = hdr_dict["kid"]
    print(f"\n\nToken encoded with kid : {token_kid}")

    # Look the key up in the keyset; a lookup failure propagates to the
    # caller unchanged (the original code caught ValueError only to re-raise).
    key = keyset.find_by_kid(token_kid)
    print(f"\n\nMetadata for key {token_kid} is {key}")

    # Use authlib to verify the token signature against the key, then
    # validate the registered claims on the decoded result.
    pem = key.as_pem()
    claims = jwt.decode(token, pem)
    claims.validate()

    payload_dict = json.loads(payload_x.decode("utf-8"))

    # token_use is checked first because it decides which claim carries
    # the app client id below.
    token_token_use = payload_dict["token_use"]
    if token_token_use not in ("access", "id"):
        raise ValueError(
            f"token_use claim in token contains invalid value : {token_token_use}"
        )
    if token_use != "both" and token_use != token_token_use:
        raise ValueError("Token use does not match expected")
    print("\n\nToken_use validation passed")

    # Cognito access tokens carry the app client id in "client_id",
    # ID tokens carry it in "aud".
    client_claim = "client_id" if token_token_use == "access" else "aud"
    if client_id != payload_dict[client_claim]:
        raise ValueError("Token client id mismatched expected value")
    print("\n\nClient ID validation passed")

    # Match the issuer against expected
    if issuer != payload_dict["iss"]:
        raise ValueError("Token iss mismatched expected value")
    print("\n\nIssuer validation passed")

    print("\n\nAWS Cognito token verification passed")
    return claims
async def get_keyset(jwks_url: str):
    """
    Download a JWKS document and load it into an authlib key set.

    :param jwks_url: URL of the JSON Web Key Set to fetch
    :returns: the result of ``JsonWebKey.import_key_set`` on the response body
    :raises: whatever ``raise_for_status`` raises on an error HTTP status
    """
    # The class is httpx.AsyncClient; the original "Asyncclient" spelling
    # fails with AttributeError before any request is made.
    async with httpx.AsyncClient() as client:
        resp = await client.get(jwks_url)
        resp.raise_for_status()
        return JsonWebKey.import_key_set(resp.text)
async def main():
    """
    Verify the token given in the environment against a Cognito user pool.

    Reads CLIENT_ID, ISSUER, JWKS_URL and JWT from the environment, fetches
    the key set from JWKS_URL and verifies the token as an access token.

    :raises ValueError: if a required environment variable is not set or
        the key set could not be loaded
    """
    client_id = os.environ.get("CLIENT_ID")
    issuer = os.environ.get("ISSUER")
    jwks_url = os.environ.get("JWKS_URL")
    token = os.environ.get("JWT")

    if client_id is None:
        raise ValueError("Client_ID env var not set")
    if issuer is None:
        raise ValueError("ISSUER env var not set")
    if token is None:
        raise ValueError("JWT env var not set")
    if jwks_url is None:
        raise ValueError("JWKS_URL env var not set")

    # display inputs for debugging
    print(f"Client ID is:\t{client_id}")
    print(f"Issuer is:\t{issuer}")
    print(f"Token is:\t{token}")
    print(f"\n\nJWKS url is:\t{jwks_url}")

    keyset = await get_keyset(jwks_url)
    if keyset is None:
        # The message must be an f-string naming jwks_url; the original
        # emitted the literal text "{jwks}".
        raise ValueError(f"Failed to load keyset from {jwks_url}")
    print(f"\n\nRetrieved keyset:\n\t{keyset.as_json()}")

    decode_token(client_id, issuer, keyset, token, "access")
if __name__ == "__main__":
    # asyncio.run creates a fresh event loop, runs main() to completion and
    # closes the loop; calling get_event_loop() with no running loop is
    # deprecated in recent Python versions.
    asyncio.run(main())
有没有使用 authlib 更快的方法来做到这一点?例如,它是否提供了接受令牌字符串和 jwks 端点并自动验证令牌签名和声明的方法?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。