02b0a09 to 49babf5
src/atc/delta/deltastream_handle.py
Outdated
checkpoint_path=tc.table_property(id, "checkpoint_path", None),
trigger_type=tc.table_property(id, "trigger_type", None),
trigger_time=tc.table_property(id, "trigger_time", None),
await_termination=tc.table_property(id, "await_termination", None),
@mrmasterplan please take a look at this part regarding the default types.
Note: the error in the pipeline is very likely caused by awaitTermination not working properly!
self._checkpoint_path = checkpoint_path
self._trigger_type = trigger_type.lower() if trigger_type else "availablenow"
self._trigger_time = trigger_time.lower() if trigger_time else None
self._awaitTermination = await_termination if await_termination else True
Does not correspond to the default value in ini
Maybe it shouldn't be the configurator that determines this... Perhaps it should be an input?
@JeppeBlixen please add your view on this.
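To illustrate the mismatch discussed above: with `await_termination if await_termination else True`, an explicit `False` is falsy and falls through to `True`, so the setting can never be switched off. Below is a minimal sketch of one possible fix that uses `None` as the "not set" sentinel. The function names and the `default` parameter are hypothetical and not part of the PR.

```python
from typing import Optional


def resolve_as_in_diff(await_termination: Optional[bool]) -> bool:
    # Mirrors the expression in the diff: any falsy value (None, False, "")
    # falls through to True, so False can never be selected.
    return await_termination if await_termination else True


def resolve_await_termination(
    await_termination: Optional[bool], default: bool = False
) -> bool:
    # Hypothetical alternative: only a missing value (None) triggers the
    # default, so an explicit False is respected.
    return default if await_termination is None else await_termination


print(resolve_as_in_diff(False))
print(resolve_await_termination(False))
print(resolve_await_termination(None, default=True))
```

Which default is right (the `False` in the signature or the `True` in the config) is still the open question in this thread; the sketch only shows how to keep an explicit `False` from being overridden.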
if self._awaitTermination:
    writer.awaitTermination()

def overwrite(self, df: DataFrame, mergeSchema: bool = None) -> None:
Consider making a new superclass for streaming purposes. Maybe new names for the methods, since they behave differently?
agree
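A rough sketch of what such a superclass could look like, assuming streaming handles get their own method names instead of reusing `overwrite`/`append`. The class and method names here (`StreamHandle`, `read_stream`, `append_stream`, `InMemoryStreamHandle`) are hypothetical and only meant to seed the discussion; the in-memory subclass stands in for a real Spark-backed handle.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class StreamHandle(ABC):
    """Hypothetical base class for handles that read/write streams."""

    @abstractmethod
    def read_stream(self) -> Any:
        """Return a streaming source (a streaming DataFrame in Spark)."""

    @abstractmethod
    def append_stream(self, df: Any) -> None:
        """Write a stream in append mode."""


class InMemoryStreamHandle(StreamHandle):
    """Toy implementation that collects batches in a list."""

    def __init__(self) -> None:
        self.batches: List[Any] = []

    def read_stream(self) -> List[Any]:
        return list(self.batches)

    def append_stream(self, df: Any) -> None:
        self.batches.append(df)


handle = InMemoryStreamHandle()
handle.append_stream([1, 2])
print(handle.read_stream())
```

Keeping the streaming methods on a separate base class would let batch handles keep their current semantics while streaming handles document their own.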
writer = self._add_trigger_type(writer)

if self._awaitTermination:
    writer.start().awaitTermination()  # Consider removing awaitTermination
Should this be a class method instead, @JeppeBlixen?
location: str = None,
trigger_type: str = "availablenow",
trigger_time: str = None,
await_termination=False,
remove await_termination from here?
This should perhaps not be controlled by the configurator, but by a class method instead.
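One way to read this suggestion, as a sketch only: the handle starts the query and returns it, and waiting becomes an explicit method call made by the caller rather than a configured property. `StreamWriterHandle`, `FakeQuery` and `await_termination` are hypothetical names; `FakeQuery` merely stands in for a Spark streaming query so the example runs without a cluster.

```python
from typing import Callable, Optional


class FakeQuery:
    """Stand-in for a streaming query; it only records that it was awaited."""

    def __init__(self) -> None:
        self.awaited = False

    def awaitTermination(self, timeout: Optional[int] = None) -> bool:
        self.awaited = True
        return True


class StreamWriterHandle:
    """Hypothetical handle where the caller decides whether to block."""

    def __init__(self, start_query: Callable[[], FakeQuery]) -> None:
        self._start_query = start_query
        self._query: Optional[FakeQuery] = None

    def start(self) -> FakeQuery:
        # Start the stream without blocking.
        self._query = self._start_query()
        return self._query

    def await_termination(self, timeout: Optional[int] = None) -> bool:
        # Blocking is an explicit call, not a constructor or config flag.
        if self._query is None:
            raise RuntimeError("start() must be called before awaiting")
        return self._query.awaitTermination(timeout)


handle = StreamWriterHandle(FakeQuery)
query = handle.start()
handle.await_termination()
print(query.awaited)
```

With this shape, `await_termination` could be dropped from both the constructor and `from_tc`, and a test could choose to block while a long-running job does not.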
checkpoint_path=tc.table_property(id, "checkpoint_path", None),
trigger_type=tc.table_property(id, "trigger_type", ""),
trigger_time=tc.table_property(id, "trigger_time", ""),
await_termination=tc.table_property(id, "await_termination", ""),
Remove await_termination from from_tc...?
@@ -24,3 +24,6 @@ UpsertLoaderDb:
UpsertLoaderDummy:
  name: "{UpsertLoaderDb}.Dummy"
  path: "{UpsertLoaderDb_path}/dummy"
  format: "delta"
  checkpoint_path: /tmp/checkpoints/_upsertcheckpoints
  await_termination: True
remove await_termination?
@JeppeBlixen - @mrmasterplan and I discussed that we need to agree on how to use "AwaitTermination" in the streaming setup.
@LauJohansson please consider adding a configurable trigger. In a unit test situation I would probably want to run the stream on a small subset of the rows in a table, like a limit(100) or similar. Maybe the classes should be prepared for this. Just an idea.
@mrmasterplan - good idea. When the current PR works I will find a way to implement it. But we need this PR to work properly first!
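As a starting point for the configurable trigger, here is a sketch that translates the `trigger_type` / `trigger_time` settings into keyword arguments for PySpark's `DataStreamWriter.trigger`. The helper name `trigger_kwargs` and the accepted strings other than "availablenow" are assumptions, not what `_add_trigger_type` in the PR does; `availableNow` requires Spark 3.3 or later. It does not address limiting the number of rows.

```python
from typing import Dict, Optional, Union


def trigger_kwargs(
    trigger_type: str = "availablenow", trigger_time: Optional[str] = None
) -> Dict[str, Union[bool, str]]:
    """Map config values to keyword arguments for DataStreamWriter.trigger."""
    kind = trigger_type.lower()
    if kind == "availablenow":
        # Process all available data, then stop.
        return {"availableNow": True}
    if kind == "once":
        # Process a single batch, then stop.
        return {"once": True}
    if kind == "processingtime":
        if not trigger_time:
            raise ValueError("processingtime requires trigger_time")
        # trigger_time is an interval string such as "10 seconds".
        return {"processingTime": trigger_time}
    raise ValueError(f"Unknown trigger_type: {trigger_type!r}")


print(trigger_kwargs())
print(trigger_kwargs("processingtime", "10 seconds"))
```

In the handle this would be applied as `writer.trigger(**trigger_kwargs(self._trigger_type, self._trigger_time))`, which keeps the mapping testable without a Spark session.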
Introduction
This feature introduces the Autoloader. https://docs.databricks.com/ingestion/auto-loader/index.html#
Furthermore, it introduces DeltaStreamHandle: https://docs.databricks.com/structured-streaming/index.html
💡 This PR is the first step for the ATC to implement streaming.
❓ Feel free to contribute to this PR if you are familiar with structured streaming
New features
Changes
drop_table
method.