-
Notifications
You must be signed in to change notification settings - Fork 0
/
dremio_api.py
98 lines (88 loc) · 3.55 KB
/
dremio_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import logging
import requests
logger = logging.getLogger(__name__)
class DremioAPI:
def __init__(self, dremio_pat, dremio_url, timeout=10, verify=False):
self.dremio_url = dremio_url
self.timeout = timeout
self.verify = verify
self.headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + dremio_pat
}
def get_dataset_id(self, dataset: str):
dataset_path = dataset.replace(".","/").replace('"','')
url = self.dremio_url + '/api/v3/catalog/by-path/' + dataset_path
logger.info(f"Getting ID of {dataset}")
response = requests.request("GET", url, headers=self.headers, timeout=self.timeout, verify=self.verify)
data = response.json()
try:
dataset_id = data["id"]
except KeyError:
logger.warning(data)
logger.warning(f"Dataset ID for {dataset_path} not found")
dataset_id = ""
return dataset_id
def get_catalog(self, catalog_id=""):
response = requests.get(
self.dremio_url + f'/api/v3/catalog/{catalog_id}',
headers=self.headers, timeout=self.timeout, verify=self.verify
)
data = response.json()
return data
def get_query_info(self, job_id: str):
logger.debug("Waiting for job completion...")
while True:
response = requests.get(
self.dremio_url + '/api/v3/job/' + job_id,
headers=self.headers,
timeout=self.timeout,
verify=self.verify
)
data = response.json()
job_state = data['jobState']
if job_state == 'COMPLETED':
logger.debug('Job successful')
break
elif job_state in {"CANCELED", "FAILED"}:
status = job_state + " - " + data.get("errorMessage", "")
logger.debug(status)
break
return job_state
def post_sql_query(self, sql: str):
logger.info(sql)
response = requests.post(
self.dremio_url + '/api/v3/sql',
headers=self.headers,
json={"sql": sql },
timeout=self.timeout, verify=self.verify
)
job_id = response.json()['id']
self.get_query_info(job_id)
return job_id
def get_query_data(self, job_id: str, limit=500) -> dict:
job_state = self.get_query_info(job_id)
if job_state == 'COMPLETED':
rows = []
new_rows = ['init_dummy']
current_offset = 0
job_results_json = {}
while len(new_rows) > 0:
page = 'offset=' + str(current_offset) + '&limit=' + str(limit)
logger.debug("Paging " + page)
job_results = requests.get(
self.dremio_url + '/api/v3/job/' + job_id + '/results?' + page,
headers=self.headers,
timeout=self.timeout,
verify=self.verify
)
if job_results.status_code != 200:
raise Exception(f'Error - {job_results.text}')
job_results_json = job_results.json()
new_rows = job_results_json['rows']
current_offset += len(new_rows)
rows.extend(new_rows)
columns = job_results_json.get('columns', [])
return {"rows": rows, "columns": columns}
else:
raise Exception(f'Query data could not be retrieved - Incorrect Job State: {job_state}')