Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Edge automation and capability sync #8

Merged
merged 3 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
run: |
python -m pip install -r requirements.txt
python -m pip install --upgrade pip setuptools wheel
python -m pip install --upgrade flake8 mypy types-requests
python -m pip install --upgrade flake8 mypy types-requests types-paramiko
python -m pip install -e .

- name: "Run mypy and flake8 for ${{ matrix.python-version }}"
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## [1.5.0] - 2024-07-23

### Changed

- Helper functionality and example to automate upload to Edge, trigger edgecli command and synchronize the capability

## [1.4.0] - 2024-05-23

### Changed
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Custom technical lineage

This repository contains examples and helper functions to help you develop your custom technical lineage files.
The dependencies are collected in the `requirements.txt` file and can be installed using `pip install -r requirements.txt`.

## Convert single-file definition files to the new batch definition format

Expand Down Expand Up @@ -74,7 +75,7 @@ This example creates a lineage relationship between a file and a column. The cus

## Python batch definition custom technical lineage examples

`tools.example.py` and `tools.example_with_props.py` contain examples of how you can use the models and helper functions defined in `src.models.py` and `src.helper.py`.
`tools.example.py` and `tools.example_with_props.py` contain examples of how you can use the models and helper functions defined in `src.models.py` and `src.helper.py` to generate the required files for custom technical lineage. It also shows how the functions can be used to upload the files to edge, trigger `edgecli` command and synchronize the capability.

## Retrieve the fullname and domain ID of an asset, based on the domain ID, type ID or display name

Expand Down
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
pydantic
requests
requests
paramiko
scp
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = custom-technical-lineage
version = 1.4.0
version = 1.5.0
author = Kristof Van Coillie
author_email = [email protected]
description = Helper scripts for custom technical lineage
Expand Down
56 changes: 56 additions & 0 deletions src/edge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import logging
from typing import Optional

import paramiko
from scp import SCPClient


class EdgeConnection(object):
def __init__(self, address: str, username: str, certificate_path: str, port: int = 22):
self.address = address
self.username = username
self.certificate = certificate_path
self.port = port
self.ssh_client = self.connect()

def connect(self) -> paramiko.SSHClient:
try:
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(
hostname=self.address,
port=self.port,
username=self.username,
key_filename=self.certificate,
timeout=20,
look_for_keys=False,
)
logging.info(f"Connected to {self.address} over SSH")

except Exception as e:
logging.error(f"Failed to connect to {self.address} over SSH")
raise e
else:
return ssh_client

def send_command(self, command: str) -> None:
stdin, stdout, stderr = self.ssh_client.exec_command(command)
while not stdout.channel.exit_status_ready():
# Print data when available
if stdout.channel.recv_ready():
alldata = stdout.channel.recv(1024)
prevdata = b"1"
while prevdata:
prevdata = stdout.channel.recv(1024)
alldata += prevdata
logging.info(str(alldata))

def upload_folder(self, source_folder: str, target_folder: str) -> None:
scp = SCPClient(self.ssh_client.get_transport())
scp.put(files=source_folder, remote_path=target_folder, recursive=True)

def upload_edge_shared_folder(self, edge_directory: str, shared_connection_folder: str) -> None:
self.send_command(
command=f"sudo ./edgecli objects folder-upload --source {edge_directory} \
--target {shared_connection_folder}"
)
35 changes: 35 additions & 0 deletions src/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,38 @@ def get_asset_types_name_from_lineage_json_file(path: str) -> set:
if None in types:
types.remove(None)
return types


def synchronize_capability(
collibra_instance: str, username: str, password: str, capability_id: str
) -> Optional[requests.Response]:
"""
Helper function that triggers the synchronisation of the custom lineage capability

:param collibra_instance: Collibra instance name
:type collibra_instance: str
:param username: Collibra username
:type username: str
:param password: Collibra user's password
:type password: str
:param capability_id: ID of the capability to synchronize
:type type_id: str
:returns: response of the http post call to synchronize the capability
:rtype: requests.Response
"""
auth = HTTPBasicAuth(username=username, password=password)
url = f"https://{collibra_instance}/rest/catalog/1.0/genericIntegration/{capability_id}/run"
logging.info(f"Sending POST {url}")
try:
ret = requests.post(url=url, auth=auth)
except NameResolutionError as e:
raise e
except Exception as e:
logging.warning(f"POST {url} failed with\n{e}")
else:
if ret.status_code >= 400 and ret.status_code < 500:
raise CollibraAPIError(f"POST {url} failed with {ret.status_code} {ret.text}")
elif ret.status_code == 200:
logging.info(f"Response received for GET {url}: {ret.status_code}")
return ret
return None
17 changes: 16 additions & 1 deletion tools/example.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from src.helper import generate_json_files, generate_source_code
from src.edge import EdgeConnection
from src.helper import generate_json_files, generate_source_code, synchronize_capability
from src.models import (
Asset,
AssetProperties,
Expand Down Expand Up @@ -99,3 +100,17 @@

# Generating the json files
generate_json_files(assets=[], lineages=lineages, asset_types=asset_types, custom_lineage_config=custom_lineage_config)

# Preparing the Edge capability
edge_directory = "/tmp/cl3/" # this is the folder on Edge to which the files will be uploaded to
edge_shared_connection_folder = "shared-folder" # this is the name of the shared folder as configured on the capability
edge_connection = EdgeConnection(address="192.169.10.10", username="username", certificate="/path-to-ssh-cert")
edge_connection.upload_folder(source_folder=custom_lineage_config.output_directory_path, target_folder=edge_directory)
edge_connection.upload_edge_shared_folder(
edge_directory=edge_directory, shared_connection_folder=edge_shared_connection_folder
)

# Trigger the capability
synchronize_capability(
collibra_instance="", username="Admin", password="password", capability_id="custom_lineage_capability_id"
)
17 changes: 16 additions & 1 deletion tools/example_with_props.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from src.helper import generate_json_files, generate_source_code
from src.edge import EdgeConnection
from src.helper import generate_json_files, generate_source_code, synchronize_capability
from src.models import (
Asset,
AssetProperties,
Expand Down Expand Up @@ -121,3 +122,17 @@
generate_json_files(
assets=assets, lineages=lineages, asset_types=asset_types, custom_lineage_config=custom_lineage_config
)

# Preparing the Edge capability
edge_directory = "/tmp/cl3/" # this is the folder on Edge to which the files will be uploaded to
edge_shared_connection_folder = "shared-folder" # this is the name of the shared folder as configured on the capability
edge_connection = EdgeConnection(address="192.169.10.10", username="username", certificate="/path-to-ssh-cert")
edge_connection.upload_folder(source_folder=custom_lineage_config.output_directory_path, target_folder=edge_directory)
edge_connection.upload_edge_shared_folder(
edge_directory=edge_directory, shared_connection_folder=edge_shared_connection_folder
)

# Trigger the capability
synchronize_capability(
collibra_instance="", username="Admin", password="password", capability_id="custom_lineage_capability_id"
)
Loading