-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreadWriteDtaq.py
99 lines (84 loc) · 2.59 KB
/
readWriteDtaq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import time
import threading
from mapepire_python.data_types import DaemonServer
from mapepire_python.client.sql_job import SQLJob
import configparser
# Load connection and data-queue settings from mapepire.ini (resolved
# relative to the current working directory).
config = configparser.ConfigParser()
config.read('mapepire.ini')

section_mapepire = "mapepire"
section_data_queues = "data_queues"

# Bind each section once so the individual lookups below stay short.
_server_settings = config[section_mapepire]
_queue_settings = config[section_data_queues]

# Connection details shared by every SQL job in the pool.
# NOTE(review): ignoreUnauthorized=True presumably disables TLS certificate
# verification -- confirm this is acceptable outside of development.
creds = DaemonServer(
    host=_server_settings["SERVER"],
    port=_server_settings["PORT"],
    user=_server_settings["USER"],
    password=_server_settings["PASSWORD"],
    ignoreUnauthorized=True,
)

# Configuration for data queues
INPUT_QUEUE = _queue_settings["INPUT_QUEUE"]
INPUT_QUEUE_LIB = _queue_settings["INPUT_QUEUE_LIB"]
OUTPUT_QUEUE = _queue_settings["OUTPUT_QUEUE"]
OUTPUT_QUEUE_LIB = _queue_settings["OUTPUT_QUEUE_LIB"]

# Pool of connected SQL jobs; the queue helpers pick one per call using
# `count` as a rotating index.
num_jobs = 6
jobs = []
for _ in range(num_jobs):
    job = SQLJob()
    job.connect(creds)
    jobs.append(job)

# Mutable module state: whether the most recent poll produced a message,
# and how many queue operations have been dispatched so far.
hit_message = False
count = 0
def process_message(message):
    """Return the integer value of *message* plus one.

    Placeholder transformation -- replace with the actual processing logic.
    Raises whatever ``int()`` raises when *message* is not an integer value.
    """
    parsed = int(message)
    return parsed + 1
def receive_data_queue():
    """Try to receive one message from the input data queue.

    Picks the next SQL job from the module-level ``jobs`` pool (rotating via
    the module-level ``count``) and runs ``QSYS2.RECEIVE_DATA_QUEUE`` against
    ``INPUT_QUEUE`` / ``INPUT_QUEUE_LIB``.

    Returns:
        A ``(key, payload)`` tuple, where ``key`` is the first 10 characters
        of the row's ``MESSAGE_DATA_UTF8`` value and ``payload`` is the rest,
        or ``None`` when the query fails, yields no usable row, or the
        message is 10 characters or shorter.
    """
    # NOTE(review): `count` is also advanced by send_data_queue, which runs on
    # worker threads; the rotation is not synchronised -- confirm acceptable.
    global count
    job = jobs[count % len(jobs)]
    count += 1

    command_string = (
        f"SELECT * FROM TABLE(QSYS2.RECEIVE_DATA_QUEUE("
        f"DATA_QUEUE => '{INPUT_QUEUE}', "
        f"DATA_QUEUE_LIBRARY => '{INPUT_QUEUE_LIB}'))"
    )

    try:
        result_set = job.query_and_run(command_string)
        data = result_set['data'][0]['MESSAGE_DATA_UTF8']
        if len(data) > 10:
            return (data[0:10], data[10:])
    except Exception:
        # Best-effort poll: any failure here (presumably including an empty
        # queue surfacing as a missing row or NULL column) means "no message
        # this time". Exception rather than a bare except, so that
        # KeyboardInterrupt / SystemExit still stop the program.
        return None

    # A message of 10 characters or fewer has no payload after the key.
    return None
def send_data_queue(key, message):
    """Send *message* to the output data queue under *key*.

    Picks the next SQL job from the module-level ``jobs`` pool (rotating via
    the module-level ``count``) and calls ``QSYS2.SEND_DATA_QUEUE_UTF8`` for
    ``OUTPUT_QUEUE`` / ``OUTPUT_QUEUE_LIB``. Errors raised by the SQL job
    propagate to the caller.

    Args:
        key: Value placed in ``KEY_DATA``.
        message: Value placed in ``MESSAGE_DATA``; converted with ``str()``.
    """
    # NOTE(review): `count` is shared with receive_data_queue and this
    # function runs on worker threads; the rotation is not synchronised.
    global count
    job = jobs[count % len(jobs)]
    count += 1

    def _sql_text(value):
        # Double embedded single quotes so the value cannot terminate the SQL
        # string literal early; `key` originates from inbound queue data.
        return str(value).replace("'", "''")

    command_string = (
        f"CALL QSYS2.SEND_DATA_QUEUE_UTF8("
        f"MESSAGE_DATA => '{_sql_text(message)}', "
        f"DATA_QUEUE => '{OUTPUT_QUEUE}', "
        f"DATA_QUEUE_LIBRARY => '{OUTPUT_QUEUE_LIB}', "
        f"KEY_DATA => '{_sql_text(key)}')"
    )
    job.query_and_run(command_string)
    print("Message sent to output queue:", message)
def write_message(key, data):
    """Run *data* through process_message and send the result under *key*."""
    send_data_queue(key, process_message(data))
def read_and_write():
    """Poll the input queue once and hand any message to a worker thread.

    Updates the module-level ``hit_message`` flag to record whether this poll
    produced a message. When it did, the ``(key, payload)`` pair is passed as
    the arguments of ``write_message`` on a newly started thread.
    """
    global hit_message
    received = receive_data_queue()
    hit_message = bool(received)
    if not hit_message:
        return

    # `received` is the (key, payload) tuple, used directly as the args tuple.
    worker = threading.Thread(target=write_message, args=received)
    worker.start()
def main():
    """Poll the input queue forever, pausing between polls.

    Sleeps briefly after a poll that produced a message and for longer after
    one that did not.
    """
    while True:
        read_and_write()
        # Adjust wait times as needed: short pause while messages are
        # flowing, longer back-off when the last poll came up empty.
        pause_seconds = 0.025 if hit_message else 5
        time.sleep(pause_seconds)


if __name__ == "__main__":
    main()