Skip to content

Commit

Permalink
feat: Implement more sophisticated notification system
Browse files Browse the repository at this point in the history
- Add support for multiple stakeholder notifications based on event types
- Improve data processing with new database query structure
- Enhance message formatting with more detailed ship information
refactor: Optimize code structure and readability
- Introduce helper functions for data processing and conversion
- Improve error handling and logging
- Update main loop for better flow control and error reporting
  • Loading branch information
jotpalch committed Aug 23, 2024
1 parent 7905ef5 commit 556ae41
Showing 1 changed file with 127 additions and 51 deletions.
178 changes: 127 additions & 51 deletions notifier/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import psycopg2
import requests
from datetime import datetime, timedelta
import json
from psycopg2.extras import RealDictCursor
from config import notification_mapping, INOUT_PILOTAGE_EVENTS

def send_line_notify(message, token):
url = 'https://notify-api.line.me/api/notify'
Expand All @@ -21,86 +24,159 @@ def get_recent_ship_statuses(interval):
dbname=os.getenv('POSTGRES_DB'),
user=os.getenv('POSTGRES_USER'),
password=os.getenv('POSTGRES_PASSWORD'),
host='db' # Docker Compose 服務名稱
host='db'
)
cur = conn.cursor()
cur = conn.cursor(cursor_factory=RealDictCursor)
interval_ago = datetime.now() - timedelta(seconds=interval)

cur.execute('''
SELECT ship_voyage_number, ship_name, latest_event, port_entry_application,
berth_shift_application, port_departure_application, offshore_vessel_entry,
at_anchor, port_entry_in_progress, loading_unloading_notice,
berth_shift_in_progress, berth_shift_loading_unloading,
port_departure_in_progress, vessel_departed, created_at, updated_at
FROM ship_status
WHERE updated_at >= %s
WITH ranked_events AS (
SELECT
se.*,
ROW_NUMBER() OVER (PARTITION BY se.ship_voyage_number, se.event_name ORDER BY se.event_time DESC) as rn
FROM ship_events se
WHERE se.event_name IN ('修改進港預報', '修改出港預報')
),
latest_event AS (
SELECT
se.*,
ROW_NUMBER() OVER (PARTITION BY se.ship_voyage_number ORDER BY se.event_time DESC) as rn
FROM ship_events se
)
SELECT
ss.ship_name,
ss.ship_voyage_number,
eta.event_content as eta,
etd.event_content as etd,
le.event_name as latest_event_name,
le.event_time as latest_event_time,
le.navigation_status as navigation_status,
le.event_content as latest_event_content,
le.event_source as latest_event_source,
ss.updated_at
FROM ship_status ss
LEFT JOIN ranked_events eta ON ss.ship_voyage_number = eta.ship_voyage_number
AND eta.event_name = '修改進港預報' AND eta.rn = 1
LEFT JOIN ranked_events etd ON ss.ship_voyage_number = etd.ship_voyage_number
AND etd.event_name = '修改出港預報' AND etd.rn = 1
LEFT JOIN latest_event le ON ss.ship_voyage_number = le.ship_voyage_number AND le.rn = 1
WHERE ss.updated_at >= %s
ORDER BY ss.updated_at DESC
''', (interval_ago,))

rows = cur.fetchall()
cur.close()
conn.close()
return rows

return [process_row(row) for row in rows]

def status_mapper(status):
# Map the status values to symbols
def process_row(row):
return {
'YES': '✅',
'NO': '',
'RED': '🔴'
}.get(status, status) # Return the original status if it doesn't need to be mapped
'船名': row['ship_name'],
'船編': row['ship_voyage_number'][:6],
'航次': row['ship_voyage_number'][6:10],
'ETA': convert_to_timestamp(row['eta']),
'ETD': convert_to_timestamp(row['etd']),
'最新消息': convert_inout_pilotage_event(row['latest_event_name'], row['navigation_status']),
'事件時間': convert_to_timestamp(row['latest_event_content']),
'事件來源': row['latest_event_source'],
'更新時間': row['updated_at']
}

def format_message(row):
ship_voyage_number, ship_name, latest_event, port_entry_application, berth_shift_application, \
port_departure_application, offshore_vessel_entry, at_anchor, port_entry_in_progress, \
loading_unloading_notice, berth_shift_in_progress, berth_shift_loading_unloading, \
port_departure_in_progress, vessel_departed, created_at, updated_at = row
def convert_to_timestamp(date_string):
if not date_string:
return None
for fmt in ("%Y/%m/%d %H:%M:%S", "%Y%m%d%H%M"):
try:
return datetime.strptime(date_string, fmt)
except ValueError:
continue
return date_string

# Convert to UTC+8
updated_at_utc8 = updated_at + timedelta(hours=8)
# Format the datetime object back to string
updated_at_str = updated_at_utc8.strftime("%Y-%m-%d %H:%M:%S")
def convert_inout_pilotage_event(event_name, navigation_status):
if event_name in INOUT_PILOTAGE_EVENTS:
return event_name + f" ({navigation_status})"
return event_name

def format_message(row):
ship_name = row['船名']
ship_id = row['船編']
voyage_number = row['航次']
eta = row['ETA'].strftime("%Y/%m/%d %H:%M:%S") if row['ETA'] else "N/A"
etd = row['ETD'].strftime("%Y/%m/%d %H:%M:%S") if row['ETD'] else "N/A"
latest_event = row['最新消息']
event_time = row['事件時間'].strftime("%Y/%m/%d %H:%M:%S") if isinstance(row['事件時間'], datetime) else row['事件時間']
event_source = row['事件來源']
updated_at = (row['更新時間'] + timedelta(hours=8)).strftime("%Y/%m/%d %H:%M:%S") if row['更新時間'] else "N/A"

message = f"""
船舶航次號: {ship_voyage_number}
船名: {ship_name}
最新事件: {latest_event}
進港申請: {status_mapper(port_entry_application)}
移泊申請: {status_mapper(berth_shift_application)}
出港申請: {status_mapper(port_departure_application)}
離岸船舶進入: {status_mapper(offshore_vessel_entry)}
停錨: {status_mapper(at_anchor)}
進港進行中: {status_mapper(port_entry_in_progress)}
裝卸通知: {status_mapper(loading_unloading_notice)}
移泊進行中: {status_mapper(berth_shift_in_progress)}
移泊裝卸: {status_mapper(berth_shift_loading_unloading)}
出港進行中: {status_mapper(port_departure_in_progress)}
船舶離港: {status_mapper(vessel_departed)}
船編: {ship_id}
航次: {voyage_number}
ETA: {eta}
ETD: {etd}
最新事件: {latest_event}
事件時間: {event_time}
事件來源: {event_source}
更新時間:
{updated_at_str}"""
{updated_at}"""
return message

def main():
# 從環境變數獲取 Line Notify 權杖
line_notify_token = os.getenv('LINE_NOTIFY_TOKEN')
def main():
original_token = os.getenv('LINE_NOTIFY_TOKEN')
interval_time = int(os.getenv('INTERVAL_TIME', 180))

interval_time = os.getenv('INTERVAL_TIME', 180)
line_notify_tokens = {
'Pilot': os.getenv('LINE_NOTIFY_TOKEN_PILOT'),
'Unmooring': os.getenv('LINE_NOTIFY_TOKEN_UNMOORING'),
'Tugboat': os.getenv('LINE_NOTIFY_TOKEN_TUGBOAT'),
'ShippingAgent': os.getenv('LINE_NOTIFY_TOKEN_SHIPPINGAGENT'),
'ShippingCompany': os.getenv('LINE_NOTIFY_TOKEN_SHIPPINGCOMPANY'),
'LoadingUnloading': os.getenv('LINE_NOTIFY_TOKEN_LOADINGUNLOADING')
}

while True:
print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} 查看資料庫有無更新')
# 從資料庫獲取最近 interval_time 秒內的訊息
rows = get_recent_ship_statuses(interval_time+1)

for row in rows:
message = format_message(row)
response = send_line_notify(message, line_notify_token)
if response.status_code == 200:
print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} 通知發送成功: {row[0]}')
latest_event = row['最新消息']

if latest_event in notification_mapping:
stakeholders = notification_mapping[latest_event]
for stakeholder in stakeholders:
if stakeholder not in line_notify_tokens:
continue

token = line_notify_tokens[stakeholder]
if not token:
print(f'{(datetime.now() + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")} 無法發送通知: {row["船名"]} to {stakeholder}, TOKEN 未設置')
continue

response = send_line_notify(message, token)
if response.status_code == 200:
print(f'{(datetime.now() + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")} 通知發送成功: {row["船名"]} to {stakeholder}')
else:
print(f'{(datetime.now() + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")} 通知發送失敗: {row["船名"]} to {stakeholder}, 狀態碼: {response.status_code}')

# Send a copy to the original token
if original_token:
stakeholders_list = "\n".join(stakeholders)
message = f"\n通知對象: \n{stakeholders_list}" + message
response = send_line_notify(message, original_token)
if response.status_code == 200:
print(f'{(datetime.now() + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")} 通知發送成功: {row["船名"]} - 事件: {latest_event}')
else:
print(f'{(datetime.now() + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")} 通知發送失敗: {row["船名"]} - 事件: {latest_event}, 狀態碼: {response.status_code}')
else:
print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} 通知發送失敗: {row[0]}, 狀態碼: {response.status_code}')
print(f'{(datetime.now() + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")} 未知事件類型: {latest_event}, 船名: {row["船名"]}')
# pass

# 等待 interval_time 秒
time.sleep(interval_time)

if __name__ == "__main__":
main()
main()

0 comments on commit 556ae41

Please sign in to comment.