-
Notifications
You must be signed in to change notification settings - Fork 0
/
authentication.py
206 lines (176 loc) · 7.1 KB
/
authentication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import json
from typing import Annotated
from fastapi import APIRouter, Cookie, Request, Response
from fastapi.responses import RedirectResponse
from starlette.config import Config
import urllib
from app.api.endpoints.projects import getUserName
from cryptography.fernet import Fernet
# from starlette.requests import Request
from starlette.responses import HTMLResponse
from authlib.integrations.starlette_client import OAuth, OAuthError
# Router on which this module's authentication endpoints (login, callback, logout) are registered.
router = APIRouter()
import jwt
import os
import time
## Read Oauth client info from .env for production
config = Config(".env")
oauth = OAuth(config)

# One row per DataHUB: (client name, OpenID configuration URL, requested scopes).
# The scope strings are kept exactly as they were written for each client.
_DATAHUB_CLIENTS = (
    (
        "dev",
        "https://gitdev.nfdi4plants.org/.well-known/openid-configuration",
        "openid profile api",
    ),
    (
        "tuebingen",
        "https://gitlab.nfdi4plants.de/.well-known/openid-configuration",
        "openid profile api",
    ),
    (
        "freiburg",
        "https://git.nfdi4plants.org/.well-known/openid-configuration",
        "openid profile api",
    ),
    (
        "plantmicrobe",
        "https://gitlab.plantmicrobe.de/.well-known/openid-configuration",
        "openid api profile",
    ),
    (
        "tuebingen_testenv",
        "https://gitlab.test-nfdi4plants.de/.well-known/openid-configuration",
        "openid api profile",
    ),
)

# Register every DataHUB client on the shared OAuth registry, in table order.
for _client_name, _metadata_url, _scope in _DATAHUB_CLIENTS:
    oauth.register(
        name=_client_name,
        server_metadata_url=_metadata_url,
        client_kwargs={"scope": _scope},
    )
def encryptToken(content: bytes) -> bytes:
    """Encrypt ``content`` with the Fernet key from the ``FERNET`` environment variable.

    Args:
        content: Raw bytes to encrypt.

    Returns:
        The Fernet token produced for ``content``.

    Raises:
        RuntimeError: If the ``FERNET`` environment variable is not set.
    """
    fernetKey = os.environ.get("FERNET")
    if fernetKey is None:
        # Fail with an explicit message instead of the opaque
        # "'NoneType' object has no attribute 'encode'" AttributeError.
        raise RuntimeError("FERNET environment variable is not set")
    return Fernet(fernetKey.encode()).encrypt(content)
def writeLogJson(endpoint: str, status: int, startTime: float, error=None):
    """Append one request record to ``log.json`` on a best-effort basis.

    Args:
        endpoint: Name of the endpoint that handled the request.
        status: Status code to record for the request.
        startTime: ``time.time()`` value taken when handling started; the
            recorded response time is ``time.time() - startTime`` in seconds.
        error: Optional error to record. It is stored as ``str(error)``, so
            the default ``None`` is logged as the string ``"None"``.

    Any ordinary exception raised while logging is reported on stdout and
    otherwise ignored, so a logging problem never breaks the request.
    """
    try:
        entry = {
            "endpoint": endpoint,
            "status": status,
            "error": str(error),
            "date": time.strftime("%d/%m/%Y - %H:%M:%S", time.localtime()),
            "response_time": time.time() - startTime,
        }
        try:
            with open("log.json", "r", encoding="utf-8") as log:
                jsonLog = json.load(log)
        except FileNotFoundError:
            # No log yet: start a fresh one instead of failing on every call.
            jsonLog = []
        jsonLog.append(entry)
        # Serialize before opening the file for writing, so a serialization
        # failure cannot leave a truncated log.json behind.
        serialized = json.dumps(jsonLog, indent=4, separators=(",", ": "))
        with open("log.json", "w", encoding="utf-8") as logWrite:
            logWrite.write(serialized)
    except Exception:
        # `Exception` rather than a bare except, so KeyboardInterrupt and
        # SystemExit still propagate.
        print("Error while logging to json!")
# redirect user to requested keycloak to enter login credentials
@router.get("/login", summary="Initiate login process for specified DataHUB")
async def login(request: Request, datahub: str):
    """Start the OAuth login flow for the requested DataHUB.

    Args:
        request: Incoming request, handed to the OAuth client to build the
            authorization redirect.
        datahub: Name of the DataHUB to log in to. ``"tübingen"`` is accepted
            as an alias for ``"tuebingen"``.

    Returns:
        The authorization redirect produced by the matching OAuth client, the
        plain string ``"invalid DataHUB selection"`` for an unknown name, or a
        redirect back to the frontend carrying an ``error`` cookie when the
        authorization redirect could not be built.
    """
    known_datahubs = (
        "dev",
        "tuebingen",
        "freiburg",
        "plantmicrobe",
        "tuebingen_testenv",
    )

    # change name with 'ü' to 'ue' so the callback URI only contains the ASCII spelling
    client_name = "tuebingen" if datahub == "tübingen" else datahub
    if client_name not in known_datahubs:
        return "invalid DataHUB selection"

    # for local development use:
    # f"http://localhost:8000/arcmanager/api/v1/auth/callback?datahub={client_name}"
    redirect_uri = (
        f"https://nfdi4plants.de/arcmanager/api/v1/auth/callback?datahub={client_name}"
    )

    try:
        # construct authorization url for requested datahub and redirect
        client = getattr(oauth, client_name)
        return await client.authorize_redirect(request, redirect_uri)
    # if authentication fails (e.g. due to a timeout), then return back to the frontend containing an error in the cookies
    except Exception:
        # `Exception` rather than a bare except: task cancellation
        # (asyncio.CancelledError) and interpreter shutdown must propagate.
        # for local development use: RedirectResponse("http://localhost:5173")
        response = RedirectResponse("https://nfdi4plants.de/arcmanager/app/index.html")
        response.set_cookie("error", "DataHUB not available")
        return response
# retrieve tokens after successful login and store it in session object
@router.get(
    "/callback",
    summary="Redirection after successful user login and creation of server-side user session",
    include_in_schema=False,
)
async def callback(request: Request, datahub: str):
    """Finish the OAuth login and hand the login cookies to the frontend.

    Exchanges the authorization response for a token, encrypts the access
    token, wraps it in an RS256-signed JWT and stores that JWT in the
    ``data`` cookie. Also sets the ``logged_in``, ``timer`` and ``username``
    cookies, removes a leftover ``error`` cookie and clears the session.

    Args:
        request: Incoming callback request from the DataHUB.
        datahub: Name of the DataHUB the login was started for.

    Returns:
        A redirect to the frontend carrying the cookies, or an HTML page
        showing the error when the token exchange raised ``OAuthError``.

    Raises:
        OAuthError: If ``datahub`` is unknown or no access token was returned.
    """
    # Imported locally so the block does not rely on `import urllib` having
    # loaded the `urllib.parse` submodule as a side effect of other imports.
    from html import escape
    from urllib.parse import quote

    startTime = time.time()
    # for local development use: RedirectResponse("http://localhost:5173")
    response = RedirectResponse("https://nfdi4plants.de/arcmanager/app/index.html")

    known_datahubs = (
        "dev",
        "tuebingen",
        "freiburg",
        "plantmicrobe",
        "tuebingen_testenv",
    )
    if datahub not in known_datahubs:
        # Previously this case surfaced indirectly through an unbound `token`
        # variable; raise the same error explicitly instead.
        raise OAuthError(description="Failed retrieving the token data")

    try:
        token = await getattr(oauth, datahub).authorize_access_token(request)
    except OAuthError as error:
        # The error text can originate from the request, so escape it before
        # embedding it in HTML.
        return HTMLResponse(f"<h1>{escape(str(error))}</h1>")

    try:
        access_token = token.get("access_token")
    except Exception as exc:
        raise OAuthError(description="Failed retrieving the token data") from exc
    if not access_token:
        # Without this check a missing token would crash on `.encode()` below.
        raise OAuthError(description="Failed retrieving the token data")

    # read out private key from .env
    pr_key = (
        b"-----BEGIN RSA PRIVATE KEY-----\n"
        + os.environ.get("PRIVATE_RSA").encode()
        + b"\n-----END RSA PRIVATE KEY-----"
    )

    cookieData = {
        "gitlab": encryptToken(access_token.encode()).decode(),
        "target": datahub,
    }

    # encode cookie data with rsa key
    encodedCookie = jwt.encode(cookieData, pr_key, algorithm="RS256")

    # NOTE(review): the session is cleared again further down, so this value
    # does not appear to outlive the request - confirm whether it is needed.
    request.session["data"] = encodedCookie

    response.set_cookie(
        "data",
        encodedCookie,
        httponly=True,
        secure=True,
        samesite="strict",
    )
    response.set_cookie("logged_in", "true", httponly=False)
    response.set_cookie("timer", str(time.time()), httponly=False)

    try:
        # Looking up the user id inside the try lets a token without
        # "userinfo" fall back to the generic name instead of crashing.
        userInfo = token.get("userinfo")["sub"]
        username = await getUserName(datahub, userInfo, access_token)
        response.set_cookie(
            "username",
            quote(username),
            httponly=False,
        )
    except Exception:
        # `Exception` rather than a bare except so task cancellation propagates.
        response.set_cookie("username", "user", httponly=False)

    # delete any leftover error cookie
    response.delete_cookie("error")
    request.session.clear()

    try:
        writeLogJson("callback", 307, startTime)
    except Exception:
        # Logging is best-effort and must never break a successful login.
        pass

    return response
@router.get("/logout", summary="Manually delete server-side user session")
async def logout(request: Request):
    """Log the user out by deleting the login cookies.

    Args:
        request: Incoming request; its parsed cookie mapping is emptied.

    Returns:
        A plain-text response confirming the logout, instructing the client
        to delete the ``data``, ``logged_in``, ``username`` and ``timer`` cookies.
    """
    reply = Response("logout successful", media_type="text/plain")

    # on logout, drop the "data" cookie containing the gitlab token together with the other cookies set
    for cookie_name in ("data", "logged_in", "username", "timer"):
        reply.delete_cookie(cookie_name)

    request.cookies.clear()
    return reply