import logging
import os
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel
import motor.motor_asyncio
import bcrypt
class User(BaseModel):
    # Pydantic model pairing a username with a password. Both fields are
    # required strings; no length or format constraints are declared here.
    # NOTE(review): nothing in this file uses the model. Presumably it is a
    # request body elsewhere, in which case `password` is plaintext - confirm,
    # and avoid logging instances because the default repr includes field values.
    username: str
    password: str
class ResetPasswordRequest(BaseModel):
    # Pydantic model for a password change: the account name, the current
    # password and the replacement. All three are required strings.
    # NOTE(review): no constraints are declared on `new_password` here, so any
    # password policy has to be enforced by the handler that consumes this
    # model (not visible in this file) - confirm that it is.
    username: str
    old_password: str
    new_password: str
class DBCON(BaseModel):
    # Pydantic model whose field names mirror the DB_* environment variables
    # read at module level below. It is not instantiated anywhere in this file.
    # NOTE(review): DB_PASSWORD is a plain `str`, so it would appear in the
    # model's repr and in serialised output - confirm instances are never
    # logged or returned from an endpoint.
    DB_URL: str
    DB_NAME: str
    DB_USER: str
    DB_PASSWORD: str
# Connection settings are taken from the process environment. Each value is
# None when its variable is unset; nothing here validates that they are present.
db_url = os.environ.get('DB_URL')
# NOTE(review): DB_NAME is read but not referenced again in this file; the
# database selected below is always "administration".
db_name = os.environ.get('DB_NAME')
db_user = os.environ.get('DB_USER')
db_password = os.environ.get('DB_PASSWORD')

# Credentials are passed as keyword arguments rather than embedded in the URL.
# Server selection gives up after 1000 ms.
# NOTE(review): whether the connection uses TLS depends on DB_URL and is not
# visible here - confirm for deployments that cross a network boundary.
client = motor.motor_asyncio.AsyncIOMotorClient(
    db_url, username=db_user, password=db_password, serverSelectionTimeoutMS=1000
)

# Account documents live in the "user" collection of the "administration" database.
db = client["administration"]
collection = db["user"]

# FastAPI dependency that extracts HTTP Basic credentials from the request.
security = HTTPBasic()
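async def admincheck():
    """Ensure an ``admin`` account exists, creating one when none is found.

    Looks up the document with username ``admin`` in ``collection``. If there
    is none, inserts one whose password is a freshly generated random value,
    stored as a bcrypt hash, and logs that value once.

    This replaces the fixed bootstrap credentials ``admin:password``: a
    well-known default lets anyone who reaches the service before the operator
    changes it authenticate as the administrator.

    NOTE(review): ``find_one`` followed by ``insert_one`` is not atomic.
    Whether two concurrent start-ups can both insert an admin document depends
    on a unique index on ``username``, which this file does not show.
    """
    import secrets  # function-scope import; the file's import block is unchanged

    admin_user = await collection.find_one({"username": "admin"})
    if admin_user is not None:
        return

    # ~128 bits of randomness; the encoded form stays well under bcrypt's
    # 72-byte input limit.
    admin_password = secrets.token_urlsafe(16)
    password_hash = bcrypt.hashpw(admin_password.encode('utf-8'), bcrypt.gensalt())
    await collection.insert_one({
        "username": "admin",
        "password": password_hash.decode('utf-8'),
    })

    # Logged only after the insert succeeded, and at WARNING so it is emitted
    # under logging's default level. This line is the only place the plaintext
    # appears: treat the log as sensitive until the password has been changed.
    logging.warning(
        'No Administrator detected, created admin account with a generated '
        'one-time password: %s - change it after the first login.',
        admin_password,
    )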
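async def get_current_user(credentials: HTTPBasicCredentials = Depends(security)):
    """Authenticate an HTTP Basic request and return the matching user document.

    Args:
        credentials: Username and password extracted by the ``security``
            dependency.

    Returns:
        The document from ``collection`` whose ``username`` matches.
        NOTE(review): it still carries the bcrypt hash under ``password``;
        callers should not serialise it into a response.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` challenge for any
            authentication failure. An unknown username used to produce a 404
            "User not found", which let a client tell valid account names from
            invalid ones; both cases now give the same response.
    """
    auth_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Basic"},
    )
    supplied = credentials.password.encode('utf-8')
    user = await collection.find_one({"username": credentials.username})

    # NOTE(review): bcrypt is CPU-bound and runs directly on the event loop
    # here, so each attempt stalls other requests for its duration.
    try:
        if user is None:
            # Spend comparable bcrypt work for unknown names so response time
            # does not reveal whether the account exists.
            bcrypt.hashpw(supplied, bcrypt.gensalt())
            password_match = False
        else:
            password_match = bcrypt.checkpw(supplied, user['password'].encode('utf-8'))
    except ValueError:
        # bcrypt rejects a malformed stored hash (and, in some releases, an
        # over-long password) with ValueError; treat that as a failed login
        # rather than letting it surface as a 500.
        password_match = False

    if not password_match:
        raise auth_error
    return user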