-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
99 lines (81 loc) · 2.98 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import logging
from datetime import datetime, timedelta
from datetime import timezone
from ipaddress import IPv4Address
from typing import Optional

from fastapi import FastAPI, BackgroundTasks
from jose import JWTError, jwt
from pydantic import BaseModel

from tasks import doDNSUpdate, waitTTLsecs, blackHole
# Kept last, as in the original, so any names exported by config still win.
from config import *
# FastAPI application instance; the @app.post / @app.get decorators further
# down in this module register their route handlers on it.
app = FastAPI()
class Data(BaseModel):
    # Request payload describing one service to cover or uncover.
    # (Plain comments are used instead of a docstring so the generated
    # schema for this model stays exactly as it was.)

    # Service name (the sample payload in this file uses "acs").
    name: str

    # Primary address of the service; this is the value handed to the
    # blackHole task as ``ip``.
    prim_IP: IPv4Address

    # Presumably the substitute address published instead of prim_IP --
    # it is only consumed inside doDNSUpdate; verify there.
    subst_IP: IPv4Address

    # Presumably the TSIG key name / secret used to authenticate the DNS
    # update -- consumed inside doDNSUpdate; verify there.
    tsig_keyname: str
    tsig_secret: str

    # True -> cover the service, False -> uncover it. Defaults to True.
    ddos: bool = True
def planttasks(data, background_tasks):
    """Queue the three follow-up jobs for *data* on *background_tasks*.

    The jobs are registered in this fixed order:

    1. ``doDNSUpdate`` -- receives the whole ``data`` object.
    2. ``waitTTLsecs`` -- receives the configured ``TTL`` (the original
       note mentions 300 seconds; the real value comes from config).
    3. ``blackHole`` -- receives ``data.prim_IP`` and ``data.ddos``; per
       the original note this is a scrapli call that black-holes the
       primary IP on the router (presumably RTBH).

    NOTE(review): the ordering only matters if the task runner executes
    queued jobs sequentially in insertion order -- confirm against the
    BackgroundTasks documentation.
    """
    enqueue = background_tasks.add_task

    enqueue(doDNSUpdate, data=data)
    enqueue(waitTTLsecs, ttl=TTL)
    enqueue(blackHole, ip=data.prim_IP, ddos=data.ddos)
# data example:
# data = { "name": "acs",
# "prim_IP": "195.230.111.106",
# "subst_IP": "82.202.189.51",
# "tsig_keyname": "acs.key",
# "tsig_secret": "fKwttnpfMaD10CKh0/QqV13sBiGUvRDtRTLbwTdxpbw=",
# "ddos": True
# }
# cover/uncover (depends on ddos: True/False) a service described with the data object
@app.post("/api/ddosornotddos")
async def ddosornotddos(data: Data, background_tasks: BackgroundTasks):
    """Cover or uncover the service described by ``data``.

    Schedules the DNS update, the TTL wait and the black-hole call as
    background tasks. Whether the service is covered or uncovered depends
    on ``data.ddos``.

    Returns a plain status string: ``"tasks planted successfully"`` on
    success, ``":x: something went wrong"`` if scheduling failed.
    """
    try:
        planttasks(data, background_tasks)
    except Exception:
        # ``except Exception`` (not a bare ``except``) so that
        # KeyboardInterrupt / SystemExit / task cancellation still
        # propagate, and the cause is logged instead of being discarded.
        logging.getLogger(__name__).exception(
            "could not schedule background tasks"
        )
        return ":x: something went wrong"
    return "tasks planted successfully"
# cover/uncover a service described with a data object which is encoded in a JWT token
@app.get("/api/ddosornotddosjwt")
async def ddosornotddosjwt(token: str, background_tasks: BackgroundTasks):
    """Cover or uncover the service whose description is encoded in ``token``.

    The token is verified and decoded, its ``exp`` claim is dropped, and
    the remaining claims are validated as a ``Data`` payload before the
    background tasks are scheduled.

    Returns a plain status string:
    ``":robot: tasks planted successfully"`` on success,
    ``":x: token is not valid"`` if the token cannot be decoded, has no
    ``exp`` claim, or does not describe a valid payload, and
    ``":x: something went wrong"`` if scheduling failed.
    """
    log = logging.getLogger(__name__)

    # Step 1: turn the token into a validated Data object.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # A token without "exp" raises KeyError here and is rejected,
        # exactly as before.
        del payload["exp"]
        data = Data(**payload)
    except Exception as exc:
        # Log only the exception class: validation errors may echo payload
        # values (e.g. tsig_secret), which must not end up in the logs.
        log.warning("rejected JWT: %s", type(exc).__name__)
        return ":x: token is not valid"

    # Step 2: schedule the work. ``except Exception`` rather than a bare
    # ``except`` so interpreter-exit and cancellation signals propagate.
    try:
        planttasks(data, background_tasks)
    except Exception:
        log.exception("could not schedule background tasks")
        return ":x: something went wrong"
    return ":robot: tasks planted successfully"
## JWT helper function
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, never mutated.
        expires_delta: Lifetime of the token counted from now. When
            omitted (``None``) the token is valid for 15 minutes. An
            explicit zero ``timedelta`` is honoured rather than being
            mistaken for "not supplied".

    Returns:
        The token produced by ``jwt.encode`` with the module's
        ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # ``is None`` instead of a truthiness test: timedelta(0) is falsy and
    # used to fall through to the 15-minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)

    to_encode = data.copy()
    # Timezone-aware "now"; datetime.utcnow() is naive and deprecated.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# takes a data object and returns a JWT token
# it then can be used in the ddosornotddosjwt function (api hook)
@app.post("/api/gettoken")
def gettoken(data: Data):
    # Build a JWT from the posted Data object; the token can later be
    # passed to the /api/ddosornotddosjwt endpoint.
    #
    # The IP fields are converted to strings so that every claim is a
    # plain JSON-friendly value.
    #
    # NOTE(review): tsig_secret is copied into the token claims as-is;
    # if the token is only signed and not encrypted, anyone holding it
    # can read the secret -- confirm this is acceptable.
    claims = {
        "name": data.name,
        "prim_IP": str(data.prim_IP),
        "subst_IP": str(data.subst_IP),
        "tsig_keyname": data.tsig_keyname,
        "tsig_secret": data.tsig_secret,
        "ddos": data.ddos,
    }
    token = create_access_token(
        data=claims,
        expires_delta=timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS),
    )
    return {"access_token": token, "token_type": "bearer"}