pipeline.py
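"""PyFlink job: read mailing-list messages from a Postgres staging table via the
JDBC connector, classify each message subject with a pre-trained gensim LDA model
(pandas UDF), enrich the predicted topic id with a human-readable label through a
lookup join against a topics dimension table, and write the result back to Postgres.

The absolute paths below (/opt/pyflink-nlp/, model/lda_model/...) are assumed to
match this repository's deployment layout; adjust them if the job runs elsewhere.
"""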
import sys

# Make the project modules (e.g. tokenizer.py) importable inside the Flink containers.
sys.path.append("/opt/pyflink-nlp/")

from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.datastream import StreamExecutionEnvironment

import tokenizer
import pandas as pd
from gensim.models.ldamodel import LdaModel
from gensim.corpora import Dictionary
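
# JDBC source: staged mailing-list messages in Postgres. proc_time is a
# processing-time attribute, required for the lookup (temporal) join further down.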
ddl_jdbc_source = """CREATE TABLE user_ml_messages (
    message_date TIMESTAMP(3),
    message_id STRING,
    message_in_reply_to STRING,
    message_from_name STRING,
    message_from_email STRING,
    message_subject STRING,
    proc_time AS PROCTIME(),
    PRIMARY KEY(message_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/postgres',
    'table-name' = 'perceval.stg_flink_user_ml',
    'username' = 'postgres',
    'password' = 'postgres'
)"""
ddl_jdbc_dim = """CREATE TABLE dim_topics (
    id VARCHAR(2),
    topic VARCHAR(50),
    topic_weigh VARCHAR(100),
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/postgres',
    'table-name' = 'perceval.dim_topics',
    'username' = 'postgres',
    'password' = 'postgres'
)"""
ddl_jdbc_sink = """CREATE TABLE flink_user_ml_topics (
    message_id STRING,
    message_date TIMESTAMP(3),
    message_from_name STRING,
    topic VARCHAR(2),
    topic_description VARCHAR(50),
    PRIMARY KEY(message_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/postgres',
    'table-name' = 'perceval.flink_user_ml_topics',
    'username' = 'postgres',
    'password' = 'postgres'
)"""
@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING(), udf_type='pandas')
def class_model(m):
    # Load the serialized LDA model and its dictionary, then score every subject
    # line in the batch; m is a pandas.Series of message subjects.
    lda = LdaModel.load("model/lda_model/lda_model_user_ml")
    dic = Dictionary.load("model/lda_model/lda_model_user_ml.id2word")
    topics = [tokenizer.find_topic(message, lda, dic) for message in m]
    return pd.Series(topics)
if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(env)

    # Ship the tokenizer module and the zipped LDA model to the Python workers.
    t_env.add_python_file("/opt/pyflink-nlp/tokenizer.py")
    t_env.add_python_archive(archive_path="/opt/pyflink-nlp/lda_model.zip#model", target_dir=None)

    # Pandas UDFs run off-heap; raise this value (e.g. to 512mb) if the LDA model
    # needs more memory than 80mb.
    config = t_env.get_config().get_configuration()
    config.set_string("taskmanager.memory.task.off-heap.size", "80mb")

    # Register the source, sink and dimension tables.
    t_env.execute_sql(ddl_jdbc_source)
    t_env.execute_sql(ddl_jdbc_sink)
    t_env.execute_sql(ddl_jdbc_dim)

    # Register the UDF so it can be called from SQL as CLASS_TOPIC(...).
    t_env.register_function("CLASS_TOPIC", class_model)
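
    # Classify every non-empty subject and enrich the topic id with its label via a
    # processing-time lookup join against dim_topics. Note: execute_sql submits the
    # INSERT asynchronously; when running against a local mini-cluster you may need
    # to block on the returned TableResult (depending on the PyFlink version).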
t_env.execute_sql("""INSERT INTO flink_user_ml_topics
WITH topics AS (SELECT message_id,
message_date,
message_from_name,
CLASS_TOPIC(message_subject) AS topic_id,
proc_time
FROM user_ml_messages
WHERE message_subject IS NOT NULL
)
SELECT t.message_id,
t.message_date,
t.message_from_name,
t.topic_id,
dim.topic AS topic_name
FROM topics t
JOIN dim_topics FOR SYSTEM_TIME AS OF t.proc_time AS dim
ON t.topic_id = dim.id
""")