forked from amundsen-io/amundsendatabuilder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcolumn_usage_model.py
106 lines (88 loc) · 4.03 KB
/
column_usage_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from typing import Union, Dict, Any, Iterator, List  # noqa: F401

from databuilder.models.neo4j_csv_serde import (
    Neo4jCsvSerializable, RELATION_START_KEY, RELATION_END_KEY,
    RELATION_START_LABEL, RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
)
from databuilder.models.usage.usage_constants import (
    READ_RELATION_TYPE, READ_REVERSE_RELATION_TYPE, READ_RELATION_COUNT_PROPERTY
)
from databuilder.models.table_metadata import TableMetadata
from databuilder.models.user import User
class ColumnUsageModel(Neo4jCsvSerializable):
    """
    A model representing the user <--> column usage graph.

    Only table-level serialization is currently supported: the emitted
    relation links the table node to the user node, and ``column_name`` is
    stored on the instance but is not part of any emitted record.
    """
    TABLE_NODE_LABEL = TableMetadata.TABLE_NODE_LABEL
    TABLE_NODE_KEY_FORMAT = TableMetadata.TABLE_KEY_FORMAT

    USER_TABLE_RELATION_TYPE = READ_RELATION_TYPE
    TABLE_USER_RELATION_TYPE = READ_REVERSE_RELATION_TYPE

    # Property key holding the read count on the read / read-by relationship
    READ_RELATION_COUNT = READ_RELATION_COUNT_PROPERTY

    def __init__(self,
                 database,  # type: str
                 cluster,  # type: str
                 schema,  # type: str
                 table_name,  # type: str
                 column_name,  # type: str
                 user_email,  # type: str
                 read_count,  # type: int
                 ):
        # type: (...) -> None
        """
        Store the usage attributes and prepare the node / relation iterators.

        :param database: database part of the table key
        :param cluster: cluster part of the table key
        :param schema: schema part of the table key
        :param table_name: table part of the table key
        :param column_name: column that was read (kept for ``__repr__`` only)
        :param user_email: email identifying the user who read the table
        :param read_count: value stored under ``READ_RELATION_COUNT``
        """
        self.database = database
        self.cluster = cluster
        self.schema = schema
        self.table_name = table_name
        self.column_name = column_name
        self.user_email = user_email
        self.read_count = read_count

        # Both iterators are built once, here; create_next_node() and
        # create_next_relation() simply drain them.
        self._node_iter = iter(self.create_nodes())
        self._relation_iter = iter(self.create_relation())

    def create_next_node(self):
        # type: () -> Union[Dict[str, Any], None]
        """
        Return the next node record, or None once all nodes were returned.
        """
        try:
            return next(self._node_iter)
        except StopIteration:
            return None

    def create_nodes(self):
        # type: () -> List[Dict[str, Any]]
        """
        Create the Neo4j node records for this usage entry.

        Delegates to ``User(email=self.user_email).create_nodes()``.
        :return: whatever ``User.create_nodes`` returns (must be iterable)
        """
        return User(email=self.user_email).create_nodes()

    def create_next_relation(self):
        # type: () -> Union[Dict[str, Any], None]
        """
        Return the next relation record, or None once all were returned.
        """
        try:
            return next(self._relation_iter)
        except StopIteration:
            return None

    def create_relation(self):
        # type: () -> List[Dict[str, Any]]
        """
        Create the single table <-> user relation record.

        The table is the start node and the user is the end node; the read
        count is attached under the ``READ_RELATION_COUNT`` property key.
        :return: a one-element list holding the relation record
        """
        return [{
            RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
            RELATION_END_LABEL: User.USER_NODE_LABEL,
            RELATION_START_KEY: self._get_table_key(),
            RELATION_END_KEY: self._get_user_key(self.user_email),
            RELATION_TYPE: ColumnUsageModel.TABLE_USER_RELATION_TYPE,
            RELATION_REVERSE_TYPE: ColumnUsageModel.USER_TABLE_RELATION_TYPE,
            ColumnUsageModel.READ_RELATION_COUNT: self.read_count
        }]

    def _get_table_key(self):
        # type: () -> str
        """
        Build the table node key from database, cluster, schema and table.
        """
        return TableMetadata.TABLE_KEY_FORMAT.format(db=self.database,
                                                     cluster=self.cluster,
                                                     schema=self.schema,
                                                     tbl=self.table_name)

    def _get_user_key(self, email):
        # type: (str) -> str
        """
        Build the user node key for the given email.
        """
        return User.get_user_model_key(email=email)

    def __repr__(self):
        # type: () -> str
        """
        Return a constructor-like representation listing every attribute in
        the same order as the ``__init__`` parameters.
        """
        # Use this class's own name so the output identifies the right type.
        return 'ColumnUsageModel({!r}, {!r}, {!r}, {!r}, {!r}, {!r}, {!r})'.format(self.database,
                                                                                  self.cluster,
                                                                                  self.schema,
                                                                                  self.table_name,
                                                                                  self.column_name,
                                                                                  self.user_email,
                                                                                  self.read_count)