-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
69 lines (54 loc) · 1.99 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import os
import secrets
import string

import jwt
import uvicorn
from authlib.integrations.requests_client import OAuth2Session
from dotenv import load_dotenv
from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse
from jwt import PyJWKClient
# Load variables from a .env file (python-dotenv) before any of them are read.
load_dotenv()

app = FastAPI()

# Provider and client settings; each is None when its variable is not set.
token_endpoint = os.environ.get("TOKEN_ENDPOINT")
authorization_endpoint = os.environ.get("AUTHORIZATION_ENDPOINT")
redirect_uri = os.environ.get("REDIRECT_URI")
client_id = os.environ.get("CLIENT_ID")
client_secret = os.environ.get("CLIENT_SECRET")
jwks_endpoint = os.environ.get("JWKS_ENDPOINT")
scope = "openid profile"

# Single OAuth2 client shared by the route handlers below. Arguments are
# spelled out by keyword so it is clear which ones are deliberately left unset.
session = OAuth2Session(
    client_id=client_id,
    client_secret=client_secret,
    token_endpoint_auth_method=None,
    revocation_endpoint_auth_method=None,
    scope=scope,
    state=None,
    redirect_uri=redirect_uri,
)
# Cookie that carries the OAuth2 ``state`` value from /login to the callback.
_STATE_COOKIE = "oauth_state"
# How long (seconds) a started login may take before its state cookie expires.
_STATE_COOKIE_MAX_AGE = 600


@app.get("/login")
def login():
    """Redirect the browser to the provider's authorization endpoint.

    The ``state`` value generated for this authorization request is stored in
    an HttpOnly cookie so that the callback can confirm the response belongs
    to a login started by this browser (CSRF protection, RFC 6749 §10.12).

    Returns:
        A 302 ``RedirectResponse`` to the authorization URL.
    """
    uri, state = session.create_authorization_url(url=authorization_endpoint)
    response = RedirectResponse(url=uri, status_code=302)
    # SameSite=Lax still sends the cookie on the provider's top-level GET
    # redirect back to the callback, but not on cross-site subrequests.
    # NOTE(review): add secure=True once this is only served over HTTPS.
    response.set_cookie(
        key=_STATE_COOKIE,
        value=state,
        max_age=_STATE_COOKIE_MAX_AGE,
        httponly=True,
        samesite="lax",
    )
    return response


@app.get("/oauth2/callback")
def oauth2Callback(request: Request):
    """Handle the provider redirect: verify state, redeem the code, validate the ID token.

    Args:
        request: Incoming request; its query string carries ``state`` plus
            either ``code`` or ``error``, and its cookies carry the state
            stored by ``login``.

    Returns:
        Whatever ``validateToken`` returns for the received ID token on
        success, otherwise ``{"message": <reason>, "error": True}``.
    """
    try:
        query_params = request.query_params

        # Reject responses that do not match a login started by this browser.
        expected_state = request.cookies.get(_STATE_COOKIE)
        returned_state = query_params.get("state")
        if (
            not expected_state
            or not returned_state
            or not secrets.compare_digest(
                expected_state.encode("utf-8"), returned_state.encode("utf-8")
            )
        ):
            return {"message": "Invalid or missing OAuth2 state", "error": True}

        # The provider reports failures (e.g. access_denied) via ``error``
        # instead of ``code``; surface that rather than a KeyError on "code".
        provider_error = query_params.get("error")
        if provider_error:
            detail = query_params.get("error_description") or provider_error
            return {"message": detail, "error": True}

        code = query_params.get("code")
        if not code:
            return {"message": "Missing authorization code", "error": True}

        token_response = session.fetch_token(
            url=token_endpoint,
            grant_type="authorization_code",
            code=code,
        )
        id_token = token_response.get("id_token")
        if not id_token:
            return {
                "message": "Token response did not include an id_token",
                "error": True,
            }
        return validateToken(id_token)
    except Exception as e:
        # Route boundary: report any failure in the same error shape instead
        # of letting it escape as an unhandled server error.
        return {"message": str(e), "error": True}
# Signature algorithms accepted for ID tokens. Only asymmetric algorithms are
# listed because the verification key is a public key taken from the JWKS
# endpoint; "none" and the HMAC (HS*) family must never be accepted here.
_ALLOWED_SIGNING_ALGORITHMS = [
    "RS256", "RS384", "RS512",
    "PS256", "PS384", "PS512",
    "ES256", "ES256K", "ES384", "ES512",
    "EdDSA",
]

# Created on first use and then reused, so the JWKS client is not rebuilt
# (and its key set re-fetched) for every token.
_jwks_client = None


def _get_jwks_client():
    """Return the shared ``PyJWKClient`` for ``jwks_endpoint``, creating it on first use."""
    global _jwks_client
    if _jwks_client is None:
        _jwks_client = PyJWKClient(jwks_endpoint)
    return _jwks_client


def validateToken(jwtToken: str):
    """Verify an ID token's signature and claims and return its payload.

    The signing key is looked up from the JWKS endpoint using the token's
    header. The set of acceptable algorithms is fixed on the server side and
    is NOT taken from the token, so a forged header cannot choose how the
    token gets verified.

    Args:
        jwtToken: The encoded JWT (ID token) to validate.

    Returns:
        The decoded claims dict when the token is valid, otherwise
        ``{"message": <reason>, "error": True}``.
    """
    try:
        signing_key = _get_jwks_client().get_signing_key_from_jwt(jwtToken)
        return jwt.decode(
            jwt=jwtToken,
            key=signing_key.key,
            audience=client_id,
            algorithms=_ALLOWED_SIGNING_ALGORITHMS,
            # verify_exp only checks "exp" when the claim is present;
            # "require" additionally rejects tokens that omit it.
            options={"verify_exp": True, "require": ["exp"]},
        )
    except Exception as e:
        # Callers expect failures as an error dict rather than an exception.
        return {"message": str(e), "error": True}
# Start the ASGI server only when this file is executed as a script.
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=3000,
    )