Skip to content

Commit

Permalink
Merge pull request #259 from prefeitura-rio/development
Browse files Browse the repository at this point in the history
Versão 2.3
  • Loading branch information
TanookiVerde authored Nov 8, 2024
2 parents 82f8dae + 40000fe commit b937b8f
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 131 deletions.
2 changes: 1 addition & 1 deletion app/datalake/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ async def _upload_as_native_table(
raise ValueError(
f"Partition column '{date_partition_column}' not found in DataFrame columns"
)

dataframe["data_particao"] = pd.to_datetime(dataframe[date_partition_column])
job_config_params["time_partitioning"] = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
Expand Down
13 changes: 8 additions & 5 deletions app/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ class PermitionEnum(str, Enum):
HCI_FULL_PERMITION = "full_permition"


class LoginErrorEnum(str, Enum):
BAD_CREDENTIALS = "bad_credentials"
BAD_OTP = "bad_otp"
INACTIVE_EMPLOYEE = "inactive_employee"
REQUIRE_2FA = "require_2fa"
class LoginStatusEnum(str, Enum):
USER_NOT_FOUND = "user_not_found" # User don't exist in the DB
BAD_CREDENTIALS = "bad_credentials" # User exist but the password is wrong
REQUIRE_2FA = "require_2fa" # User exist and the password is correct, but 2FA is required
BAD_OTP = "bad_otp" # User exist and the password is correct, but the OTP is wrong
INACTIVE_EMPLOYEE = "inactive_employee" # User exist but is not an active employee
SUCCESS = "success" # User exist, password and OTP are correct


class AcceptTermsEnum(str, Enum):
SUCCESS = "success"
Expand Down
175 changes: 58 additions & 117 deletions app/routers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,18 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.responses import StreamingResponse,JSONResponse
from loguru import logger

from app import config
from app.models import User
from app.types.frontend import LoginFormWith2FA, LoginForm
from app.types.pydantic_models import Token, Enable2FA
from app.utils import authenticate_user, generate_user_token, read_bq
from app.types.pydantic_models import Token
from app.utils import authenticate_user, generate_user_token
from app.security import TwoFactorAuth
from app.dependencies import assert_user_is_active
from app.enums import LoginErrorEnum
from app.enums import LoginStatusEnum
from app.types.errors import (
AuthenticationErrorModel
)
from app.config import (
BIGQUERY_ERGON_TABLE_ID,
)

router = APIRouter(prefix="/auth", tags=["Autenticação"])

Expand All @@ -35,27 +32,20 @@ async def login_without_2fa(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:

user = await authenticate_user(form_data.username, form_data.password)
if not user:
return JSONResponse(
status_code=401,
content={
"message": "Incorrect Username or Password",
"type": LoginErrorEnum.BAD_CREDENTIALS,
},
)
login_result = await authenticate_user(form_data.username, form_data.password)
logger.info(f"login_result: {login_result['status']}")

if user.is_2fa_required:
if login_result['status'] != LoginStatusEnum.SUCCESS:
return JSONResponse(
status_code=401,
content={
"message": "2FA required. Use the /2fa/login/ endpoint",
"type": LoginErrorEnum.REQUIRE_2FA,
"message": "Something went wrong",
"type": login_result['status'],
},
)

return {
"access_token": generate_user_token(user),
"access_token": generate_user_token(login_result['user']),
"token_type": "bearer",
"token_expire_minutes": int(config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES),
}
Expand All @@ -71,17 +61,23 @@ async def login_without_2fa(
async def is_2fa_active(
form_data: LoginForm,
) -> bool:
user = await authenticate_user(form_data.username, form_data.password)
if not user:
login_result = await authenticate_user(form_data.username, form_data.password)
logger.info(f"login_result: {login_result['status']}")

if login_result['status'] in [
LoginStatusEnum.USER_NOT_FOUND,
LoginStatusEnum.BAD_CREDENTIALS,
LoginStatusEnum.INACTIVE_EMPLOYEE,
]:
return JSONResponse(
status_code=401,
content={
"message": "Incorrect Username or Password",
"type": LoginErrorEnum.BAD_CREDENTIALS,
"message": "Something went wrong",
"type": login_result['status'],
},
)

return user.is_2fa_activated
return login_result['user'].is_2fa_activated


@router.post(
Expand All @@ -95,95 +91,31 @@ async def login_with_2fa(
form_data: LoginFormWith2FA,
) -> Token:

user = await authenticate_user(form_data.username, form_data.password)
if not user:
login_result = await authenticate_user(
form_data.username,
form_data.password,
form_data.totp_code,
)
logger.info(f"login_result: {login_result['status']}")

if login_result['status'] == LoginStatusEnum.SUCCESS:
login_result['user'].is_2fa_activated = True
await login_result['user'].save()

return {
"access_token": generate_user_token(login_result['user']),
"token_type": "bearer",
"token_expire_minutes": int(config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES),
}
else:
return JSONResponse(
status_code=401,
content={
"message": "Incorrect Username or Password",
"type": LoginErrorEnum.BAD_CREDENTIALS,
"message": "Something went wrong",
"type": login_result['status'],
},
)

# ----------------------------------------
# 2FA Verification
# ----------------------------------------
secret_key = await TwoFactorAuth.get_or_create_secret_key(user)
two_factor_auth = TwoFactorAuth(user, secret_key)

is_valid = two_factor_auth.verify_totp_code(form_data.totp_code)
if not is_valid:
return JSONResponse(
status_code=401,
content={
"message": "Incorrect OTP",
"type": LoginErrorEnum.BAD_OTP,
},
)
if not user.is_2fa_activated:
user.is_2fa_activated = True
await user.save()

# ----------------------------------------
# Validate access status in ERGON database
# ----------------------------------------
if user.is_ergon_validation_required:
is_active_employee = False
ergon_register = await read_bq(
f"""
SELECT *
FROM {BIGQUERY_ERGON_TABLE_ID}
WHERE cpf_particao = {user.cpf}
""",
from_file="/tmp/credentials.json",
)
# If has no ERGON register: Unauthorized
if len(ergon_register) == 0:
is_active_employee = False
# If has ERGON register and status_ativo is false: Unauthorized
elif ergon_register[0].get("status_ativo", False) is False:
is_active_employee = False
# If has ERGON register and status_ativo is true: Unauthorized
else:
is_active_employee = True

if not is_active_employee:
return JSONResponse(
status_code=401,
content={
"message": "User is not an active employee",
"type": LoginErrorEnum.INACTIVE_EMPLOYEE,
},
)

return {
"access_token": generate_user_token(user),
"token_type": "bearer",
"token_expire_minutes": int(config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES),
}


@router.post(
"/2fa/enable/",
response_model=Enable2FA,
responses={
400: {"model": str}
}
)
async def enable_2fa(
current_user: Annotated[User, Depends(assert_user_is_active)],
) -> Enable2FA:
if current_user.is_2fa_activated:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="2FA already enabled",
)

secret_key = await TwoFactorAuth.get_or_create_secret_key(current_user)
two_factor_auth = TwoFactorAuth(current_user, secret_key)

return {"secret_key": two_factor_auth.secret_key}


@router.post(
"/2fa/generate-qrcode/",
Expand All @@ -197,22 +129,31 @@ async def enable_2fa(
async def generate_qrcode(
form_data: LoginForm,
) -> bytes:
current_user = await authenticate_user(form_data.username, form_data.password)
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
login_result = await authenticate_user(form_data.username, form_data.password)
logger.info(f"login_result: {login_result['status']}")


if login_result['status'] in [
LoginStatusEnum.USER_NOT_FOUND,
LoginStatusEnum.BAD_CREDENTIALS,
LoginStatusEnum.INACTIVE_EMPLOYEE,
]:
return JSONResponse(
status_code=401,
content={
"message": "Something went wrong",
"type": login_result['status'],
},
)

if current_user.is_2fa_activated:
if login_result['user'].is_2fa_activated:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="2FA already activated using QR Code",
)

secret_key = await TwoFactorAuth.get_or_create_secret_key(current_user)
two_factor_auth = TwoFactorAuth(current_user, secret_key)
secret_key = await TwoFactorAuth.get_or_create_secret_key(login_result['user'])
two_factor_auth = TwoFactorAuth(login_result['user'], secret_key)

qr_code = two_factor_auth.qr_code
if qr_code is None:
Expand Down
4 changes: 2 additions & 2 deletions app/types/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
from pydantic import BaseModel

from app.enums import (
LoginErrorEnum,
LoginStatusEnum,
AccessErrorEnum,
AcceptTermsEnum
)


class AuthenticationErrorModel(BaseModel):
message: str
type: LoginErrorEnum
type: LoginStatusEnum


class AccessErrorModel(BaseModel):
Expand Down
2 changes: 2 additions & 0 deletions app/types/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class Encounter(BaseModel):
clinical_exams: List[ClinicalExam]
procedures: Optional[str]
filter_tags: List[str]
prescription: Optional[str]
provider: Optional[str]


class UserInfo(BaseModel):
Expand Down
Loading

0 comments on commit b937b8f

Please sign in to comment.