Copy data processings (#5)
resolves #2
drscholly authored Nov 3, 2023
1 parent 99e1d2c commit c2c9e56
Showing 4 changed files with 243 additions and 0 deletions.
45 changes: 45 additions & 0 deletions tests/test_copy_dataprocessings.py
@@ -0,0 +1,45 @@
from toolbox.api.datagalaxy_api_dataprocessings import DataGalaxyApiDataprocessings
from toolbox.commands.copy_dataprocessings import copy_dataprocessings
from toolbox.api.datagalaxy_api_workspaces import DataGalaxyApiWorkspace

from toolbox.api.datagalaxy_api import DataGalaxyApiAuthentication, Token
import pytest as pytest


# Mocks

def mock_list_dataprocessings_on_source_workspace(self, workspace_name):
if workspace_name == 'workspace_source':
return ['dataprocessing1', 'dataprocessing2', 'dataprocessing3']
return []


# Scenarios

def test_copy_dataprocessings_when_workspace_source_does_not_exist(mocker):
# GIVEN
client_space_mock = mocker.patch.object(Token, 'get_client_space_id', autospec=True)
client_space_mock.return_value = 'cid'
api_authenticate_mock = mocker.patch.object(DataGalaxyApiAuthentication, 'authenticate', autospec=True)
api_authenticate_mock.return_value = 'token'
workspaces = mocker.patch.object(DataGalaxyApiWorkspace, 'list_workspaces', autospec=True)
workspaces.return_value = ['workspace_source']
workspace_source_mock = mocker.patch.object(DataGalaxyApiWorkspace, 'get_workspace', autospec=True)
workspace_source_mock.return_value = None
dataprocessings_on_source_workspace_mock = mocker.patch.object(
DataGalaxyApiDataprocessings,
'list_dataprocessings',
autospec=True
)
dataprocessings_on_source_workspace_mock.side_effect = mock_list_dataprocessings_on_source_workspace

# ASSERT / VERIFY
with pytest.raises(Exception, match='workspace workspace_source does not exist'):
copy_dataprocessings(
url_source='url_source',
token_source='token_source',
url_target='url_target',
token_target='token_target',
workspace_source_name='workspace_source',
workspace_target_name='workspace_target'
)
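
Note: the test above only covers the missing-source-workspace error path. A nominal-path test could reuse the same mocking pattern and the same imports from this module; the sketch below is illustrative only — the workspace payload, the empty dataprocessing list and the asserted bulk call are assumptions, not part of this commit.

def test_copy_dataprocessings_nominal_case(mocker):
    # GIVEN: both workspaces resolve (hypothetical payload) and the source has no dataprocessings
    mocker.patch.object(Token, 'get_client_space_id', autospec=True).return_value = 'cid'
    mocker.patch.object(DataGalaxyApiAuthentication, 'authenticate', autospec=True).return_value = 'token'
    mocker.patch.object(DataGalaxyApiWorkspace, 'get_workspace', autospec=True).return_value = {
        'defaultVersionId': 'v1',
        'isVersioningEnabled': False
    }
    mocker.patch.object(DataGalaxyApiDataprocessings, 'list_dataprocessings', autospec=True).return_value = []
    bulk_mock = mocker.patch.object(DataGalaxyApiDataprocessings, 'bulk_upsert_dataprocessings_tree', autospec=True)
    bulk_mock.return_value = 'bulk_result'

    # WHEN
    result = copy_dataprocessings(
        url_source='url_source',
        token_source='token_source',
        url_target=None,
        token_target=None,
        workspace_source_name='workspace_source',
        workspace_target_name='workspace_target'
    )

    # THEN: the bulk upsert is invoked once on the target and its result is returned
    bulk_mock.assert_called_once()
    assert result == 'bulk_result'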
9 changes: 9 additions & 0 deletions toolbox/__main__.py
@@ -4,6 +4,7 @@

from toolbox.commands.copy_attributes import copy_attributes_parse, copy_attributes
from toolbox.commands.copy_usages import copy_usages, copy_usages_parse
from toolbox.commands.copy_dataprocessings import copy_dataprocessings, copy_dataprocessings_parse
from toolbox.commands.delete_attributes import delete_attributes_parse, delete_attributes
from toolbox.commands.copy_glossary import copy_glossary_parse, copy_glossary
from toolbox.commands.copy_dictionary import copy_dictionary, copy_dictionary_parse
@@ -26,6 +27,7 @@ def run(args):
copy_glossary_parse(subparsers)
copy_usages_parse(subparsers)
copy_dictionary_parse(subparsers)
copy_dataprocessings_parse(subparsers)
# parse some argument lists
result = parser.parse_args(args)
if result.verbose:
@@ -70,6 +72,13 @@ def run(args):
logging.info("<<< copy_dictionary")
return 0

if result.subparsers_name == 'copy-dataprocessings':
logging.info(">>> copy_dataprocessings")
copy_dataprocessings(result.url_source, result.url_target, result.token_source, result.token_target,
result.workspace_source, result.workspace_target)
logging.info("<<< copy_dataprocessings")
return 0

parser.print_help(sys.stderr)
return 1

83 changes: 83 additions & 0 deletions toolbox/api/datagalaxy_api_dataprocessings.py
@@ -0,0 +1,83 @@
import logging

import requests as requests
from toolbox.api.datagalaxy_api_workspaces import DataGalaxyBulkResult, to_bulk_tree


class DataGalaxyApiDataprocessings:
def __init__(self, url: str, access_token: str, workspace: dict):
if workspace["isVersioningEnabled"]:
            raise Exception('Workspaces with versioning enabled are currently not supported.')
self.url = url
self.access_token = access_token
self.workspace = workspace

def list_dataprocessings(self, workspace_name: str) -> list:
version_id = self.workspace['defaultVersionId']
params = {'versionId': version_id, 'includeAttributes': 'false'}
headers = {'Authorization': f"Bearer {self.access_token}"}
response = requests.get(f"{self.url}/dataProcessing", params=params, headers=headers)
code = response.status_code
body_json = response.json()
if code != 200:
raise Exception(body_json['error'])
logging.info(
f'list_dataprocessings - {len(body_json["results"])} dataprocessings found on '
f'workspace: {workspace_name}')
result = []
result = result + body_json['results']
next_page = body_json["next_page"]
while next_page is not None:
headers = {'Authorization': f"Bearer {self.access_token}"}
response = requests.get(next_page, headers=headers)
body_json = response.json()
next_page = body_json["next_page"]
result = result + body_json['results']
return result

def list_dataprocessing_items(self, workspace_name: str, parent_id: str) -> list:
version_id = self.workspace['defaultVersionId']
params = {'versionId': version_id, 'parentId': parent_id, 'includeAttributes': 'false'}
headers = {'Authorization': f"Bearer {self.access_token}"}
response = requests.get(f"{self.url}/dataProcessingItem", params=params, headers=headers)
code = response.status_code
body_json = response.json()
if code != 200:
raise Exception(body_json['error'])
logging.info(
f'list_dataprocessing_items - {len(body_json["results"])} dataprocessing_items found on '
f'workspace: {workspace_name} for parent_id: {parent_id}')
result = []
result = result + body_json['results']
next_page = body_json["next_page"]
while next_page is not None:
headers = {'Authorization': f"Bearer {self.access_token}"}
response = requests.get(next_page, headers=headers)
body_json = response.json()
next_page = body_json["next_page"]
result = result + body_json['results']
return result

def bulk_upsert_dataprocessings_tree(self, workspace_name: str, dataprocessings: list) -> DataGalaxyBulkResult:
# Existing entities are updated and non-existing ones are created.
bulk_tree = to_bulk_tree(dataprocessings)

version_id = self.workspace['defaultVersionId']
headers = {'Authorization': f"Bearer {self.access_token}"}
response = requests.post(f"{self.url}/dataProcessing/bulktree/{version_id}", json=bulk_tree, headers=headers)
code = response.status_code
body_json = response.json()
if 200 <= code < 300:
result = DataGalaxyBulkResult(total=body_json["total"], created=body_json["created"],
deleted=body_json["deleted"], unchanged=body_json["unchanged"],
updated=body_json["updated"])
logging.info(
                f'bulk_upsert_dataprocessings_tree - {result.total} dataprocessings copied on workspace {workspace_name}: '
f'{result.created} were created, {result.updated} were updated, '
f'{result.deleted} were deleted and {result.unchanged} were unchanged')
return result

if 400 <= code < 500:
raise Exception(body_json['error'])

raise Exception(f'Unexpected error, code: {code}')
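
For reference, the pagination loops and the bulk upsert above assume response payloads of roughly the following shape. Only the keys ('results', 'next_page', 'error', 'total', 'created', 'updated', 'deleted', 'unchanged') come from the code; every value below is a made-up placeholder.

# Hypothetical page returned by GET /dataProcessing or /dataProcessingItem
page = {
    'results': [
        {'id': 'dp-1', 'name': 'dataprocessing1'},  # entity fields are illustrative
        {'id': 'dp-2', 'name': 'dataprocessing2'},
    ],
    'next_page': None,  # a URL to the next page while more results remain
}

# Hypothetical body returned by POST /dataProcessing/bulktree/{versionId}
bulk_body = {
    'total': 2,
    'created': 1,
    'updated': 1,
    'deleted': 0,
    'unchanged': 0,
}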
106 changes: 106 additions & 0 deletions toolbox/commands/copy_dataprocessings.py
@@ -0,0 +1,106 @@
from typing import Optional

from toolbox.api.datagalaxy_api import get_access_token, Token
from toolbox.api.datagalaxy_api_dataprocessings import DataGalaxyApiDataprocessings
from toolbox.api.datagalaxy_api_workspaces import DataGalaxyApiWorkspace, DataGalaxyBulkResult


def copy_dataprocessings(url_source: str,
url_target: Optional[str],
token_source: str,
token_target: Optional[str],
workspace_source_name: str,
workspace_target_name: str) -> DataGalaxyBulkResult:
if token_target is None:
token_target = token_source

if url_target is None:
url_target = url_source

integration_token_source = Token(token_source)
integration_token_target = Token(token_target)
source_access_token = get_access_token(url_source, integration_token_source)
target_access_token = get_access_token(url_target, integration_token_target)

workspaces_api_on_source_env = DataGalaxyApiWorkspace(
url=url_source,
access_token=source_access_token)
source_workspace = workspaces_api_on_source_env.get_workspace(workspace_source_name)
if source_workspace is None:
raise Exception(f'workspace {workspace_source_name} does not exist')

workspaces_api_on_target_env = DataGalaxyApiWorkspace(
url=url_target,
access_token=target_access_token
)
target_workspace = workspaces_api_on_target_env.get_workspace(workspace_target_name)
if target_workspace is None:
raise Exception(f'workspace {workspace_target_name} does not exist')

dataprocessings_on_source_workspace = DataGalaxyApiDataprocessings(
url=url_source,
access_token=source_access_token,
workspace=source_workspace
)
dataprocessings_on_target_workspace = DataGalaxyApiDataprocessings(
url=url_target,
access_token=target_access_token,
workspace=target_workspace
)

# fetching dataprocessings from source workspace
workspace_source_dataprocessings = dataprocessings_on_source_workspace.list_dataprocessings(workspace_source_name)

    for dp in workspace_source_dataprocessings:
        items = dataprocessings_on_source_workspace.list_dataprocessing_items(workspace_name=workspace_source_name, parent_id=dp['id'])
        for item in items:
            # mutate the fetched items in place: drop 'summary' and alias 'path' as 'entityPath'
            del item['summary']
            # for inputs and outputs, property 'path' must be named 'entityPath'
            for dp_input in item['inputs']:
                dp_input['entityPath'] = dp_input['path']
            for dp_output in item['outputs']:
                dp_output['entityPath'] = dp_output['path']
        dp['dataProcessingItems'] = items

# copying the dataprocessings on the target workspace
return dataprocessings_on_target_workspace.bulk_upsert_dataprocessings_tree(
workspace_name=workspace_target_name,
dataprocessings=workspace_source_dataprocessings
)


def copy_dataprocessings_parse(subparsers):
    # create the parser for the "copy-dataprocessings" command
    copy_dataprocessings_parse = subparsers.add_parser('copy-dataprocessings', help='copy data processings from a source workspace to a target workspace')
copy_dataprocessings_parse.add_argument(
'--url-source',
type=str,
        help='source environment URL',
required=True)
copy_dataprocessings_parse.add_argument(
'--token-source',
type=str,
        help='source environment integration token',
required=True)
copy_dataprocessings_parse.add_argument(
'--url-target',
type=str,
        help='target environment URL (if undefined, the source URL is used)')
copy_dataprocessings_parse.add_argument(
'--token-target',
type=str,
        help='target environment integration token (if undefined, the source token is used)')
copy_dataprocessings_parse.add_argument(
'--workspace-source',
type=str,
        help='source workspace name',
required=True)
copy_dataprocessings_parse.add_argument(
'--workspace-target',
type=str,
        help='target workspace name',
required=True)
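
Once merged, the new subcommand is reachable through the existing run() entry point in toolbox/__main__.py; a minimal sketch of an invocation, with placeholder URLs, tokens and workspace names:

from toolbox.__main__ import run

# Placeholder values; --url-target and --token-target are omitted so the
# source URL and token are reused, as handled in copy_dataprocessings().
run([
    'copy-dataprocessings',
    '--url-source', 'https://source.example/api',
    '--token-source', '<source-integration-token>',
    '--workspace-source', 'workspace_source',
    '--workspace-target', 'workspace_target',
])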
