From 7f29c70fff07be302051cfacdc948baf66805fb5 Mon Sep 17 00:00:00 2001 From: kristofvancoillie Date: Wed, 24 Jul 2024 08:19:39 +0200 Subject: [PATCH 1/3] Edge automation and capability sync --- .github/workflows/pipeline.yml | 2 +- CHANGELOG.md | 6 ++++ README.md | 3 +- requirements.txt | 4 ++- setup.cfg | 2 +- src/edge.py | 60 ++++++++++++++++++++++++++++++++++ src/helper.py | 35 ++++++++++++++++++++ tools/example.py | 17 +++++++++- tools/example_with_props.py | 17 +++++++++- 9 files changed, 140 insertions(+), 6 deletions(-) create mode 100644 src/edge.py diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index 0c4a94f..177c701 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -25,7 +25,7 @@ jobs: run: | python -m pip install -r requirements.txt python -m pip install --upgrade pip setuptools wheel - python -m pip install --upgrade flake8 mypy types-requests + python -m pip install --upgrade flake8 mypy types-requests types-paramiko python -m pip install -e . - name: "Run mypy and flake8 for ${{ matrix.python-version }}" diff --git a/CHANGELOG.md b/CHANGELOG.md index d3f91b9..d4bac3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## [1.5.0] - 2024-07-23 + +### Changed + +- Helper functionality and example to automate upload to Edge, trigger edgecli command and synchronize the capability + ## [1.4.0] - 2024-05-23 ### Changed diff --git a/README.md b/README.md index c641411..24890a3 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # Custom technical lineage This repository contains examples and helper functions to help you develop your custom technical lineage files. +The dependencies are collected in the `requirements.txt` file and can be installed using `pip install -r requirements.txt`. ## Convert single-file definition files to the new batch definition format @@ -74,7 +75,7 @@ This example creates a lineage relationship between a file and a column. The cus ## Python batch definition custom technical lineage examples -`tools.example.py` and `tools.example_with_props.py` contain examples of how you can use the models and helper functions defined in `src.models.py` and `src.helper.py`. +`tools.example.py` and `tools.example_with_props.py` contain examples of how you can use the models and helper functions defined in `src.models.py` and `src.helper.py` to generate the required files for custom technical lineage. It also shows how the functions can be used to upload the files to edge, trigger `edgecli` command and synchronize the capability. ## Retrieve the fullname and domain ID of an asset, based on the domain ID, type ID or display name diff --git a/requirements.txt b/requirements.txt index 76aa8db..1ef0e8c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ pydantic -requests \ No newline at end of file +requests +paramiko +scp \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index e127894..9eae112 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = custom-technical-lineage -version = 1.4.0 +version = 1.5.0 author = Kristof Van Coillie author_email = kristof.vancoillie@collibra.com description = Helper scripts for custom technical lineage diff --git a/src/edge.py b/src/edge.py new file mode 100644 index 0000000..7a9a669 --- /dev/null +++ b/src/edge.py @@ -0,0 +1,60 @@ +import logging +from typing import Optional + +import paramiko +from scp import SCPClient + + +class EdgeConnection(object): + def __init__(self, address: str, username: str, certificate_path: str, port: int = 22): + self.address = address + self.username = username + self.certificate = certificate_path + self.port = port + self.ssh_client = None + self.ssh_client = self.connect() + + def connect(self) -> Optional[paramiko.SSHClient]: + try: + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh_client.connect( + hostname=self.address, + port=self.port, + username=self.username, + key_filename=self.certificate, + timeout=20, + look_for_keys=False, + ) + logging.info(f"Connected to {self.address} over SSH") + + except Exception as e: + ssh_client = None + logging.error(f"Unable to connect to {self.address} over SSH: {e}") + return ssh_client + + def send_command(self, command: str) -> None: + if self.ssh_client: + stdin, stdout, stderr = self.ssh_client.exec_command(command) + while not stdout.channel.exit_status_ready(): + # Print data when available + if stdout.channel.recv_ready(): + alldata = stdout.channel.recv(1024) + prevdata = b"1" + while prevdata: + prevdata = stdout.channel.recv(1024) + alldata += prevdata + logging.info(str(alldata)) + else: + logging.error(f"Connection to Edge server {self.address} not opened.") + + def upload_folder(self, source_folder: str, target_folder: str) -> None: + if self.ssh_client: + scp = SCPClient(self.ssh_client.get_transport()) + scp.put(files=source_folder, remote_path=target_folder, recursive=True) + + def upload_edge_shared_folder(self, edge_directory: str, shared_connection_folder: str) -> None: + self.send_command( + command=f"sudo ./edgecli objects folder-upload --source {edge_directory} \ + --target {shared_connection_folder}" + ) diff --git a/src/helper.py b/src/helper.py index 11a244e..9a565fa 100644 --- a/src/helper.py +++ b/src/helper.py @@ -264,3 +264,38 @@ def get_asset_types_name_from_lineage_json_file(path: str) -> set: if None in types: types.remove(None) return types + + +def synchronize_capability( + collibra_instance: str, username: str, password: str, capability_id: str +) -> Optional[requests.Response]: + """ + Helper function that triggers the synchronisation of the custom lineage capability + + :param collibra_instance: Collibra instance name + :type collibra_instance: str + :param username: Collibra username + :type username: str + :param password: Collibra user's password + :type password: str + :param capability_id: ID of the capability to synchronize + :type type_id: str + :returns: response of the http post call to synchronize the capability + :rtype: requests.Response + """ + auth = HTTPBasicAuth(username=username, password=password) + url = f"https://{collibra_instance}/rest/catalog/1.0/genericIntegration/{capability_id}/run" + logging.info(f"Sending POST {url}") + try: + ret = requests.post(url=url, auth=auth) + except NameResolutionError as e: + raise e + except Exception as e: + logging.warning(f"POST {url} failed with\n{e}") + else: + if ret.status_code >= 400 and ret.status_code < 500: + raise CollibraAPIError(f"POST {url} failed with {ret.status_code} {ret.text}") + elif ret.status_code == 200: + logging.info(f"Response received for GET {url}: {ret.status_code}") + return ret + return None diff --git a/tools/example.py b/tools/example.py index ec57d57..cabce76 100644 --- a/tools/example.py +++ b/tools/example.py @@ -1,4 +1,5 @@ -from src.helper import generate_json_files, generate_source_code +from src.edge import EdgeConnection +from src.helper import generate_json_files, generate_source_code, synchronize_capability from src.models import ( Asset, AssetProperties, @@ -99,3 +100,17 @@ # Generating the json files generate_json_files(assets=[], lineages=lineages, asset_types=asset_types, custom_lineage_config=custom_lineage_config) + +# Preparing the Edge capability +edge_directory = "/tmp/cl3/" # this is the folder on Edge to which the files will be uploaded to +edge_shared_connection_folder = "shared-folder" # this is the name of the shared folder as configured on the capability +edge_connection = EdgeConnection(address="192.169.10.10", username="username", certificate="/path-to-ssh-cert") +edge_connection.upload_folder(source_folder=custom_lineage_config.output_directory_path, target_folder=edge_directory) +edge_connection.upload_edge_shared_folder( + edge_directory=edge_directory, shared_connection_folder=edge_shared_connection_folder +) + +# Trigger the capability +synchronize_capability( + collibra_instance="", username="Admin", password="password", capability_id="custom_lineage_capability_id" +) \ No newline at end of file diff --git a/tools/example_with_props.py b/tools/example_with_props.py index b82ea75..f07fa89 100644 --- a/tools/example_with_props.py +++ b/tools/example_with_props.py @@ -1,4 +1,5 @@ -from src.helper import generate_json_files, generate_source_code +from src.edge import EdgeConnection +from src.helper import generate_json_files, generate_source_code, synchronize_capability from src.models import ( Asset, AssetProperties, @@ -121,3 +122,17 @@ generate_json_files( assets=assets, lineages=lineages, asset_types=asset_types, custom_lineage_config=custom_lineage_config ) + +# Preparing the Edge capability +edge_directory = "/tmp/cl3/" # this is the folder on Edge to which the files will be uploaded to +edge_shared_connection_folder = "shared-folder" # this is the name of the shared folder as configured on the capability +edge_connection = EdgeConnection(address="192.169.10.10", username="username", certificate="/path-to-ssh-cert") +edge_connection.upload_folder(source_folder=custom_lineage_config.output_directory_path, target_folder=edge_directory) +edge_connection.upload_edge_shared_folder( + edge_directory=edge_directory, shared_connection_folder=edge_shared_connection_folder +) + +# Trigger the capability +synchronize_capability( + collibra_instance="", username="Admin", password="password", capability_id="custom_lineage_capability_id" +) \ No newline at end of file From 18fb22e2894ca7405a7648059ec875e80374b9b0 Mon Sep 17 00:00:00 2001 From: kristofvancoillie Date: Wed, 24 Jul 2024 08:23:06 +0200 Subject: [PATCH 2/3] fix formatting --- tools/example.py | 2 +- tools/example_with_props.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/example.py b/tools/example.py index cabce76..56ef9ab 100644 --- a/tools/example.py +++ b/tools/example.py @@ -113,4 +113,4 @@ # Trigger the capability synchronize_capability( collibra_instance="", username="Admin", password="password", capability_id="custom_lineage_capability_id" -) \ No newline at end of file +) diff --git a/tools/example_with_props.py b/tools/example_with_props.py index f07fa89..1e6ef27 100644 --- a/tools/example_with_props.py +++ b/tools/example_with_props.py @@ -135,4 +135,4 @@ # Trigger the capability synchronize_capability( collibra_instance="", username="Admin", password="password", capability_id="custom_lineage_capability_id" -) \ No newline at end of file +) From 35fe1517898a62f82356fffb3a656ff8cf0aef80 Mon Sep 17 00:00:00 2001 From: kristofvancoillie Date: Thu, 25 Jul 2024 16:19:03 +0200 Subject: [PATCH 3/3] Updated based on PR feedback --- src/edge.py | 38 +++++++++++++++++--------------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/src/edge.py b/src/edge.py index 7a9a669..afdcd23 100644 --- a/src/edge.py +++ b/src/edge.py @@ -11,10 +11,9 @@ def __init__(self, address: str, username: str, certificate_path: str, port: int self.username = username self.certificate = certificate_path self.port = port - self.ssh_client = None self.ssh_client = self.connect() - def connect(self) -> Optional[paramiko.SSHClient]: + def connect(self) -> paramiko.SSHClient: try: ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) @@ -29,29 +28,26 @@ def connect(self) -> Optional[paramiko.SSHClient]: logging.info(f"Connected to {self.address} over SSH") except Exception as e: - ssh_client = None - logging.error(f"Unable to connect to {self.address} over SSH: {e}") - return ssh_client + logging.error(f"Failed to connect to {self.address} over SSH") + raise e + else: + return ssh_client def send_command(self, command: str) -> None: - if self.ssh_client: - stdin, stdout, stderr = self.ssh_client.exec_command(command) - while not stdout.channel.exit_status_ready(): - # Print data when available - if stdout.channel.recv_ready(): - alldata = stdout.channel.recv(1024) - prevdata = b"1" - while prevdata: - prevdata = stdout.channel.recv(1024) - alldata += prevdata - logging.info(str(alldata)) - else: - logging.error(f"Connection to Edge server {self.address} not opened.") + stdin, stdout, stderr = self.ssh_client.exec_command(command) + while not stdout.channel.exit_status_ready(): + # Print data when available + if stdout.channel.recv_ready(): + alldata = stdout.channel.recv(1024) + prevdata = b"1" + while prevdata: + prevdata = stdout.channel.recv(1024) + alldata += prevdata + logging.info(str(alldata)) def upload_folder(self, source_folder: str, target_folder: str) -> None: - if self.ssh_client: - scp = SCPClient(self.ssh_client.get_transport()) - scp.put(files=source_folder, remote_path=target_folder, recursive=True) + scp = SCPClient(self.ssh_client.get_transport()) + scp.put(files=source_folder, remote_path=target_folder, recursive=True) def upload_edge_shared_folder(self, edge_directory: str, shared_connection_folder: str) -> None: self.send_command(