This repository has been archived by the owner on Aug 25, 2023. It is now read-only.

AutoloaderHandle and StreamingHandle #206

Closed
wants to merge 65 commits into from

LauJohansson
Contributor

@LauJohansson LauJohansson commented Nov 23, 2022

Introduction

This feature introduces the Autoloader. https://docs.databricks.com/ingestion/auto-loader/index.html#
Furthermore, it introduces DeltaStreamHandle: https://docs.databricks.com/structured-streaming/index.html

💡 This PR is the first step for the ATC to implement streaming.

Feel free to contribute to this PR if you are familiar with structured streaming.

New features

  • AutoloaderStreamHandle
    • A handle class for reading data using Auto Loader in any data format it supports. The DataFrames handled by this class are streaming-only.
  • DeltaStreamHandle
    • An initial handle class for streaming reads and writes of Delta files. The DataFrames handled by this class are streaming-only.
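
For readers unfamiliar with Auto Loader, a stream read of the kind AutoloaderStreamHandle wraps might look roughly like the sketch below. The function name and its parameters are hypothetical and not taken from this PR; only the `cloudFiles` source and its `cloudFiles.format` and `cloudFiles.schemaLocation` options come from the Databricks documentation linked above. It assumes a Databricks runtime, since Auto Loader is not part of open-source Spark.

```python
def read_with_autoloader(spark, location, data_format, schema_location):
    """Hypothetical helper: return a streaming DataFrame read via Auto Loader.

    `spark` is expected to be a SparkSession on a Databricks cluster.
    """
    return (
        spark.readStream.format("cloudFiles")  # Auto Loader source
        .option("cloudFiles.format", data_format)  # e.g. "json", "csv", "parquet"
        .option("cloudFiles.schemaLocation", schema_location)  # where the inferred schema is stored
        .load(location)
    )
```

The result is a streaming DataFrame, so it can only be consumed through `writeStream`, which matches the "streaming-only" note in the list above.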

Changes

  • SparkHandle
    • DeltaHandle and DeltaStreamHandle now inherit from SparkHandle. Many methods, such as drop_table, are the same across both classes.

@LauJohansson LauJohansson temporarily deployed to azure November 23, 2022 13:23 Inactive
checkpoint_path=tc.table_property(id, "checkpoint_path", None),
trigger_type=tc.table_property(id, "trigger_type", None),
trigger_time=tc.table_property(id, "trigger_time", None),
await_termination=tc.table_property(id, "await_termination", None),
Contributor Author

@mrmasterplan please take a look at this part regarding the default types.

@LauJohansson
Contributor Author

Note: the error in the pipeline is very likely caused by awaitTermination not working properly!

self._checkpoint_path = checkpoint_path
self._trigger_type = trigger_type.lower() if trigger_type else "availablenow"
self._trigger_time = trigger_time.lower() if trigger_time else None
self._awaitTermination = await_termination if await_termination else True
Contributor Author

Does not correspond to the default value in ini

Contributor Author

Maybe the configurator shouldn't be what determines this... Perhaps it should be an input?

Contributor

@JeppeBlixen please add your view on this.
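
To make the concern in this thread concrete: the expression `await_termination if await_termination else True` in the diff above yields True for every falsy input, so an explicit False can never take effect. The sketch below reproduces that behaviour and shows one possible alternative that only falls back when no value was supplied. Both function names are hypothetical, and the fallback of False is an assumption chosen to match the `await_termination=False` signature default that appears later in this PR.

```python
from typing import Optional


def resolve_as_in_diff(await_termination):
    # Same expression as in the diff: None, False and "" all become True.
    return await_termination if await_termination else True


def resolve_with_none_check(
    await_termination: Optional[bool], default: bool = False
) -> bool:
    # Fall back to the default only when the caller supplied nothing (None).
    return default if await_termination is None else await_termination


print(resolve_as_in_diff(False))       # True: the explicit False is lost
print(resolve_with_none_check(False))  # False: the explicit False is kept
print(resolve_with_none_check(None))   # False: falls back to the default
```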

if self._awaitTermination:
    writer.awaitTermination()

def overwrite(self, df: DataFrame, mergeSchema: bool = None) -> None:
Contributor Author

Consider making a new superclass for streaming purposes. Maybe with new names for the methods, since they behave differently?

Contributor

agree

writer = self._add_trigger_type(writer)

if self._awaitTermination:
    writer.start().awaitTermination()  # Consider removing awaitTermination
Contributor Author

should this be a class method instead @JeppeBlixen ?
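
The body of `_add_trigger_type` is not shown in this conversation. As a rough sketch of what such a helper might do, the function below maps a `trigger_type` and `trigger_time` pair onto the keyword arguments accepted by PySpark's `DataStreamWriter.trigger`. The function name and the accepted `trigger_type` strings other than "availablenow" are assumptions for illustration; `availableNow`, `once` and `processingTime` are real keyword arguments of `trigger` (`availableNow` requires PySpark 3.3 or later).

```python
from typing import Optional


def trigger_kwargs(
    trigger_type: Optional[str] = "availablenow",
    trigger_time: Optional[str] = None,
) -> dict:
    """Hypothetical mapping from config values to DataStreamWriter.trigger kwargs."""
    kind = (trigger_type or "availablenow").lower()
    if kind == "availablenow":
        return {"availableNow": True}
    if kind == "once":
        return {"once": True}
    if kind == "processingtime":
        if not trigger_time:
            raise ValueError("trigger_time is required for a processingtime trigger")
        return {"processingTime": trigger_time}  # e.g. "10 seconds"
    raise ValueError(f"Unknown trigger_type: {trigger_type!r}")
```

A writer could then be configured with `writer.trigger(**trigger_kwargs(trigger_type, trigger_time))`.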

location: str = None,
trigger_type: str = "availablenow",
trigger_time: str = None,
await_termination=False,
Contributor Author

remove await_termination from here?

Contributor Author

This should perhaps not be controlled by the configurator, but by a class method instead.
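
One way to read this suggestion is that the caller decides per write whether to block, instead of the value coming from the table configuration. The sketch below illustrates that idea only; the class, its constructor arguments and the `append` signature are hypothetical and are not the PR's implementation. The `writeStream`, `format`, `outputMode`, `option("checkpointLocation", ...)`, `start` and `awaitTermination` calls are standard PySpark structured streaming API.

```python
class StreamingWriteSketch:
    """Hypothetical stand-in for a streaming handle; not the class from this PR."""

    def __init__(self, path: str, checkpoint_path: str):
        self._path = path
        self._checkpoint_path = checkpoint_path

    def append(self, df, await_termination: bool = False):
        # Start the streaming write; `df` must be a streaming DataFrame.
        query = (
            df.writeStream.format("delta")
            .outputMode("append")
            .option("checkpointLocation", self._checkpoint_path)
            .start(self._path)
        )
        # The caller, not the configurator, decides whether to block here.
        if await_termination:
            query.awaitTermination()
        return query
```

Returning the query lets a test or job call `awaitTermination()` or `stop()` later, even when it chose not to block inside `append`.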

checkpoint_path=tc.table_property(id, "checkpoint_path", None),
trigger_type=tc.table_property(id, "trigger_type", ""),
trigger_time=tc.table_property(id, "trigger_time", ""),
await_termination=tc.table_property(id, "await_termination", ""),
Contributor Author

remove await termination from from_tc... ?

@@ -24,3 +24,6 @@ UpsertLoaderDb:
UpsertLoaderDummy:
  name: "{UpsertLoaderDb}.Dummy"
  path: "{UpsertLoaderDb_path}/dummy"
  format: "delta"
  checkpoint_path: /tmp/checkpoints/_upsertcheckpoints
  await_termination: True
Contributor Author

remove await_termination?

@LauJohansson
Contributor Author

LauJohansson commented Jan 4, 2023

@JeppeBlixen - @mrmasterplan and I discussed this: we need to agree on how to use awaitTermination in the streaming setup.

@mrmasterplan
Contributor

@LauJohansson please consider adding a configurable trigger. In a unit test I would probably want to run the stream on a small subset of the rows in a table, with something like limit(100). Maybe the classes should be prepared for this. Just an idea.

@LauJohansson
Contributor Author

@mrmasterplan - good idea. When the current PR works I will find a way to implement it. But we need this PR to work properly!

Development

Successfully merging this pull request may close these issues.

AutoLoader framework
2 participants