Skip to content

Commit

Permalink
use create resource to upload databus file to project
Browse files Browse the repository at this point in the history
  • Loading branch information
msaipraneeth committed Sep 3, 2023
1 parent be52706 commit aa95baa
Showing 1 changed file with 32 additions and 20 deletions.
52 changes: 32 additions & 20 deletions cmem_plugin_databus/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,28 @@ def autocomplete(
]


class ResponseStream:
"""A Base class for producing messages from Dataset to a Kafka topic."""

def __enter__(self):
return self.read()

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def __init__(self, response, chunk_size=1048576):
self.response = response
self.chunk_size = chunk_size

def read(self):
for _ in self.response.iter_content(chunk_size=self.chunk_size):
yield _

def close(self):
pass



@Plugin(
label="Simple Databus Loading Plugin",
description="Loads a specfic file from the Databus to a local directory",
Expand Down Expand Up @@ -276,28 +298,18 @@ def execute(
)
return

with databus_file_resp as resp:
for _, chunk in enumerate(resp.iter_content(chunk_size=self.chunk_size)):
data += bytearray(chunk)
desc = f"Downloading File {get_clock(_)}"
context.report.update(
ExecutionReport(
entity_count=len(data) // 1000000,
operation="load",
operation_desc=desc,
)
)
graph_uri = self.__get_graph_uri(context)
post_resp = post_streamed_bytes(
str(graph_uri),
byte_iterator_context_update(
bytes(data), context, self.chunk_size, "Uploading File"
),
replace=True,
upload_response = create_resource(
project_name=context.task.project_id(),
resource_name=self.target_file,
file_resource=ResponseStream(databus_file_resp),
replace=True
)
if post_resp.status_code < 400:
if upload_response.status_code < 400:
context.report.update(ExecutionReport(operation_desc="Upload Successful ✓"))
else:
context.report.update(
ExecutionReport(operation_desc="Upload Failed ❌", error=post_resp.text)
ExecutionReport(
operation_desc="Upload Failed ❌",
error=upload_response.text
)
)

0 comments on commit aa95baa

Please sign in to comment.