forked from amundsen-io/amundsen
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcolumn.py
123 lines (100 loc) · 4.9 KB
/
column.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import json
from http import HTTPStatus
from typing import Iterable, Mapping, Union
from amundsen_common.entity.resource_type import ResourceType
from amundsen_common.models.lineage import LineageSchema
from flasgger import swag_from
from flask import request
from flask_restful import Resource, reqparse
from metadata_service.api.badge import BadgeCommon
from metadata_service.exception import NotFoundException
from metadata_service.proxy import get_proxy_client
class ColumnLineageAPI(Resource):
    """
    REST resource exposing a GET endpoint that returns the lineage of a column.
    """

    def __init__(self) -> None:
        self.client = get_proxy_client()

        # Optional query-string arguments: 'direction' falls back to "both"
        # and 'depth' falls back to 1 when the caller omits them.
        query_parser = reqparse.RequestParser()
        query_parser.add_argument('direction', type=str, required=False, default="both")
        query_parser.add_argument('depth', type=int, required=False, default=1)
        self.parser = query_parser

        super().__init__()

    @swag_from('swagger_doc/column/lineage_get.yml')
    def get(self, table_uri: str, column_name: str) -> Iterable[Union[Mapping, int, None]]:
        """
        Fetches lineage for the column keyed as "<table_uri>/<column_name>".

        :param table_uri: first half of the column key handed to the proxy client
        :param column_name: second half of the column key handed to the proxy client
        :return: (serialized lineage, OK) on success; (message, NOT_FOUND) when
                 any exception is raised while fetching or serializing
        """
        query = self.parser.parse_args()
        lineage_direction = query.get('direction')
        lineage_depth = query.get('depth')
        column_key = f"{table_uri}/{column_name}"

        try:
            column_lineage = self.client.get_lineage(
                id=column_key,
                resource_type=ResourceType.Column,
                direction=lineage_direction,
                depth=lineage_depth,
            )
            return LineageSchema().dump(column_lineage), HTTPStatus.OK
        except Exception as e:
            # Every failure (lookup or serialization) is reported to the caller as NOT_FOUND.
            return {'message': f'Exception raised when getting lineage: {e}'}, HTTPStatus.NOT_FOUND
class ColumnDescriptionAPI(Resource):
    """
    ColumnDescriptionAPI supports PUT to upsert a column description
    and GET to read it back
    """

    def __init__(self) -> None:
        self.client = get_proxy_client()
        super(ColumnDescriptionAPI, self).__init__()

    @swag_from('swagger_doc/column/description_put.yml')
    def put(self,
            table_uri: str,
            column_name: str) -> Iterable[Union[dict, tuple, int, None]]:
        """
        Updates column description (passed as a request body)

        The request body must be a JSON object. Its 'description' value
        (None when the key is absent) is forwarded to the proxy client.

        :param table_uri: forwarded to the proxy client as ``table_uri``
        :param column_name: forwarded to the proxy client as ``column_name``
        :return: (None, OK) on success; (message, BAD_REQUEST) when the body
                 is not a JSON object; (message, NOT_FOUND) when the proxy
                 client raises NotFoundException
        """
        # Validate the body up front so a malformed request is reported as a
        # client error instead of escaping as an unhandled exception.
        try:
            payload = json.loads(request.data)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
            return {'message': 'Request body must be valid JSON'}, HTTPStatus.BAD_REQUEST
        if not isinstance(payload, dict):
            return {'message': 'Request body must be a JSON object'}, HTTPStatus.BAD_REQUEST

        try:
            self.client.put_column_description(table_uri=table_uri,
                                               column_name=column_name,
                                               description=payload.get('description'))
            return None, HTTPStatus.OK
        except NotFoundException:
            msg = f'table_uri {table_uri} with column {column_name} does not exist'
            return {'message': msg}, HTTPStatus.NOT_FOUND

    @swag_from('swagger_doc/column/description_get.yml')
    def get(self, table_uri: str, column_name: str) -> Union[tuple, int, None]:
        """
        Gets the column description through the proxy client

        :param table_uri: forwarded to the proxy client as ``table_uri``
        :param column_name: forwarded to the proxy client as ``column_name``
        :return: ({'description': ...}, OK) on success; (message, NOT_FOUND)
                 when the proxy client raises NotFoundException;
                 (message, INTERNAL_SERVER_ERROR) for any other exception
        """
        try:
            description = self.client.get_column_description(table_uri=table_uri,
                                                             column_name=column_name)
            return {'description': description}, HTTPStatus.OK
        except NotFoundException:
            msg = f'table_uri {table_uri} with column {column_name} does not exist'
            return {'message': msg}, HTTPStatus.NOT_FOUND
        except Exception:
            return {'message': 'Internal server error!'}, HTTPStatus.INTERNAL_SERVER_ERROR
class ColumnBadgeAPI(Resource):
    """
    REST resource exposing PUT and DELETE for a badge on a column; both
    operations are delegated to BadgeCommon.
    """

    def __init__(self) -> None:
        self.client = get_proxy_client()

        # 'category' is a mandatory request argument for both PUT and DELETE.
        badge_parser = reqparse.RequestParser()
        badge_parser.add_argument('category', type=str, required=True)
        self.parser = badge_parser

        super().__init__()
        self._badge_common = BadgeCommon(client=self.client)

    @swag_from('swagger_doc/column/badge_put.yml')
    def put(self, table_uri: str, badge: str, column_name: str) -> Iterable[Union[Mapping, int, None]]:
        """
        Delegates to BadgeCommon.put for the column keyed as "<table_uri>/<column_name>".

        :param table_uri: first half of the column key
        :param badge: forwarded as ``badge_name``
        :param column_name: second half of the column key
        :return: whatever BadgeCommon.put returns
        """
        badge_category = self.parser.parse_args().get('category', '')
        column_key = f"{table_uri}/{column_name}"
        return self._badge_common.put(
            id=column_key,
            resource_type=ResourceType.Column,
            badge_name=badge,
            category=badge_category,
        )

    @swag_from('swagger_doc/column/badge_delete.yml')
    def delete(self, table_uri: str, badge: str, column_name: str) -> Iterable[Union[Mapping, int, None]]:
        """
        Delegates to BadgeCommon.delete for the column keyed as "<table_uri>/<column_name>".

        :param table_uri: first half of the column key
        :param badge: forwarded as ``badge_name``
        :param column_name: second half of the column key
        :return: whatever BadgeCommon.delete returns
        """
        badge_category = self.parser.parse_args().get('category', '')
        column_key = f"{table_uri}/{column_name}"
        return self._badge_common.delete(
            id=column_key,
            resource_type=ResourceType.Column,
            badge_name=badge,
            category=badge_category,
        )