forked from astronomer/airflow-dbcleanup-plugin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
26 lines (22 loc) · 729 Bytes
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from flask import Response
import os
import json
class env_check(object):
def __init__(self, env_var):
self.env_var = env_var
def __call__(self, f):
def g(*args, **kwargs):
res = {
"jobStatus": "failed",
"statusCode": 501,
"message": "This feature is only supported on Astronomer Software and Astronomer Nebula",
}
response = Response(json.dumps(res), mimetype="application/json")
response.status_code = 501
return response
if (
os.environ.get(self.env_var) is None
or os.environ.get(self.env_var) == "software"
):
return f
return g