forked from amundsen-io/amundsendatabuilder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
neo4j_es_last_updated.py
63 lines (54 loc) · 1.72 KB
/
neo4j_es_last_updated.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from typing import Any, Dict, List, Union # noqa: F401
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, NODE_LABEL
class Neo4jESLastUpdated(Neo4jCsvSerializable):
    """
    Data model that records the last-updated timestamp shared by
    neo4j and es.

    It serializes to exactly one node record and no relation records.
    """

    LABEL = 'Updatedtimestamp'
    KEY = 'amundsen_updated_timestamp'
    # NOTE(review): the spelling 'timestmap' looks like a typo, but this string
    # is the field name written into every node record, so it is kept
    # byte-for-byte. Confirm all readers of this field before renaming it.
    LATEST_TIMESTAMP = 'latest_timestmap'

    def __init__(self, timestamp):
        # type: (int) -> None
        """
        :param timestamp: epoch of the latest update for neo4j and es
        """
        # The timestamp must be stored first: create_nodes() reads it.
        self.timestamp = timestamp
        self._node_iter = iter(self.create_nodes())
        self._rel_iter = iter(self.create_relation())

    def create_next_node(self):
        # type: () -> Union[Dict[str, Any], None]
        """
        Hand out the next pending node record.

        :return: the next node record, or None once all records built at
            construction time have been consumed
        """
        return next(self._node_iter, None)

    def create_nodes(self):
        # type: () -> List[Dict[str, Any]]
        """
        Build the node records for this model.

        :return: a one-element list holding the timestamp node record
        """
        timestamp_node = {
            NODE_KEY: Neo4jESLastUpdated.KEY,
            NODE_LABEL: Neo4jESLastUpdated.LABEL,
            Neo4jESLastUpdated.LATEST_TIMESTAMP: self.timestamp,
        }
        return [timestamp_node]

    def create_next_relation(self):
        # type: () -> Union[Dict[str, Any], None]
        """
        Hand out the next pending relation record.

        :return: the next relation record, or None when none remain
        """
        return next(self._rel_iter, None)

    def create_relation(self):
        # type: () -> List[Dict[str, Any]]
        """
        Build the relation records for this model.

        :return: always an empty list; the timestamp node has no relations
        """
        no_relations = []  # type: List[Dict[str, Any]]
        return no_relations