This repository has been archived by the owner on Aug 25, 2023. It is now read-only.
AutoloaderHandle and StreamingHandle #206
Closed
Commits (65)
1b1bf14
feat: initial autoloader handle
LauJohansson e3c9575
add better upsert logic
LauJohansson b22849c
stop all streams and remove checkpoints
LauJohansson ff40da0
more tests
LauJohansson f6348da
common spark handle class
LauJohansson 6a35eae
skip autoloader tests first
LauJohansson 6f97e0b
linting
LauJohansson ae2a486
add autoloader tests
LauJohansson f6212ba
add comment regarding awaitTermination
LauJohansson 9ec8372
add await termination
LauJohansson d4ea6c1
file exists except all
LauJohansson 3511be3
fix: use delta handle to put data in table
LauJohansson fe0b38d
use delta format when delta
LauJohansson 850864f
remove self read in write method
LauJohansson eab53eb
cannot count on read. use isstreaming
LauJohansson 8a0b917
cehck straming df and use 10.4 spark
LauJohansson a4e0744
fix cache test
LauJohansson 922a253
skip cache test
LauJohansson b342836
feat: update tests
LauJohansson 669dce2
fix: missing names in configs test
LauJohansson 4b1671d
use debug tables
LauJohansson 47ffd45
manual set databricks token
LauJohansson 966f7f1
Merge branch 'main' into feature/autoloader
LauJohansson 873a3db
overwritable return only None
LauJohansson f109eba
flake8
LauJohansson b49f77c
checkpoints removement
LauJohansson bcbed26
feat: add avrofiles using write avro
LauJohansson fef7699
fix autoloader tests
LauJohansson 85941c1
simplify view creation
LauJohansson 9a2d37b
outcomment optimization part of upsert
LauJohansson 09c059a
fix upsertloader test
LauJohansson 199c646
linting
LauJohansson 4a5681e
skip flaky tests
LauJohansson db2eda1
use same source view in test
LauJohansson de4c5cf
use append. not overwrite
LauJohansson 32cd63e
only use one source table
LauJohansson ad10d43
add tableid to class
LauJohansson 0926d90
refactor: renaming and documentation
LauJohansson c2bdd58
validate delta methods
LauJohansson 9d507e1
add new deltastream class
LauJohansson 2671c9c
split deltastreamhandle and autoloader
LauJohansson 38f79f3
test deltastreamhanle on sink
LauJohansson dee2f15
create database
LauJohansson 08c46c3
remove path from test table
LauJohansson 7439ccc
validate checkpointpath correct
LauJohansson 839f426
overwrite avro data
LauJohansson da711f8
delta stream test add delta format
LauJohansson bb62539
remove autoloader from streamdelta tests
LauJohansson d6f5d8e
remove avro related tests for delta
LauJohansson bf48f38
use assert equal
LauJohansson ecc3ee4
test awaittermination for better testing
LauJohansson 72962ad
rescued data
LauJohansson 3e2c656
tbl4 already exists
LauJohansson b95b5ee
add format delta
LauJohansson 427d971
feat: await_termination optional
LauJohansson 539638c
Merge branch 'main' into feature/autoloader
LauJohansson 21dfd37
revert to 9.1 testing
LauJohansson 549becf
Merge branch 'main' into feature/autoloader
LauJohansson ab5e640
use tests again
LauJohansson 9a823cb
assert on spark version
LauJohansson e404770
only assert on init
LauJohansson 49babf5
fix default await input
LauJohansson 7db4820
fix cls for stream
LauJohansson 9784733
use eh tests again
LauJohansson 9afa177
remove set databricks cfg manual
LauJohansson
@@ -0,0 +1,18 @@
param (
    # This helper function can set the databricks cfg manually
    [Parameter(Mandatory=$True)]
    [ValidateNotNullOrEmpty()]
    [string]
    $workspaceUrl,

    [Parameter(Mandatory=$True)]
    [ValidateNotNullOrEmpty()]
    [string]
    $token
)

Set-Content ~/.databrickscfg "[DEFAULT]"
Add-Content ~/.databrickscfg "host = https://$workspaceUrl"
Add-Content ~/.databrickscfg "token = $token"
Add-Content ~/.databrickscfg ""
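The PowerShell script above simply writes three lines to `~/.databrickscfg`. A minimal Python sketch of the content it produces (the function name and example values are mine, for illustration only):

```python
# Sketch of the .databrickscfg content produced by the helper script.
# The function name and the example values are placeholders.
def databricks_cfg(workspace_url: str, token: str) -> str:
    return (
        "[DEFAULT]\n"
        f"host = https://{workspace_url}\n"
        f"token = {token}\n"
    )

print(databricks_cfg("adb-123.azuredatabricks.net", "dapi-example"))
```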
@@ -1,2 +1,4 @@
from .autoloaderstream_handle import AutoloaderStreamHandle  # noqa: F401
from .db_handle import DbHandle  # noqa: F401
from .delta_handle import DeltaHandle  # noqa: F401
from .deltastream_handle import DeltaStreamHandle  # noqa: F401
@@ -0,0 +1,74 @@
from pyspark.sql import DataFrame

from atc.configurator.configurator import Configurator
from atc.spark import Spark
from atc.tables import TableHandle
from atc.tables.SparkHandle import DeltaHandleInvalidFormat


class AutoloaderStreamHandle(TableHandle):
    def __init__(
        self,
        *,
        location: str,
        checkpoint_path: str,
        data_format: str,
    ):
        """
        location: the location of the delta table

        checkpoint_path: the location of the checkpoints, <table_name>/_checkpoints.
            The Delta Lake VACUUM function removes all files not managed by
            Delta Lake but skips any directories that begin with _. You can
            safely store checkpoints alongside other data and metadata for a
            Delta table using a directory structure such as
            <table_name>/_checkpoints.
            See: https://docs.databricks.com/structured-streaming/delta-lake.html

        data_format: the data format of the files that are read
        """

        assert (
            Spark.version() >= Spark.DATABRICKS_RUNTIME_10_4
        ), f"AutoloaderStreamHandle not available for Spark version {Spark.version()}"

        self._location = location
        self._data_format = data_format
        self._checkpoint_path = checkpoint_path

        self._validate()
        self._validate_checkpoint()

    @classmethod
    def from_tc(cls, id: str) -> "AutoloaderStreamHandle":
        tc = Configurator()
        return cls(
            location=tc.table_property(id, "path", None),
            data_format=tc.table_property(id, "format", None),
            checkpoint_path=tc.table_property(id, "checkpoint_path", None),
        )

    def _validate(self):
        """Validates that the data format is not delta."""
        if self._data_format == "delta":
            raise DeltaHandleInvalidFormat("Use DeltaStreamHandle for delta.")

    def _validate_checkpoint(self):
        if "/_" not in self._checkpoint_path:
            print(
                "RECOMMENDATION: You can safely store checkpoints alongside "
                "other data and metadata for a Delta table using a directory "
                "structure such as <table_name>/_checkpoints"
            )

    def read(self) -> DataFrame:
        reader = (
            Spark.get()
            .readStream.format("cloudFiles")
            .option("cloudFiles.format", self._data_format)
            .option("cloudFiles.schemaLocation", self._checkpoint_path)
            .load(self._location)
        )

        return reader
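The `_validate_checkpoint` recommendation above boils down to a simple substring check. A standalone sketch of the convention it looks for (the function name is mine):

```python
def follows_checkpoint_convention(checkpoint_path: str) -> bool:
    # Delta Lake VACUUM skips directories that begin with "_", so a
    # checkpoint path such as "<table_name>/_checkpoints" can live
    # safely next to the table's data files.
    return "/_" in checkpoint_path

print(follows_checkpoint_convention("/mnt/tbl4/_checkpoints"))  # True
print(follows_checkpoint_convention("/mnt/tbl4/checkpoints"))   # False
```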
@mrmasterplan we need to ensure that the checkpoint path is the same as the write checkpoint path: https://docs.databricks.com/getting-started/etl-quick-start.html#auto-loader
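A sketch of that constraint: the schema location used on the read side and the `checkpointLocation` used on the write side should resolve to the same directory. The option keys follow the linked quick start; the dict wiring and values are only illustrative:

```python
# One checkpoint path shared by the Auto Loader reader and the stream writer.
# Option keys follow the linked Databricks quick start; values are placeholders.
checkpoint_path = "/mnt/landing/tbl/_checkpoints"

read_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": checkpoint_path,
}
write_options = {
    "checkpointLocation": checkpoint_path,
}

# Both sides must point at the same location.
assert read_options["cloudFiles.schemaLocation"] == write_options["checkpointLocation"]
```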