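"""
Helpers for moving pandas dataframes between a Ceph S3 bucket (via boto3)
and the local disk.
"""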
import json
import os
from io import BytesIO, StringIO
from pathlib import Path

import boto3
import pandas as pd


class CephCommunication:
    """
    Class to establish communication with a Ceph S3 bucket.

    It connects to the bucket and provides methods to read and write
    dataframes in the parquet and JSON formats.
    """

    def __init__(
        self, s3_endpoint_url, aws_access_key_id, aws_secret_access_key, s3_bucket
    ):
        self.s3_endpoint_url = s3_endpoint_url
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.s3_resource = boto3.resource(
            "s3",
            endpoint_url=self.s3_endpoint_url,
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
        )
        self.bucket = s3_bucket
    # TODO: Add try/except error handling around the S3 calls.
    def upload_to_ceph(self, dataframe, s3_path, filename):
        """
        Serialize the given dataframe to parquet and upload it to the
        configured Ceph bucket under `{s3_path}/{filename}`.
        Returns the S3 put response.
        """
        parquet_buffer = BytesIO()
        dataframe.to_parquet(parquet_buffer)
        s3_obj = self.s3_resource.Object(self.bucket, f"{s3_path}/{filename}")
        status = s3_obj.put(Body=parquet_buffer.getvalue())
        return status
    def read_from_ceph(self, s3_path, filename):
        """
        Download `{s3_path}/{filename}` from the Ceph bucket and return
        it as a dataframe, e.g. to verify that saved data is correct.
        """
        buffer = BytesIO()
        s3_object = self.s3_resource.Object(self.bucket, f"{s3_path}/{filename}")
        s3_object.download_fileobj(buffer)
        buffer.seek(0)  # rewind so the parquet reader starts at the beginning
        df_temp = pd.read_parquet(buffer)
        return df_temp
    def upload_to_ceph_as_json(self, dataframe, s3_path, filename):
        """
        Serialize the given dataframe to JSON (one record per row) and
        upload it to the Ceph bucket under `{s3_path}/{filename}`.
        Returns the S3 put response.
        """
        json_buffer = StringIO()
        dataframe.to_json(json_buffer, orient="records")
        s3_obj = self.s3_resource.Object(self.bucket, f"{s3_path}/{filename}")
        status = s3_obj.put(Body=json_buffer.getvalue())
        return status
    def get_json_from_ceph(self, s3_path):
        """
        Read a JSON file from the Ceph bucket at the full object key
        `s3_path` and return its contents as a dataframe.
        """
        content = self.s3_resource.Object(self.bucket, s3_path)
        file = content.get()["Body"].read().decode("utf-8")
        prs = json.loads(file)
        pr_df = pd.DataFrame(prs)
        return pr_df

def save_to_disk(dataframe, path, filename):
    """
    Save the dataframe as a parquet file on the local disk,
    creating the target directory if it does not exist.
    """
    dataset_base_path = Path(path)
    dataset_base_path.mkdir(parents=True, exist_ok=True)
    dataframe.to_parquet(f"{path}/{filename}")
    return True
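
# ---------------------------------------------------------------------------
# Minimal usage sketch. The endpoint URL, bucket name, and environment
# variable names below are illustrative assumptions, not values defined by
# this module; adapt them to your deployment.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    cc = CephCommunication(
        s3_endpoint_url=os.getenv("S3_ENDPOINT_URL", "https://s3.example.com"),
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        s3_bucket=os.getenv("S3_BUCKET", "example-bucket"),
    )

    # Round-trip a small dataframe through the bucket as parquet.
    df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    cc.upload_to_ceph(df, "demo", "example.parquet")
    assert cc.read_from_ceph("demo", "example.parquet").equals(df)

    # The same dataframe can also be stored and read back as JSON;
    # note that get_json_from_ceph takes the full object key.
    cc.upload_to_ceph_as_json(df, "demo", "example.json")
    print(cc.get_json_from_ceph("demo/example.json"))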