This repository has been archived by the owner on Aug 25, 2023. It is now read-only.

AutoloaderHandle and StreamingHandle #206

Status: Closed. Wants to merge 65 commits from the feature/autoloader branch. The diff below reflects changes from 64 of the 65 commits.

Commits
1b1bf14
feat: initial autoloader handle
LauJohansson Nov 23, 2022
e3c9575
add better upsert logic
LauJohansson Nov 23, 2022
b22849c
stop all streams and remove checkpoints
LauJohansson Nov 23, 2022
ff40da0
more tests
LauJohansson Nov 23, 2022
f6348da
common spark handle class
LauJohansson Nov 23, 2022
6a35eae
skip autoloader tests first
LauJohansson Nov 23, 2022
6f97e0b
linting
LauJohansson Nov 23, 2022
ae2a486
add autoloader tests
LauJohansson Nov 23, 2022
f6212ba
add comment regarding awaitTermination
LauJohansson Nov 23, 2022
9ec8372
add await termination
LauJohansson Nov 24, 2022
d4ea6c1
file exists except all
LauJohansson Nov 24, 2022
3511be3
fix: use delta handle to put data in table
LauJohansson Nov 24, 2022
fe0b38d
use delta format when delta
LauJohansson Nov 24, 2022
850864f
remove self read in write method
LauJohansson Nov 24, 2022
eab53eb
cannot count on read. use isstreaming
LauJohansson Nov 24, 2022
8a0b917
check streaming df and use 10.4 spark
LauJohansson Nov 24, 2022
a4e0744
fix cache test
LauJohansson Nov 24, 2022
922a253
skip cache test
LauJohansson Nov 24, 2022
b342836
feat: update tests
LauJohansson Nov 25, 2022
669dce2
fix: missing names in configs test
LauJohansson Nov 25, 2022
4b1671d
use debug tables
LauJohansson Nov 25, 2022
47ffd45
manual set databricks token
LauJohansson Nov 25, 2022
966f7f1
Merge branch 'main' into feature/autoloader
LauJohansson Nov 25, 2022
873a3db
overwritable return only None
LauJohansson Nov 25, 2022
f109eba
flake8
LauJohansson Nov 25, 2022
b49f77c
checkpoint removal
LauJohansson Nov 25, 2022
bcbed26
feat: add avrofiles using write avro
LauJohansson Nov 28, 2022
fef7699
fix autoloader tests
LauJohansson Nov 28, 2022
85941c1
simplify view creation
LauJohansson Nov 28, 2022
9a2d37b
comment out optimization part of upsert
LauJohansson Nov 28, 2022
09c059a
fix upsertloader test
LauJohansson Nov 28, 2022
199c646
linting
LauJohansson Nov 28, 2022
4a5681e
skip flaky tests
LauJohansson Nov 29, 2022
db2eda1
use same source view in test
LauJohansson Nov 29, 2022
de4c5cf
use append. not overwrite
LauJohansson Nov 29, 2022
32cd63e
only use one source table
LauJohansson Nov 29, 2022
ad10d43
add tableid to class
LauJohansson Nov 29, 2022
0926d90
refactor: renaming and documentation
LauJohansson Nov 30, 2022
c2bdd58
validate delta methods
LauJohansson Nov 30, 2022
9d507e1
add new deltastream class
LauJohansson Nov 30, 2022
2671c9c
split deltastreamhandle and autoloader
LauJohansson Nov 30, 2022
38f79f3
test deltastreamhandle on sink
LauJohansson Nov 30, 2022
dee2f15
create database
LauJohansson Nov 30, 2022
08c46c3
remove path from test table
LauJohansson Nov 30, 2022
7439ccc
validate checkpointpath correct
LauJohansson Nov 30, 2022
839f426
overwrite avro data
LauJohansson Dec 1, 2022
da711f8
delta stream test add delta format
LauJohansson Dec 1, 2022
bb62539
remove autoloader from streamdelta tests
LauJohansson Dec 1, 2022
d6f5d8e
remove avro related tests for delta
LauJohansson Dec 1, 2022
bf48f38
use assert equal
LauJohansson Dec 1, 2022
ecc3ee4
test awaittermination for better testing
LauJohansson Dec 1, 2022
72962ad
rescued data
LauJohansson Dec 1, 2022
3e2c656
tbl4 already exists
LauJohansson Dec 1, 2022
b95b5ee
add format delta
LauJohansson Dec 1, 2022
427d971
feat: await_termination optional
LauJohansson Dec 15, 2022
539638c
Merge branch 'main' into feature/autoloader
LauJohansson Dec 15, 2022
21dfd37
revert to 9.1 testing
LauJohansson Dec 16, 2022
549becf
Merge branch 'main' into feature/autoloader
LauJohansson Dec 16, 2022
ab5e640
use tests again
LauJohansson Dec 16, 2022
9a823cb
assert on spark version
LauJohansson Dec 16, 2022
e404770
only assert on init
LauJohansson Dec 16, 2022
49babf5
fix default await input
LauJohansson Dec 17, 2022
7db4820
fix cls for stream
LauJohansson Dec 17, 2022
9784733
use eh tests again
LauJohansson Dec 19, 2022
9afa177
remove set databricks cfg manual
LauJohansson Jan 4, 2023
18 changes: 18 additions & 0 deletions .github/submit/Set-DatabricksCfgManual.ps1
@@ -0,0 +1,18 @@
param (
# This helper script sets the Databricks CLI config file (~/.databrickscfg) manually
[Parameter(Mandatory=$True)]
[ValidateNotNullOrEmpty()]
[string]
$workspaceUrl,

[Parameter(Mandatory=$True)]
[ValidateNotNullOrEmpty()]
[string]
$token
)


Set-Content ~/.databrickscfg "[DEFAULT]"
Add-Content ~/.databrickscfg "host = https://$workspaceUrl"
Add-Content ~/.databrickscfg "token = $token"
Add-Content ~/.databrickscfg ""
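
In the submit pipeline this helper would presumably be called once before any Databricks CLI commands, along the lines of: pwsh .github/submit/Set-DatabricksCfgManual.ps1 -workspaceUrl $workspaceUrl -token $databricksToken. The exact call site is not part of this diff, and the variable names in that invocation are illustrative only.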
2 changes: 2 additions & 0 deletions src/atc/delta/__init__.py
@@ -1,2 +1,4 @@
from .autoloaderstream_handle import AutoloaderStreamHandle # noqa: F401
from .db_handle import DbHandle # noqa: F401
from .delta_handle import DeltaHandle # noqa: F401
from .deltastream_handle import DeltaStreamHandle # noqa: F401
74 changes: 74 additions & 0 deletions src/atc/delta/autoloaderstream_handle.py
@@ -0,0 +1,74 @@
from pyspark.sql import DataFrame

from atc.configurator.configurator import Configurator
from atc.spark import Spark
from atc.tables import TableHandle
from atc.tables.SparkHandle import DeltaHandleInvalidFormat


class AutoloaderStreamHandle(TableHandle):
def __init__(
self,
*,
location: str,
checkpoint_path: str,
Inline review comment from the contributor author on the checkpoint_path parameter:
@mrmasterplan we need to ensure that the checkpoint path is the same as the write checkpoint path: https://docs.databricks.com/getting-started/etl-quick-start.html#auto-loader

data_format: str,
):
"""
location: the location of the source files to be read

checkpoint_path: The location of the checkpoints, <table_name>/_checkpoints
The Delta Lake VACUUM function removes all files not managed by Delta Lake
but skips any directories that begin with _. You can safely store
checkpoints alongside other data and metadata for a Delta table
using a directory structure such as <table_name>/_checkpoints
See: https://docs.databricks.com/structured-streaming/delta-lake.html

data_format: the data format of the files that are read

"""

assert (
Spark.version() >= Spark.DATABRICKS_RUNTIME_10_4
), f"AutoloaderStreamHandle not available for Spark version {Spark.version()}"

self._location = location
self._data_format = data_format
self._checkpoint_path = checkpoint_path

self._validate()
self._validate_checkpoint()

@classmethod
def from_tc(cls, id: str) -> "AutoloaderStreamHandle":
tc = Configurator()
return cls(
location=tc.table_property(id, "path", None),
data_format=tc.table_property(id, "format", None),
checkpoint_path=tc.table_property(id, "checkpoint_path", None),
)

def _validate(self):
"""Validates that the name is either db.table or just table."""
if self._data_format == "delta":
raise DeltaHandleInvalidFormat("Use DeltaStreamHandle for delta.")

def _validate_checkpoint(self):
if "/_" not in self._checkpoint_path:
print(
"RECOMMENDATION: You can safely store checkpoints alongside "
"other data and metadata for a Delta table using a directory "
"structure such as <table_name>/_checkpoints"
)

def read(self) -> DataFrame:

reader = (
Spark.get()
.readStream.format("cloudFiles")
.option("cloudFiles.format", self._data_format)
.option("cloudFiles.schemaLocation", self._checkpoint_path)
.load(self._location)
)

return reader
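
To make the intended wiring concrete, here is a minimal usage sketch (not part of the diff): it reads through the handle above and appends to a Delta location, reusing the same checkpoint path that was given to the handle, as the inline review comment recommends. The Configurator id, the paths, and the trigger choice are illustrative assumptions only.

from atc.delta import AutoloaderStreamHandle

# Assumes the Configurator knows a table id "LandingAvro" with the
# properties "path", "format" (e.g. avro) and "checkpoint_path" registered.
ash = AutoloaderStreamHandle.from_tc("LandingAvro")

stream = (
    ash.read()  # streaming DataFrame backed by the cloudFiles source
    .writeStream.format("delta")
    .outputMode("append")
    # Reuse the checkpoint path given to the handle, so that schema tracking
    # and write progress live in the same location.
    .option("checkpointLocation", "/mnt/landing/mytable/_checkpoints")
    .trigger(once=True)
    .start("/mnt/silver/mytable")  # illustrative target location
)
stream.awaitTermination()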
101 changes: 4 additions & 97 deletions src/atc/delta/delta_handle.py
@@ -1,35 +1,19 @@
from typing import List, Optional, Union
from typing import List, Union

from pyspark.sql import DataFrame

from atc.configurator.configurator import Configurator
from atc.exceptions import AtcException
from atc.functions import get_unique_tempview_name, init_dbutils
from atc.spark import Spark
from atc.tables.TableHandle import TableHandle
from atc.tables.SparkHandle import SparkHandle
from atc.utils.CheckDfMerge import CheckDfMerge
from atc.utils.GetMergeStatement import GetMergeStatement


class DeltaHandleException(AtcException):
pass


class DeltaHandleInvalidName(DeltaHandleException):
pass


class DeltaHandleInvalidFormat(DeltaHandleException):
pass


class DeltaHandle(TableHandle):
class DeltaHandle(SparkHandle):
def __init__(self, name: str, location: str = None, data_format: str = "delta"):
self._name = name
self._location = location
self._data_format = data_format

self._partitioning: Optional[List[str]] = None
super().__init__(name, location, data_format)

self._validate()

@@ -42,29 +26,6 @@ def from_tc(cls, id: str) -> "DeltaHandle":
data_format=tc.table_property(id, "format", "delta"),
)

def _validate(self):
"""Validates that the name is either db.table or just table."""
if not self._name:
if not self._location:
raise DeltaHandleInvalidName(
"Cannot create DeltaHandle without name or path"
)
self._name = f"delta.`{self._location}`"
else:
name_parts = self._name.split(".")
if len(name_parts) == 1:
self._db = None
self._table_name = name_parts[0]
elif len(name_parts) == 2:
self._db = name_parts[0]
self._table_name = name_parts[1]
else:
raise DeltaHandleInvalidName(f"Could not parse name {self._name}")

# only format delta is supported.
if self._data_format != "delta":
raise DeltaHandleInvalidFormat("Only format delta is supported.")

def read(self) -> DataFrame:
"""Read table by path if location is given, otherwise from name."""
if self._location:
@@ -102,60 +63,6 @@ def drop_and_delete(self) -> None:
if self._location:
init_dbutils().fs.rm(self._location, True)

def create_hive_table(self) -> None:
sql = f"CREATE TABLE IF NOT EXISTS {self._name} "
if self._location:
sql += f" USING DELTA LOCATION '{self._location}'"
Spark.get().sql(sql)

def recreate_hive_table(self):
self.drop()
self.create_hive_table()

def get_partitioning(self):
"""The result of DESCRIBE TABLE tablename is like this:
+-----------------+---------------+-------+
| col_name| data_type|comment|
+-----------------+---------------+-------+
| mycolA| string| |
| myColB| int| |
| | | |
| # Partitioning| | |
| Part 0| mycolA| |
+-----------------+---------------+-------+
but this method return the partitioning in the form ['mycolA'],
if there is no partitioning, an empty list is returned.
"""
if self._partitioning is None:
# create an iterator object and use it in two steps
rows_iter = iter(
Spark.get().sql(f"DESCRIBE TABLE {self.get_tablename()}").collect()
)

# roll over the iterator until you see the title line
for row in rows_iter:
# discard rows until the important section header
if row.col_name.strip() == "# Partitioning":
break
# at this point, the iterator has moved past the section heading
# leaving only the rows with "Part 1" etc.

# create a list from the rest of the iterator like [(0,colA), (1,colB)]
parts = [
(int(row.col_name[5:]), row.data_type)
for row in rows_iter
if row.col_name.startswith("Part ")
]
# sort, just in case the parts were out of order.
parts.sort()

# discard the index and put into an ordered list.
self._partitioning = [p[1] for p in parts]
return self._partitioning

def get_tablename(self) -> str:
return self._name

def upsert(
self,
df: DataFrame,
...
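
The methods and exception classes deleted from DeltaHandle above are not dropped outright; the refactor consolidates them in a shared SparkHandle base used by DeltaHandle and the new streaming handles alike (autoloaderstream_handle.py imports DeltaHandleInvalidFormat from atc.tables.SparkHandle). The SparkHandle source itself is not part of this excerpt, so the following is only a rough sketch of the shared state such a base would carry, assembled from the deleted lines rather than from the actual implementation.

from typing import List, Optional

from atc.exceptions import AtcException
from atc.tables.TableHandle import TableHandle


class DeltaHandleException(AtcException):
    pass


class DeltaHandleInvalidName(DeltaHandleException):
    pass


class DeltaHandleInvalidFormat(DeltaHandleException):
    pass


class SparkHandle(TableHandle):
    # Sketch only: the shared constructor state that DeltaHandle now passes
    # up via super().__init__(name, location, data_format).
    def __init__(self, name: str, location: str = None, data_format: str = "delta"):
        self._name = name
        self._location = location
        self._data_format = data_format
        self._partitioning: Optional[List[str]] = None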