From 279b22ec0c67a9d013223db53f9efafaaf2bbb17 Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Thu, 27 Jul 2023 09:19:20 +0700 Subject: [PATCH 01/16] attempt to fix missing timestamp from influx --- eews_backend/rest/main.py | 191 ++++++++++++------ eews_backend/rest/sandbox.py | 183 ----------------- eews_backend/stream/__init__.py | 2 + .../kafka.py => stream/main.py} | 0 .../{stream_processing => stream}/topics.py | 0 eews_backend/stream_processing/__init__.py | 0 eews_backend/stream_processing/schema.py | 23 --- 7 files changed, 133 insertions(+), 266 deletions(-) delete mode 100644 eews_backend/rest/sandbox.py create mode 100644 eews_backend/stream/__init__.py rename eews_backend/{stream_processing/kafka.py => stream/main.py} (100%) rename eews_backend/{stream_processing => stream}/topics.py (100%) delete mode 100644 eews_backend/stream_processing/__init__.py delete mode 100644 eews_backend/stream_processing/schema.py diff --git a/eews_backend/rest/main.py b/eews_backend/rest/main.py index 60f10ff..24cb817 100644 --- a/eews_backend/rest/main.py +++ b/eews_backend/rest/main.py @@ -1,10 +1,20 @@ from datetime import timedelta from math import ceil from dotenv import load_dotenv -from fastapi import FastAPI, HTTPException, Response, WebSocket, WebSocketDisconnect, status, UploadFile, BackgroundTasks +from fastapi import ( + FastAPI, + HTTPException, + Response, + WebSocket, + WebSocketDisconnect, + status, + UploadFile, + BackgroundTasks, +) from fastapi.params import Body from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles +from fastapi.staticfiles import StaticFiles from typing import Dict, List from influxdb_client import Point, WritePrecision from obspy import read @@ -12,9 +22,7 @@ from database.mongodb import * from database.influxdb import * -from stream_processing.schema import * -from stream_processing.kafka import KafkaProducer, BOOTSTRAP_SERVER -from stream_processing.topics import PREPROCESSED_TOPIC +from stream import KafkaProducer, PREPROCESSED_TOPIC from utils import * from .model import * from .websocket import ConnectionManager @@ -75,10 +83,13 @@ """ + + @app.get("/test") async def test(): query_api = client.query_api() - now = datetime(2015,8,20,15,12,1) + now = datetime(2015, 8, 20, 15, 12, 1) + first_starttime = now query = f""" from(bucket: "eews") |> range(start: {(now - timedelta(seconds=1)).isoformat()}Z, stop: {now.isoformat()}Z) @@ -89,20 +100,21 @@ async def test(): # log.debug(data.columns) # log.debug(data.head(50)) data = data.fillna(0) - return data.to_dict() + return data.to_dict() @app.get("/") async def get(): return HTMLResponse(HTML) + @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) try: query_api = client.query_api() now = datetime.now() - now = datetime(2015,8,20,15,12,1) + # now = datetime(2015, 8, 20, 15, 12, 1) while True: await asyncio.sleep(1) query = f""" @@ -111,6 +123,18 @@ async def websocket_endpoint(websocket: WebSocket): |> filter(fn: (r) => r["_measurement"] == "seismograf") |> pivot(rowKey: ["_time"], columnKey: ["channel", "station"], valueColumn: "_value")""" data: pd.DataFrame = query_api.query_data_frame(query=query) + time_list = [ + _time.isoformat(sep=" ", timespec="microseconds")[:-6] + for _time in data["_time"].to_list() + ] + for i in range(25): + timestamp = ( + now + timedelta(seconds=(i * 0.04)) - timedelta(seconds=1) + ).isoformat(sep=" ", timespec="microseconds") + if timestamp not in time_list: + print(timestamp, time_list) + # 
data.add(pd.DataFrame()) + log.debug(data) json_data = data.to_json() now += timedelta(seconds=1) @@ -118,23 +142,30 @@ async def websocket_endpoint(websocket: WebSocket): except WebSocketDisconnect: manager.disconnect(websocket) + @app.get("/station", response_model=List[StationModel]) async def list_seismometer(): list_data = await db["station"].find().to_list(1000000000) log.info(list_data) return list_data + @app.get("/station/{name}", response_model=StationModel) async def get_seismometer(name: str): data = await db["station"].find_one({"name": name}) if data is not None: return data - raise HTTPException(status_code=404, detail=f"Seismometer with name {name} not found") + raise HTTPException( + status_code=404, detail=f"Seismometer with name {name} not found" + ) + @app.put("/station/{name}", response_model=StationModel) -async def update_seismometer(name: str, background_task: BackgroundTasks, data: UpdateStationModel = Body(...)): +async def update_seismometer( + name: str, background_task: BackgroundTasks, data: UpdateStationModel = Body(...) +): data = data.model_dump() - + if len(data) >= 1: update_result = await db["station"].update_one({"name": name}, {"$set": data}) @@ -149,24 +180,37 @@ async def update_seismometer(name: str, background_task: BackgroundTasks, data: await adjust_closest_stations() return existing_data - raise HTTPException(status_code=404, detail=f"Seismometer with name {name} not found") + raise HTTPException( + status_code=404, detail=f"Seismometer with name {name} not found" + ) + @app.post("/station", response_model=StationModel, status_code=status.HTTP_201_CREATED) -async def create_seismometer(background_task: BackgroundTasks, data: UpdateStationModel = Body(...)): +async def create_seismometer( + background_task: BackgroundTasks, data: UpdateStationModel = Body(...) 
+): data = data.model_dump() - if (existing_data := await db["station"].find_one({"name": data["name"]})) is not None: - raise HTTPException(status_code=400, detail=f"Seismometer with name {data['name']} already exists") - + if ( + existing_data := await db["station"].find_one({"name": data["name"]}) + ) is not None: + raise HTTPException( + status_code=400, + detail=f"Seismometer with name {data['name']} already exists", + ) + all_stations = await db["station"].find().to_list(1000000000) calculated = dict() - + data["closest_stations"] = calculate_closest_station(data, all_stations, calculated) - + new_data = await db["station"].insert_one(data) - if (existing_data := await db["station"].find_one({"_id": new_data.inserted_id})) is not None: + if ( + existing_data := await db["station"].find_one({"_id": new_data.inserted_id}) + ) is not None: await adjust_closest_stations() return existing_data + @app.delete("/station/{name}") async def delete_seismometer(name: str, background_task: BackgroundTasks): delete_result = await db["station"].delete_one({"name": name}) @@ -175,7 +219,10 @@ async def delete_seismometer(name: str, background_task: BackgroundTasks): await adjust_closest_stations() return Response(status_code=status.HTTP_204_NO_CONTENT) - raise HTTPException(status_code=404, detail=f"Seismometer with name {name} not found") + raise HTTPException( + status_code=404, detail=f"Seismometer with name {name} not found" + ) + @app.post("/mseed", status_code=status.HTTP_201_CREATED) async def upload_mseed(file: UploadFile, background_tasks: BackgroundTasks): @@ -184,78 +231,101 @@ async def upload_mseed(file: UploadFile, background_tasks: BackgroundTasks): background_tasks.add_task(save_mseed, contents, filename) return {"file_size": file.size, "filename": filename} + @measure_execution_time -async def adjust_closest_stations(all_stations = None): +async def adjust_closest_stations(all_stations=None): log.info("Adjusting closest stations") if not all_stations: all_stations = await db["station"].find().to_list(1000000000) calculated = dict() - + for station in all_stations: - station["closest_stations"] = calculate_closest_station(station, all_stations, calculated) + station["closest_stations"] = calculate_closest_station( + station, all_stations, calculated + ) await db["station"].update_one({"name": station["name"]}, {"$set": station}) - -def calculate_closest_station(curr_station: List[Dict], all_stations: List[Dict], calculated: Dict = None): + + +def calculate_closest_station( + curr_station: List[Dict], all_stations: List[Dict], calculated: Dict = None +): distances = [] - + for other_station in all_stations: - if other_station['name'] == curr_station["name"]: + if other_station["name"] == curr_station["name"]: continue distance = float("inf") if f"{other_station['name']}-{curr_station['name']}" in calculated: distance = calculated[f"{other_station['name']}-{curr_station['name']}"] else: - distance = hs.haversine((curr_station['x'], curr_station['y']), (other_station['x'], other_station['y'])) + distance = hs.haversine( + (curr_station["x"], curr_station["y"]), + (other_station["x"], other_station["y"]), + ) calculated[f"{curr_station['name']}-{other_station['name']}"] = distance - distances.append((other_station['name'], distance)) - + distances.append((other_station["name"], distance)) + distances.sort(key=lambda x: x[1]) return [i[0] for i in distances[:3]] + @measure_execution_time def save_mseed(contents: bytes, filename: str): log.info("Saving mseed on the background") filepath = 
f"{MODULE_DIR}{STATIC_DIR}{filename}" with open(filepath, "wb") as f: f.write(contents) - + records = [] events = [] traces = process_data(filepath) start = time.monotonic_ns() for mseed_data in traces: - starttime: datetime = UTCDateTime(mseed_data['starttime']).datetime - endtime = UTCDateTime(mseed_data['endtime']).datetime - delta = 1/int(mseed_data['sampling_rate']) + starttime: datetime = UTCDateTime(mseed_data["starttime"]).datetime + endtime = UTCDateTime(mseed_data["endtime"]).datetime + delta = 1 / int(mseed_data["sampling_rate"]) channel = mseed_data["channel"] station = mseed_data["station"] first_starttime = nearest_datetime_rounded(starttime, delta * 10**6) - - log.debug(f"Processing {station}_{channel} from {filename} with len {len(mseed_data['data_interpolated'])}") - - for data_point in mseed_data['data_interpolated']: - point = Point("seismograf").time(first_starttime, write_precision=WritePrecision.MS).tag("channel", channel).tag("station", station).field("data", data_point) - records.append(point) + + log.debug( + f"Processing {station}_{channel} from {filename} with len {len(mseed_data['data_interpolated'])}" + ) + + for data_point in mseed_data["data_interpolated"]: + point = ( + Point("seismograf") + .time(first_starttime, write_precision=WritePrecision.MS) + .tag("channel", channel) + .tag("station", station) + .field("data", data_point) + ) + records.append(point) event = { "station": station, "channel": channel, "time": str(starttime), - "data": data_point + "data": data_point, } events.append(event) first_starttime += timedelta(seconds=delta) - - with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client: + + with InfluxDBClient( + url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG + ) as client: with client.write_api() as writer: log.debug("Start batch save to InfluxDB") writer.write(bucket="eews", record=records) - + log.debug("Start producing events") - for i in range(len(events)): + for i in range(len(events)): producer.produce_message(events[i]) - - log.debug(f"Finished process mseed with {len(records)} data for {(time.monotonic_ns() - start) / 10**9}s with rate of {len(records)/((time.monotonic_ns() - start) / 10**9)}") + + log.debug( + f"Finished process mseed with {len(records)} data for {(time.monotonic_ns() - start) / 10**9}s with rate of {len(records)/((time.monotonic_ns() - start) / 10**9)}" + ) + @measure_execution_time def process_data(mseed_filename: str): @@ -267,22 +337,23 @@ def process_data(mseed_filename: str): lowcut = 1.0 highcut = 5.0 order = 5 - preprocessed['network'] = detail.stats.network - preprocessed['station'] = detail.stats.station - preprocessed['channel'] = detail.stats.channel - preprocessed['location'] = detail.stats.location - preprocessed['starttime'] = str(detail.stats.starttime) - preprocessed['endtime'] = str(detail.stats.endtime) - preprocessed['delta'] = detail.stats.delta - preprocessed['npts'] = detail.stats.npts - preprocessed['calib'] = detail.stats.calib - preprocessed['data'] = detail.data + preprocessed["network"] = detail.stats.network + preprocessed["station"] = detail.stats.station + preprocessed["channel"] = detail.stats.channel + preprocessed["location"] = detail.stats.location + preprocessed["starttime"] = str(detail.stats.starttime) + preprocessed["endtime"] = str(detail.stats.endtime) + preprocessed["delta"] = detail.stats.delta + preprocessed["npts"] = detail.stats.npts + preprocessed["calib"] = detail.stats.calib + preprocessed["data"] = detail.data data_before = 
detail.data data_processed = butter_bandpass_filter(data_before, lowcut, highcut, fs, order) data_to = list(data_processed) - data_to = letInterpolate(data_to, int(ceil(len(data_to)*25/detail.stats.sampling_rate))) - preprocessed['sampling_rate'] = 25.0 - preprocessed['data_interpolated'] = data_to + data_to = letInterpolate( + data_to, int(ceil(len(data_to) * 25 / detail.stats.sampling_rate)) + ) + preprocessed["sampling_rate"] = 25.0 + preprocessed["data_interpolated"] = data_to traces.append(preprocessed) return traces - \ No newline at end of file diff --git a/eews_backend/rest/sandbox.py b/eews_backend/rest/sandbox.py deleted file mode 100644 index c883303..0000000 --- a/eews_backend/rest/sandbox.py +++ /dev/null @@ -1,183 +0,0 @@ -from dotenv import load_dotenv -from fastapi import FastAPI, UploadFile, BackgroundTasks, status -from obspy import read -from typing import List, Optional -from motor import motor_asyncio -from pprint import pprint -from functools import wraps -import timeit -import logging -import numpy as np -import faust -import os -import time -# from schema import * - -load_dotenv() - -RAW_TOPIC = 'raw' -RAW_MSEED_TOPIC = 'raw-mseed' -PREPROCESSED_TOPIC = 'preprocessed' -PREDICTION_TOPIC = 'prediction' -STATIC_DIR = "./static/" - -DEFAULT_MONGO_DATABASE = "db" -RAW_MONGO_URL = os.getenv("RAW_MONGO_URL") -RAW_MONGO_DATABASE = os.getenv("RAW_MONGO_DATABASE") if os.getenv("RAW_MONGO_DATABASE") else DEFAULT_MONGO_DATABASE -MONGO_URL = os.getenv("MONGO_URL") -MONGO_DATABASE = os.getenv("MONGO_DATABASE") if os.getenv("MONGO_DATABASE") else DEFAULT_MONGO_DATABASE - -raw_client = motor_asyncio.AsyncIOMotorClient(RAW_MONGO_URL) -raw_db = raw_client[RAW_MONGO_DATABASE] - -client = motor_asyncio.AsyncIOMotorClient(MONGO_URL) -db = client[MONGO_DATABASE] - -BOOTSTRAP_SERVER = os.getenv("BOOTSTRAP_SERVER") -if not BOOTSTRAP_SERVER: - raise Exception("BOOTSTRAP_SERVER is required") - -class RawValue(faust.Record, serializer="json"): - mseed_name: str - filename: str - -def array_to_str_limit_dec(array): - lst = "" - for i in array: - if i == None: - i = "None" - lst += i + " " - else: - lst += '{:.10f}'.format(np.round_(i, 10)) + " " - return lst - -def measure_execution_time(func): - @wraps(func) - def wrapper(*args, **kwargs): - start_time = timeit.default_timer() - result = func(*args, **kwargs) - end_time = timeit.default_timer() - - execution_time = end_time - start_time - print(f"Execution Time of {func.__name__}: {execution_time} seconds") - return result - - return wrapper - -@measure_execution_time -def process_data(mseed_filename): - mseed_data = read(mseed_filename) - traces = [] - # l_diff, first_starttime = add_null_station(gmji, jagi, pwji) - for detail in mseed_data: - preprocessed = {} - fs = detail.stats.sampling_rate - lowcut = 1.0 - highcut = 5.0 - order = 5 - preprocessed['network'] = detail.stats.network - preprocessed['station'] = detail.stats.station - preprocessed['channel'] = detail.stats.channel - preprocessed['location'] = detail.stats.location - preprocessed['starttime'] = str(detail.stats.starttime) - preprocessed['endtime'] = str(detail.stats.endtime) - preprocessed['delta'] = detail.stats.delta - preprocessed['npts'] = detail.stats.npts - preprocessed['calib'] = detail.stats.calib - # data_before = detail.data - # data_processed = butter_bandpass_filter(data_before, lowcut, highcut, fs,order) - # data_processed = normalizations(data_processed) - # data_to = list(data_processed) - preprocessed['data'] = array_to_str_limit_dec(detail.data) - # data_to = 
letInterpolate(data_to, int(ceil(len(data_to)*25/detail.stats.sampling_rate))) - # if detail.stats.station == "GMJI": - # print(l_diff[0][2]) - # if detail.stats.channel == l_diff[0][2]: - # print('masuk') - # if l_diff[0][1] != 0: - # for i in range(l_diff[0][1]): - # data_to.insert(0, None) - # print(data_processed[0:100]) - # elif detail.stats.station == "JAGI": - # if detail.stats.channel == l_diff[1][2]: - # if l_diff[1][1] != 0: - # for i in range(l_diff[1][1]): - # data_to.insert(0, None) - # elif detail.stats.station == "PWJI": - # if detail.stats.channel == l_diff[2][2]: - # if l_diff[2][1] != 0: - # for i in range(l_diff[2][1]): - # data_to.insert(0, None) - # preprocessed['sampling_rate'] = 25.0 - # preprocessed['data_interpolated'] = array_to_str_limit_dec(data_to) - # preprocessed['starttime_station'] = str(first_starttime) - # datas['first_npts'] = first_npts - # print(data_processed) - # serializer.validated_data['traces'].append(preprocessed) - traces.append(preprocessed) - return traces - - -app = FastAPI() -@app.post("/mseed", status_code=status.HTTP_201_CREATED) -async def upload_mseed(file: UploadFile, background_tasks: BackgroundTasks): - filename = file.filename - # background_tasks.add_task(_save_mseed, file, filename) - background_tasks.add_task(_send_mseed, file, filename) - return {"file_size": file.size, "filename": filename} - -async def _save_mseed(file: UploadFile, filename: str): - print(f"{time.time_ns()} | saving mseed to db") - contents = await file.read() - with open(f"{STATIC_DIR}{filename}", "wb") as f: - f.write(contents) - traces = process_data(f"{STATIC_DIR}{filename}") - # pprint(traces) - await raw_db["mseed"].insert_one({"name": filename, "traces": traces}) - await raw_topic.send(value=RawValue(mseed_id = filename, filename = filename)) - -async def _send_mseed(file: UploadFile, filename: str): - print(f"{time.time_ns()} | sending mseed to kafka") - contents = await file.read() - await raw_mseed_topic.send(key=filename, value=contents, value_serializer="raw") - -# Stream Processing -APP_NAME = "sandbox" -stream = faust.App(APP_NAME, broker=BOOTSTRAP_SERVER, producer_max_request_size=5242880, consumer_max_fetch_size=5242880) -raw_topic = stream.topic(RAW_TOPIC, value_type=RawValue) -raw_mseed_topic = stream.topic(RAW_MSEED_TOPIC, key_serializer="str", value_serializer='raw') - -@stream.agent(raw_topic) -async def process_raw_topic(data): - async for key, value in data.items(): - mseed = await db["mseed"].find_one({"name": value.mseed_name}) - print("found mseed on db", mseed.traces) - - -@stream.agent(raw_mseed_topic) -async def process_raw_mseed_topic(data): - async for key, value in data.items(): - print(f"{time.time_ns()} | received mseed from kafka") - with open(f"{key}", "wb") as f: - f.write(value) - traces = process_data(f"{key}") - print(f"{time.time_ns()} | finished processing data") - # pprint(traces) - await raw_db["mseed"].insert_one({"name": key, "traces": traces}) - print("processed raw bytes", traces) - - -# test_topic = stream.topic("test", partitions=8) -# @stream.agent(test_topic) -# async def test_listen(data): -# async for key, value in data.items(): -# print("RECEIVED MESSAGE", key, value) - -# # Generator app -# generator = faust.App("generator", broker=BOOTSTRAP_SERVER) - -# @generator.timer(interval=1) -# async def data_generator(): -# for i in range(1): -# data = {"name": None, "timestamp": str(time.time())} -# await test_topic.send(value=data) \ No newline at end of file diff --git a/eews_backend/stream/__init__.py 
b/eews_backend/stream/__init__.py new file mode 100644 index 0000000..dcba930 --- /dev/null +++ b/eews_backend/stream/__init__.py @@ -0,0 +1,2 @@ +from .main import * +from .topics import * \ No newline at end of file diff --git a/eews_backend/stream_processing/kafka.py b/eews_backend/stream/main.py similarity index 100% rename from eews_backend/stream_processing/kafka.py rename to eews_backend/stream/main.py diff --git a/eews_backend/stream_processing/topics.py b/eews_backend/stream/topics.py similarity index 100% rename from eews_backend/stream_processing/topics.py rename to eews_backend/stream/topics.py diff --git a/eews_backend/stream_processing/__init__.py b/eews_backend/stream_processing/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/eews_backend/stream_processing/schema.py b/eews_backend/stream_processing/schema.py deleted file mode 100644 index 70be260..0000000 --- a/eews_backend/stream_processing/schema.py +++ /dev/null @@ -1,23 +0,0 @@ -from typing import List, Optional -import faust - -class RawValue(faust.Record, serializer="json"): - mseed_name: str - filename: str - -class MseedData(faust.Record, serializer="json"): - network: Optional[str] - station: Optional[str] - location: Optional[str] - channel: Optional[str] - starttime: Optional[str] - endtime: Optional[str] - sampling_rate: Optional[float] - delta: Optional[float] - npts: Optional[int] - calib: Optional[float] - data: Optional[str] - -class PreprocessedValue(faust.Record, serializer="json"): - name: str - traces: List[MseedData] \ No newline at end of file From fbe3a4382fe0f674796bf45d7feebeb011ff2d78 Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Sun, 30 Jul 2023 21:28:40 +0700 Subject: [PATCH 02/16] add generator to simulate realtime data --- eews_backend/generator/main.py | 76 +++++++++++ eews_backend/rest/main.py | 54 ++++---- eews_backend/utils/helper_functions.py | 168 ++++++++++++++++--------- 3 files changed, 215 insertions(+), 83 deletions(-) create mode 100644 eews_backend/generator/main.py diff --git a/eews_backend/generator/main.py b/eews_backend/generator/main.py new file mode 100644 index 0000000..24c76e7 --- /dev/null +++ b/eews_backend/generator/main.py @@ -0,0 +1,76 @@ +from ast import List +import requests +from obspy import read, Stream, UTCDateTime, Trace +import time +import os +import shutil +import threading + +MSEED_FOLDER = "eews_backend/mseed/" +SOURCE_MSEED = "eews_backend/generator/20150920_151412.mseed" +REST_URL = "http://127.0.0.1:8000" +MSEED_RANGE_IN_SECONDS = 10 + + +def main(): + split_mseed() + + print("Start sending file to", REST_URL) + threads = [] + for station in os.listdir(MSEED_FOLDER): + send_thread = threading.Thread(target=send, args=(station,)) + send_thread.name = station + threads.append(send_thread) + for thread in threads: + thread.start() + + +def send(station: str): + folder = f"{MSEED_FOLDER}/{station}/" + BHE: List[str] = os.listdir(f"{folder}BHE/") + BHN: List[str] = os.listdir(f"{folder}BHN/") + BHZ: List[str] = os.listdir(f"{folder}BHZ/") + + for index in range(len(BHE)): + bhe_mseed: bytes = open(f"{MSEED_FOLDER}/{station}/BHE/{BHE[index]}", "rb") + bhn_mseed: bytes = open(f"{MSEED_FOLDER}/{station}/BHN/{BHN[index]}", "rb") + bhz_mseed: bytes = open(f"{MSEED_FOLDER}/{station}/BHZ/{BHZ[index]}", "rb") + + threading.Thread( + target=post, args=(f"{REST_URL}/mseed", {"file": bhe_mseed}) + ).start() + threading.Thread( + target=post, args=(f"{REST_URL}/mseed", {"file": bhn_mseed}) + ).start() + threading.Thread( + target=post, 
args=(f"{REST_URL}/mseed", {"file": bhz_mseed}) + ).start() + + time.sleep(10) + + +def post(url, files): + requests.post(url, files=files) + + +def split_mseed(): + print("Mseed will be saved to folder", MSEED_FOLDER) + print("Splitting mseed") + st: Stream = read(SOURCE_MSEED) + dt = UTCDateTime("2015-08-20T15:12:00") + last_endtime = max([trace.stats["endtime"] for trace in st]) + trace: Trace + shutil.rmtree(MSEED_FOLDER) + while dt <= last_endtime: + trimmed = st.slice(dt, dt + MSEED_RANGE_IN_SECONDS) + for trace in trimmed: + stats = trace.stats + filename = f"{MSEED_FOLDER}{stats['station']}/{stats['channel']}/{dt.strftime('%Y%m%d')}_{stats['starttime'].strftime('%H%M%S')}.mseed" + os.makedirs(os.path.dirname(filename), exist_ok=True) + trace.write(filename=filename, format="MSEED") + dt += MSEED_RANGE_IN_SECONDS + print("Finished splitting mseed") + + +if __name__ == "__main__": + main() diff --git a/eews_backend/rest/main.py b/eews_backend/rest/main.py index 24cb817..7e29012 100644 --- a/eews_backend/rest/main.py +++ b/eews_backend/rest/main.py @@ -1,4 +1,4 @@ -from datetime import timedelta +from datetime import timedelta, timezone from math import ceil from dotenv import load_dotenv from fastapi import ( @@ -14,7 +14,6 @@ from fastapi.params import Body from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles -from fastapi.staticfiles import StaticFiles from typing import Dict, List from influxdb_client import Point, WritePrecision from obspy import read @@ -33,16 +32,20 @@ import time import asyncio import pandas as pd +import io + +load_dotenv() +dictConfig(logging_config) MODULE_DIR = "./rest/" STATIC_DIR = "static/" +SIMULATE_REALTIME = True if os.getenv("SIMULATE_REALTIME") == "True" else False -load_dotenv() -dictConfig(logging_config) app = FastAPI() log = logging.getLogger("rest") app.mount("/static", StaticFiles(directory=f"{MODULE_DIR}{STATIC_DIR}"), name="static") +log.info(f"{SIMULATE_REALTIME=}") producer = KafkaProducer(PREPROCESSED_TOPIC) manager = ConnectionManager() @@ -95,7 +98,9 @@ async def test(): |> range(start: {(now - timedelta(seconds=1)).isoformat()}Z, stop: {now.isoformat()}Z) |> filter(fn: (r) => r["_measurement"] == "seismograf") |> pivot(rowKey: ["_time"], columnKey: ["channel", "station"], valueColumn: "_value")""" + start = time.monotonic_ns() data: pd.DataFrame = query_api.query_data_frame(query=query) + log.debug(f"{(time.monotonic_ns() - start)/10**9}s") # data = data.drop(columns=["_start", "_stop", "_field", "_measurement", "result", "table"]) # log.debug(data.columns) # log.debug(data.head(50)) @@ -113,32 +118,34 @@ async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) try: query_api = client.query_api() - now = datetime.now() + now = datetime.now(tz=timezone.utc) - timedelta(seconds=10) # now = datetime(2015, 8, 20, 15, 12, 1) while True: - await asyncio.sleep(1) + start = time.monotonic_ns() query = f""" from(bucket: "eews") - |> range(start: {(now - timedelta(seconds=1)).isoformat()}Z, stop: {now.isoformat()}Z) + |> range(start: {(now - timedelta(seconds=1)).isoformat()}, stop: {now.isoformat()}) |> filter(fn: (r) => r["_measurement"] == "seismograf") |> pivot(rowKey: ["_time"], columnKey: ["channel", "station"], valueColumn: "_value")""" data: pd.DataFrame = query_api.query_data_frame(query=query) - time_list = [ - _time.isoformat(sep=" ", timespec="microseconds")[:-6] - for _time in data["_time"].to_list() - ] - for i in range(25): - timestamp = ( - now + timedelta(seconds=(i 
* 0.04)) - timedelta(seconds=1) - ).isoformat(sep=" ", timespec="microseconds") - if timestamp not in time_list: - print(timestamp, time_list) - # data.add(pd.DataFrame()) - + # time_list = [ + # _time.isoformat(sep=" ", timespec="microseconds")[:-6] + # for _time in data["_time"].to_list() + # ] + # for i in range(25): + # timestamp = ( + # now + timedelta(seconds=(i * 0.04)) - timedelta(seconds=1) + # ).isoformat(sep=" ", timespec="microseconds") + # if timestamp not in time_list: + # print(timestamp, time_list) + # # data.add(pd.DataFrame()) + log.debug(now) log.debug(data) json_data = data.to_json() now += timedelta(seconds=1) await manager.broadcast(json_data) + diff = (time.monotonic_ns() - start) / 10**9 + await asyncio.sleep(1 - diff) except WebSocketDisconnect: manager.disconnect(websocket) @@ -227,6 +234,7 @@ async def delete_seismometer(name: str, background_task: BackgroundTasks): @app.post("/mseed", status_code=status.HTTP_201_CREATED) async def upload_mseed(file: UploadFile, background_tasks: BackgroundTasks): filename = file.filename + log.debug(f"Received mseed with name {filename}") contents = await file.read() background_tasks.add_task(save_mseed, contents, filename) return {"file_size": file.size, "filename": filename} @@ -273,17 +281,15 @@ def calculate_closest_station( @measure_execution_time def save_mseed(contents: bytes, filename: str): log.info("Saving mseed on the background") - filepath = f"{MODULE_DIR}{STATIC_DIR}{filename}" - with open(filepath, "wb") as f: - f.write(contents) records = [] events = [] - traces = process_data(filepath) + traces = process_data(io.BytesIO(contents)) start = time.monotonic_ns() for mseed_data in traces: starttime: datetime = UTCDateTime(mseed_data["starttime"]).datetime - endtime = UTCDateTime(mseed_data["endtime"]).datetime + if SIMULATE_REALTIME: + starttime = (UTCDateTime().now() - 10).datetime delta = 1 / int(mseed_data["sampling_rate"]) channel = mseed_data["channel"] station = mseed_data["station"] diff --git a/eews_backend/utils/helper_functions.py b/eews_backend/utils/helper_functions.py index c1540ec..e773703 100644 --- a/eews_backend/utils/helper_functions.py +++ b/eews_backend/utils/helper_functions.py @@ -6,119 +6,163 @@ import numpy as np import pytz + def normalizations(array): # res = array/np.amax(np.abs(array)) - res = array/100000 + res = array / 100000 return res + def array_to_str_limit_dec(array): lst = "" for i in array: - if i == None: + if i == None: i = "None" lst += i + " " - else: - lst += '{:.10f}'.format(np.round_(i, 10)) + " " + else: + lst += "{:.10f}".format(np.round_(i, 10)) + " " return lst + # band pass filter def butter_bandpass(lowcut, highcut, fs, order): nyq = 0.5 * fs low = lowcut / nyq high = highcut / nyq - b, a = butter(order, [low, high], btype='band') + b, a = butter(order, [low, high], btype="band") return b, a -#band pass filter with filtfilt + +# band pass filter with filtfilt def butter_bandpass_filter(data, lowcut, highcut, fs, order): b, a = butter_bandpass(lowcut, highcut, fs, order=order) # y = lfilter(b, a, data) - y = filtfilt(b, a, data) + y = lfilter(b, a, data) return y + def get_Parrival(data1, data2, data3, sampling): E_p = search_Parrival(data1, sampling) N_p = search_Parrival(data2, sampling) Z_p = search_Parrival(data3, sampling) - if len(E_p)== 0 and len(N_p) == 0 and len(Z_p) == 0: + if len(E_p) == 0 and len(N_p) == 0 and len(Z_p) == 0: return -1 - + else: lens = [E_p, N_p, Z_p] r = [] for i in lens: - if len(i) != 0: - r.append(i[0]) + if len(i) != 0: + 
r.append(i[0]) s = [] for i in r: if len(i) != 0: s.append(i[0]) - return(min(s)) - + return min(s) + + def search_Parrival(data, sampling): - cft = recursive_sta_lta(data, int(2.5 * sampling), int(10. * sampling)) + cft = recursive_sta_lta(data, int(2.5 * sampling), int(10.0 * sampling)) on_of = trigger_onset(cft, 3.3, 0.5) return on_of + def split(strr): strr = list(filter(None, strr.split(" "))) lst = [] for i in strr: - if i == "None": - j = None - lst.append(j) - else: - lst.append(float(i)) + if i == "None": + j = None + lst.append(j) + else: + lst.append(float(i)) return lst + def s_add_starttime(e, n, z, data): - l_enz = [('e', e.timestamp), ('n', n.timestamp), ('z', z.timestamp)] - l_enz.sort(key=lambda a:a[1]) - sample = data[0]['sampling_rate'] - + l_enz = [("e", e.timestamp), ("n", n.timestamp), ("z", z.timestamp)] + l_enz.sort(key=lambda a: a[1]) + sample = data[0]["sampling_rate"] + print(l_enz) l_diff = [] - l_diff.append((l_enz[2][0], int(np.around((l_enz[2][1] - l_enz[0][1])*sample)))) - l_diff.append((l_enz[1][0], int(np.around((l_enz[1][1] - l_enz[0][1])*sample)))) + l_diff.append((l_enz[2][0], int(np.around((l_enz[2][1] - l_enz[0][1]) * sample)))) + l_diff.append((l_enz[1][0], int(np.around((l_enz[1][1] - l_enz[0][1]) * sample)))) l_diff.append((l_enz[0][0], 0)) - + l_diff.sort() print(l_diff) - - data_e = split(data[0]['data_interpolated']) - data_n = split(data[1]['data_interpolated']) - data_z = split(data[2]['data_interpolated']) - + + data_e = split(data[0]["data_interpolated"]) + data_n = split(data[1]["data_interpolated"]) + data_z = split(data[2]["data_interpolated"]) + if l_diff[0][1] != 0: - for i in range(l_diff[0][1]): - data_e.insert(0, 0) - + for i in range(l_diff[0][1]): + data_e.insert(0, 0) + if l_diff[1][1] != 0: - for i in range(l_diff[1][1]): - data_n.insert(0, 0) - + for i in range(l_diff[1][1]): + data_n.insert(0, 0) + if l_diff[2][1] != 0: - for i in range(l_diff[2][1]): - data_z.insert(0, 0) - + for i in range(l_diff[2][1]): + data_z.insert(0, 0) + lst = [data_e, data_n, data_z] - + return lst, l_enz[0][0] - + + def add_null_station(gmji, jagi, pwji): - gmji = sorted(gmji, key= lambda a:a.stats.starttime) - jagi = sorted(jagi, key= lambda a:a.stats.starttime) - pwji = sorted(pwji, key= lambda a:a.stats.starttime) - - l_gjp = [('gmji', UTCDateTime(gmji[0].stats.starttime).timestamp, gmji[0].stats.sampling_rate, gmji[0].stats.channel, gmji[0].stats.starttime, gmji[0].stats.npts), - ('jagi', UTCDateTime(jagi[0].stats.starttime).timestamp, jagi[0].stats.sampling_rate, jagi[0].stats.channel, jagi[0].stats.starttime, gmji[0].stats.npts), - ('pwji', UTCDateTime(pwji[0].stats.starttime).timestamp, pwji[0].stats.sampling_rate, pwji[0].stats.channel, pwji[0].stats.starttime, gmji[0].stats.npts)] - l_gjp = sorted(l_gjp, key= lambda a:a[1]) + gmji = sorted(gmji, key=lambda a: a.stats.starttime) + jagi = sorted(jagi, key=lambda a: a.stats.starttime) + pwji = sorted(pwji, key=lambda a: a.stats.starttime) + + l_gjp = [ + ( + "gmji", + UTCDateTime(gmji[0].stats.starttime).timestamp, + gmji[0].stats.sampling_rate, + gmji[0].stats.channel, + gmji[0].stats.starttime, + gmji[0].stats.npts, + ), + ( + "jagi", + UTCDateTime(jagi[0].stats.starttime).timestamp, + jagi[0].stats.sampling_rate, + jagi[0].stats.channel, + jagi[0].stats.starttime, + gmji[0].stats.npts, + ), + ( + "pwji", + UTCDateTime(pwji[0].stats.starttime).timestamp, + pwji[0].stats.sampling_rate, + pwji[0].stats.channel, + pwji[0].stats.starttime, + gmji[0].stats.npts, + ), + ] + l_gjp = sorted(l_gjp, 
key=lambda a: a[1]) print(l_gjp) l_diff = [] - l_diff.append((l_gjp[2][0], int(np.around((l_gjp[2][1] - l_gjp[0][1]) * l_gjp[0][2])), l_gjp[2][3])) - l_diff.append((l_gjp[1][0], int(np.around((l_gjp[1][1] - l_gjp[0][1]) * l_gjp[1][2])), l_gjp[1][3])) + l_diff.append( + ( + l_gjp[2][0], + int(np.around((l_gjp[2][1] - l_gjp[0][1]) * l_gjp[0][2])), + l_gjp[2][3], + ) + ) + l_diff.append( + ( + l_gjp[1][0], + int(np.around((l_gjp[1][1] - l_gjp[0][1]) * l_gjp[1][2])), + l_gjp[1][3], + ) + ) l_diff.append((l_gjp[0][0], 0, l_gjp[0][3])) data_first = l_gjp[0][4] # npts_first = l_gjp[0][5] @@ -127,20 +171,26 @@ def add_null_station(gmji, jagi, pwji): return l_diff, data_first + def interpolate(lst, fi): - i, f = int(fi // 1), fi % 1 # Split floating-point index into whole & fractional parts. - j = i+1 if f > 0 else i # Avoid index error. - return (1-f) * lst[i] + f * lst[j] + i, f = ( + int(fi // 1), + fi % 1, + ) # Split floating-point index into whole & fractional parts. + j = i + 1 if f > 0 else i # Avoid index error. + return (1 - f) * lst[i] + f * lst[j] def letInterpolate(inp, new_len): - delta = (len(inp)-1) / (new_len-1) - outp = [interpolate(inp, i*delta) for i in range(new_len)] + delta = (len(inp) - 1) / (new_len - 1) + outp = [interpolate(inp, i * delta) for i in range(new_len)] return outp + def get_current_utc_datetime(): return datetime.now(pytz.utc).strftime("%Y-%m-%d %H:%M:%S.%f") - + + def nearest_datetime_rounded(datetime: datetime, step_in_micros: int = 40000): microsecond = datetime.time().microsecond remainder = microsecond % step_in_micros @@ -149,4 +199,4 @@ def nearest_datetime_rounded(datetime: datetime, step_in_micros: int = 40000): rounded -= timedelta(microseconds=remainder) else: rounded += timedelta(microseconds=(step_in_micros - remainder)) - return rounded \ No newline at end of file + return rounded From a477f14c4d3193370c09dd28ed1e5337fd54b7b6 Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Mon, 31 Jul 2023 15:11:47 +0700 Subject: [PATCH 03/16] refactor db module --- eews_backend/database/influxdb.py | 8 +++----- eews_backend/database/mongodb.py | 12 +++++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/eews_backend/database/influxdb.py b/eews_backend/database/influxdb.py index f56a8da..7e5e550 100644 --- a/eews_backend/database/influxdb.py +++ b/eews_backend/database/influxdb.py @@ -12,8 +12,6 @@ INFLUXDB_URL = os.getenv("INFLUXDB_URL") INFLUXDB_TOKEN = os.getenv("INFLUXDB_TOKEN") -client = InfluxDBClient( - url=INFLUXDB_URL, - org=INFLUXDB_ORG, - token=INFLUXDB_TOKEN -) + +def influx_client(): + return InfluxDBClient(url=INFLUXDB_URL, org=INFLUXDB_ORG, token=INFLUXDB_TOKEN) diff --git a/eews_backend/database/mongodb.py b/eews_backend/database/mongodb.py index 7142927..3711b3d 100644 --- a/eews_backend/database/mongodb.py +++ b/eews_backend/database/mongodb.py @@ -7,12 +7,14 @@ DEFAULT_MONGO_DATABASE = "db" MONGO_URL = os.getenv("MONGO_URL") -MONGO_DATABASE = os.getenv("MONGO_DATABASE") if os.getenv("MONGO_DATABASE") else DEFAULT_MONGO_DATABASE +MONGO_DATABASE = ( + os.getenv("MONGO_DATABASE") + if os.getenv("MONGO_DATABASE") + else DEFAULT_MONGO_DATABASE +) -client = motor_asyncio.AsyncIOMotorClient(MONGO_URL) -db = client[MONGO_DATABASE] - -def new_client(): + +def mongo_client(): mongo_url = MONGO_URL mongo_db = MONGO_DATABASE client = motor_asyncio.AsyncIOMotorClient(mongo_url) From 43bb8b5ae504b045e500527d36c18907ab4972c7 Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Mon, 31 Jul 2023 15:12:46 +0700 Subject: [PATCH 04/16] refactor to use 
multiprocess --- eews_backend/generator/main.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/eews_backend/generator/main.py b/eews_backend/generator/main.py index 24c76e7..c7803b7 100644 --- a/eews_backend/generator/main.py +++ b/eews_backend/generator/main.py @@ -5,24 +5,27 @@ import os import shutil import threading +import multiprocessing MSEED_FOLDER = "eews_backend/mseed/" SOURCE_MSEED = "eews_backend/generator/20150920_151412.mseed" REST_URL = "http://127.0.0.1:8000" MSEED_RANGE_IN_SECONDS = 10 +global process_list +process_list = [] + def main(): split_mseed() - + global process_list print("Start sending file to", REST_URL) - threads = [] for station in os.listdir(MSEED_FOLDER): - send_thread = threading.Thread(target=send, args=(station,)) - send_thread.name = station - threads.append(send_thread) - for thread in threads: - thread.start() + send_process = multiprocessing.Process(target=send, args=(station,)) + send_process.name = station + process_list.append(send_process) + for process in process_list: + process.start() def send(station: str): @@ -73,4 +76,9 @@ def split_mseed(): if __name__ == "__main__": - main() + try: + main() + except KeyboardInterrupt: + for process in process_list: + process.terminate() + process.join() From 94ab50579096025299452338aa5e6aefe2eeedf0 Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Mon, 31 Jul 2023 15:14:02 +0700 Subject: [PATCH 05/16] add option to simulate realtime --- eews_backend/rest/main.py | 42 +++++++++++++++++--------- eews_backend/rest/model.py | 15 +++++---- eews_backend/utils/helper_functions.py | 2 +- 3 files changed, 37 insertions(+), 22 deletions(-) diff --git a/eews_backend/rest/main.py b/eews_backend/rest/main.py index 7e29012..428d111 100644 --- a/eews_backend/rest/main.py +++ b/eews_backend/rest/main.py @@ -14,13 +14,14 @@ from fastapi.params import Body from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles +from fastapi.middleware.cors import CORSMiddleware from typing import Dict, List from influxdb_client import Point, WritePrecision from obspy import read from logging.config import dictConfig -from database.mongodb import * -from database.influxdb import * +from database.mongodb import mongo_client +from database.influxdb import influx_client from stream import KafkaProducer, PREPROCESSED_TOPIC from utils import * from .model import * @@ -33,6 +34,7 @@ import asyncio import pandas as pd import io +import os load_dotenv() dictConfig(logging_config) @@ -41,14 +43,30 @@ STATIC_DIR = "static/" SIMULATE_REALTIME = True if os.getenv("SIMULATE_REALTIME") == "True" else False +origins = [ + "http://localhost", + "http://localhost:3000", +] app = FastAPI() log = logging.getLogger("rest") app.mount("/static", StaticFiles(directory=f"{MODULE_DIR}{STATIC_DIR}"), name="static") + + +app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + log.info(f"{SIMULATE_REALTIME=}") producer = KafkaProducer(PREPROCESSED_TOPIC) manager = ConnectionManager() +_, db = mongo_client() +client = influx_client() HTML = """ @@ -101,9 +119,6 @@ async def test(): start = time.monotonic_ns() data: pd.DataFrame = query_api.query_data_frame(query=query) log.debug(f"{(time.monotonic_ns() - start)/10**9}s") - # data = data.drop(columns=["_start", "_stop", "_field", "_measurement", "result", "table"]) - # log.debug(data.columns) - # log.debug(data.head(50)) data = data.fillna(0) return data.to_dict() @@ 
-118,13 +133,14 @@ async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) try: query_api = client.query_api() - now = datetime.now(tz=timezone.utc) - timedelta(seconds=10) - # now = datetime(2015, 8, 20, 15, 12, 1) + now = datetime(2015, 8, 20, 15, 12, 1) + if SIMULATE_REALTIME: + now = datetime.now(tz=timezone.utc) - timedelta(seconds=10) while True: start = time.monotonic_ns() query = f""" from(bucket: "eews") - |> range(start: {(now - timedelta(seconds=1)).isoformat()}, stop: {now.isoformat()}) + |> range(start: {(now - timedelta(seconds=1)).isoformat()}Z, stop: {now.isoformat()}Z) |> filter(fn: (r) => r["_measurement"] == "seismograf") |> pivot(rowKey: ["_time"], columnKey: ["channel", "station"], valueColumn: "_value")""" data: pd.DataFrame = query_api.query_data_frame(query=query) @@ -153,7 +169,6 @@ async def websocket_endpoint(websocket: WebSocket): @app.get("/station", response_model=List[StationModel]) async def list_seismometer(): list_data = await db["station"].find().to_list(1000000000) - log.info(list_data) return list_data @@ -317,12 +332,9 @@ def save_mseed(contents: bytes, filename: str): events.append(event) first_starttime += timedelta(seconds=delta) - with InfluxDBClient( - url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG - ) as client: - with client.write_api() as writer: - log.debug("Start batch save to InfluxDB") - writer.write(bucket="eews", record=records) + with client.write_api() as writer: + log.debug("Start batch save to InfluxDB") + writer.write(bucket="eews", record=records) log.debug("Start producing events") for i in range(len(events)): diff --git a/eews_backend/rest/model.py b/eews_backend/rest/model.py index 7d94930..e00dead 100644 --- a/eews_backend/rest/model.py +++ b/eews_backend/rest/model.py @@ -1,15 +1,18 @@ from typing import List, Optional from pydantic import BaseModel, Field, ConfigDict - + + class UpdateStationModel(BaseModel): name: str = Field() description: Optional[str] = Field() x: float = Field() y: float = Field() - + + class StationModel(UpdateStationModel): closest_stations: List[str] = Field() - + + class MSeedData(BaseModel): network: Optional[str] = Field() station: Optional[str] = Field() @@ -21,9 +24,9 @@ class MSeedData(BaseModel): delta: Optional[float] = Field() npts: Optional[int] = Field() calib: Optional[float] = Field() - data: Optional[str]= Field() - + data: Optional[str] = Field() + + class MSeed(BaseModel): name: str = Field() traces: Optional[List[MSeedData]] = Field() - \ No newline at end of file diff --git a/eews_backend/utils/helper_functions.py b/eews_backend/utils/helper_functions.py index e773703..07660dd 100644 --- a/eews_backend/utils/helper_functions.py +++ b/eews_backend/utils/helper_functions.py @@ -36,8 +36,8 @@ def butter_bandpass(lowcut, highcut, fs, order): # band pass filter with filtfilt def butter_bandpass_filter(data, lowcut, highcut, fs, order): b, a = butter_bandpass(lowcut, highcut, fs, order=order) - # y = lfilter(b, a, data) y = lfilter(b, a, data) + # y = filtfilt(b, a, data) return y From 39fe5ddc752e11a3ca16df1884584b1fea3de358 Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Mon, 31 Jul 2023 15:14:54 +0700 Subject: [PATCH 06/16] map to fetch stations data & refactor --- frontend/src/components/Charts/lineChart.js | 22 +++-- frontend/src/components/Map/Map2.js | 83 +++++++++---------- frontend/src/components/style.css | 2 +- frontend/src/pages/AdminMap.js | 92 +++++++++------------ 4 files changed, 94 insertions(+), 105 deletions(-) diff --git 
a/frontend/src/components/Charts/lineChart.js b/frontend/src/components/Charts/lineChart.js index 639437f..ca4db1f 100644 --- a/frontend/src/components/Charts/lineChart.js +++ b/frontend/src/components/Charts/lineChart.js @@ -1,6 +1,7 @@ import { useEffect, useState,useRef } from 'react'; import React from 'react'; import { + Label, Line, LineChart, XAxis, @@ -49,35 +50,42 @@ const RealtimeChart = (props) => { Object.entries(value).forEach(([index, point]) => { let points = mappedData[index] != undefined ? mappedData[index] : {} points[key] = point - points["time"] = props.time[index] + points["time"] = new Date(props.time[index]).toLocaleTimeString() mappedData[index] = points }) }) setData((prev) => [...prev.slice(25), ...mappedData]) - console.log(mappedData) }, [props]); + function yAxisFormatter(y) { + if (y > 0) { + return y/y + } else if (y == 0) { + return 0 + } else { + return -(y/y) + } + } + return (

Stasiun {props.stasiun}

- - - + + + -
- ); }; export default RealtimeChart diff --git a/frontend/src/components/Map/Map2.js b/frontend/src/components/Map/Map2.js index c156b4d..b9edd5c 100644 --- a/frontend/src/components/Map/Map2.js +++ b/frontend/src/components/Map/Map2.js @@ -8,30 +8,33 @@ import L from 'leaflet' const Map = (props) => { const [found,setFound] = useState(false); const [coord,setCoord] = useState(new L.circle()); - const customMarkerIconPWJI = divIcon({ - html: '
PWJI
', - iconAnchor: [25, 41], - popupAnchor: [-11, -40] - }); - - const customMarkerIconGMJI = divIcon({ - html: '
GMJI
', - iconAnchor: [25, 41], - popupAnchor: [-11, -40] - }); - // https://unpkg.com/leaflet@1.9.1/dist/images/marker-icon-2x.png - const customMarkerIconJAGI = divIcon({ - html: '
JAGI
', - iconAnchor: [25, 41], - popupAnchor: [-11, -40] - }); + const [stations, setStations] = useState([]) const map = useRef(); - const GMJIpos = [-8.0219, 111.8]; - const PWJIpos = [-8.2732, 113.4441]; - const JAGIpos = [-8.47, 114.15]; - const mapRef = useRef() + const mapRef = useRef(); let lst = useRef([]); + + function customMarkerIcon(station, iconAnchor = [25, 41], popupAnchor = [-11, -40]) { + return divIcon({ + html: `
${station}
`, + iconAnchor: iconAnchor, + popupAnchor: popupAnchor + }) + } + + function popupContent(station, description) { + return `${station}
${description}
` + } + useEffect(() =>{ + async function getStations() { + const response = await fetch(`http://${props.url}/station`) + const jsonData = await response.json() + setStations(jsonData) + jsonData.forEach(element => { + L.marker([element["x"], element["y"]], {icon: customMarkerIcon(element["name"])}).addTo(mapRef.current).bindPopup(popupContent(element["name"], element["description"])); + }); + } + if (mapRef.current == undefined) { mapRef.current = L.map('map', { layers: [ L.tileLayer("https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}") @@ -39,31 +42,24 @@ const Map = (props) => { zoomControl: false }) - .setView(GMJIpos, 6.5) - + .setView([-8.0219, 111.8], 6.5) + L.control.zoom({ - position: 'topright' - }).addTo(mapRef.current); - - const popupContentGMJI = "GMJI
Station Gumukmas, Java
" - const popupContentPWJI = "PWJI
Station Pagerwojo, Java
" - const popupContentJAGI = "JAGI
Station Jajag, Java
" - - L.marker(GMJIpos, {icon: customMarkerIconGMJI}).addTo(mapRef.current).bindPopup(popupContentGMJI); - L.marker(PWJIpos, {icon: customMarkerIconPWJI}).addTo(mapRef.current).bindPopup(popupContentPWJI); - L.marker(JAGIpos, {icon: customMarkerIconJAGI}).addTo(mapRef.current).bindPopup(popupContentJAGI); - + position: 'topright' + }).addTo(mapRef.current); + } + getStations() }, []); useEffect(()=>{ - if(props.jsonGMJI!==null && props.jsonPWJI!==null && props.jsonJAGI!==null){ - if(props.jsonGMJI[99].data_prediction.lat===null && props.jsonJAGI[99].data_prediction.lat===null && props.jsonPWJI[99].data_prediction.lat===null){ + if(props.jsonGMJI !== undefined && props.jsonPWJI !== undefined && props.jsonJAGI !== undefined){ + if(props.jsonGMJI[99].data_prediction.lat===undefined && props.jsonJAGI[99].data_prediction.lat===undefined && props.jsonPWJI[99].data_prediction.lat===undefined){ setFound(false); mapRef.current.removeLayer(coord); } - else{ + else { if(props.jsonGMJI[99].data_prediction.lat!==null){ setSource('GMJI',[props.jsonGMJI[99].data_prediction.lat,props.jsonGMJI[99].data_prediction.long],props.jsonGMJI[99].data_prediction.magnitude,props.jsonGMJI[99].data_prediction.depth) } @@ -76,15 +72,13 @@ const Map = (props) => { } } - - - function setSource(stasiun,koord, mag, depth){ + function setSource(stasiun, koord, mag, depth){ if(found===false){ var magnitude = Math.round((mag + Number.EPSILON) * 100) / 100; var lat = Math.round((koord[0] + Number.EPSILON) * 100) / 100; var long= Math.round((koord[1] + Number.EPSILON) * 100) / 100; var depths = Math.round((depth + Number.EPSILON) * 100) / 100; - let text = 'Koordinat: '+lat+','+long+' Magnitude; '+magnitude+' Kedalaman: '+depths+' KM'+' Stasiun Pendeteksi: '+stasiun; + let text = `Koordinat: ${lat},${long} Magnitude: ${magnitude} Kedalaman: ${depths} KM Stasiun Pendeteksi: ${stasiun}`; var circle = new L.circle (koord, 30000, {color: "red", opacity:.5}).bindPopup(text) setCoord(circle); mapRef.current.addLayer(circle); @@ -93,8 +87,9 @@ const Map = (props) => { } } },[props]) - return ( -
- ); + + return ( +
+ ); } export default Map; \ No newline at end of file diff --git a/frontend/src/components/style.css b/frontend/src/components/style.css index 07abf59..ef9888d 100644 --- a/frontend/src/components/style.css +++ b/frontend/src/components/style.css @@ -138,7 +138,7 @@ width: 60px; } .station-title{ - padding-top: 40px; + margin-top: 60px; padding-left: 40px; font-size: 18px; } diff --git a/frontend/src/pages/AdminMap.js b/frontend/src/pages/AdminMap.js index dc354db..3522927 100644 --- a/frontend/src/pages/AdminMap.js +++ b/frontend/src/pages/AdminMap.js @@ -15,10 +15,8 @@ import Dropdown from "../components/Charts/dropdown"; const RESERVED_FIELD = ["result", "table", "_field", "_measurement", "_start", "_stop", "_time"] const AdminMap = (props) => { - const [seed, setSeed] = useState(''); const [stations, setStations] = useState([]); const [optionSelected, setOptionSelected] = useState(null); - const [mseedSelected, setMseedSelected] = useState(null); const [showTable, setShowTable] = useState(false); const [checklistDisabled, setChecklistDisabled] = useState(true); const [mseed, setMseed] = useState([]); @@ -28,25 +26,12 @@ const AdminMap = (props) => { const [reset,setReset] = useState(false); const [showTab,setShowTab] = useState(true); const [showLegend, setShowLegend] = useState(false); - const prevSeed = useRef(""); - const changeSeed = (i) => { - setSeed(i); - setReset(true); - prevSeed.current = i; - mseed.forEach(func => func(i.value)); - // if(i!==prevSeed){ - // sockGMJI.close(); - // sockJAGI.close(); - // sockPWJI.close(); - // } - } const handleClick = () => setShowTable(!showTable); useEffect(()=>{ - // if(seed!==null){ - const socket = new WebSocket("ws://"+ props.url +"/ws") + const socket = new WebSocket(`ws://${props.url}/ws`) console.log(props.url) socket.onmessage = function(e) { const jsonData = JSON.parse(e.data) @@ -61,11 +46,9 @@ const AdminMap = (props) => { }) setTime(jsonData["_time"]) setData(stations); - console.log(stations) } socket.onclose = console.log('Socket connection closed') - setSocket(socket); - // } + setSocket(socket) }, []) useEffect(() => { @@ -74,43 +57,46 @@ const AdminMap = (props) => { // setShowLegend(true); // } - if(stations.length==0) setShowLegend(false); + if (stations.length == 0) setShowLegend(false); }, [stations]); - return ( -
-
- {/* */} -
- -
-
- -
- {showTab ? -
- - - -
- {data != null && Object.entries(data).map(([key, value]) => { - return - })} - {/* { showGMJI ? : null } */} - {/* { showLegend ?
-o- BHN-o- BHZ-o- BHE| P-Arrival
: null} */} -
- {/* { showTable ? : null} */} -
- -
-
: null} -
+ return ( +
+
+ +
+ +
+
+ +
+ {showTab ? +
+ {/* */} + +
+ {data != null && Object.entries(data).map(([key, value]) => { + return ( + //
+ + /*
+ -o- BHN-o- BHZ-o- BHE| P-Arrival +
*/ + //
+ )})} +
+ {/* { showTable ? : null} */} +
+
- ); +
: null} +
+
+ ); }; export default AdminMap; \ No newline at end of file From f16f512a48e90b2b55df74453da6d4f448f9e9e3 Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Mon, 31 Jul 2023 21:09:39 +0700 Subject: [PATCH 07/16] fix generator file --- eews_backend/generator/main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eews_backend/generator/main.py b/eews_backend/generator/main.py index c7803b7..6919244 100644 --- a/eews_backend/generator/main.py +++ b/eews_backend/generator/main.py @@ -60,7 +60,8 @@ def split_mseed(): print("Mseed will be saved to folder", MSEED_FOLDER) print("Splitting mseed") st: Stream = read(SOURCE_MSEED) - dt = UTCDateTime("2015-08-20T15:12:00") + first_starttime = min([trace.stats["starttime"] for trace in st]) + dt = UTCDateTime(first_starttime) last_endtime = max([trace.stats["endtime"] for trace in st]) trace: Trace shutil.rmtree(MSEED_FOLDER) From 166b3a294f7f73978db5ab073111ab9e3a37b2cc Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Thu, 3 Aug 2023 10:08:13 +0700 Subject: [PATCH 08/16] add fill empty timestamp and send key to kafka --- eews_backend/rest/main.py | 37 ++++++++-------- eews_backend/stream/main.py | 46 ++++++++++++-------- eews_backend/utils/helper_functions.py | 25 +++++++++++ requirements.txt | 59 -------------------------- 4 files changed, 71 insertions(+), 96 deletions(-) delete mode 100644 requirements.txt diff --git a/eews_backend/rest/main.py b/eews_backend/rest/main.py index 428d111..9250b23 100644 --- a/eews_backend/rest/main.py +++ b/eews_backend/rest/main.py @@ -109,18 +109,20 @@ @app.get("/test") async def test(): query_api = client.query_api() - now = datetime(2015, 8, 20, 15, 12, 1) + now = datetime(2015, 8, 20, 15, 11, 47, tzinfo=timezone.utc) first_starttime = now query = f""" from(bucket: "eews") - |> range(start: {(now - timedelta(seconds=1)).isoformat()}Z, stop: {now.isoformat()}Z) + |> range(start: {(now - timedelta(seconds=1)).isoformat()}, stop: {now.isoformat()}) |> filter(fn: (r) => r["_measurement"] == "seismograf") |> pivot(rowKey: ["_time"], columnKey: ["channel", "station"], valueColumn: "_value")""" start = time.monotonic_ns() data: pd.DataFrame = query_api.query_data_frame(query=query) + log.debug(data) + extended_data = fill_empty_timestamp((now - timedelta(seconds=1)), now, data) + print(extended_data) log.debug(f"{(time.monotonic_ns() - start)/10**9}s") - data = data.fillna(0) - return data.to_dict() + return extended_data.to_dict() @app.get("/") @@ -133,36 +135,29 @@ async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) try: query_api = client.query_api() - now = datetime(2015, 8, 20, 15, 12, 1) + now = datetime(2015, 8, 20, 15, 12, 1, tzinfo=timezone.utc) if SIMULATE_REALTIME: now = datetime.now(tz=timezone.utc) - timedelta(seconds=10) while True: start = time.monotonic_ns() query = f""" from(bucket: "eews") - |> range(start: {(now - timedelta(seconds=1)).isoformat()}Z, stop: {now.isoformat()}Z) + |> range(start: {(now - timedelta(seconds=1)).isoformat()}, stop: {now.isoformat()}) |> filter(fn: (r) => r["_measurement"] == "seismograf") |> pivot(rowKey: ["_time"], columnKey: ["channel", "station"], valueColumn: "_value")""" data: pd.DataFrame = query_api.query_data_frame(query=query) - # time_list = [ - # _time.isoformat(sep=" ", timespec="microseconds")[:-6] - # for _time in data["_time"].to_list() - # ] - # for i in range(25): - # timestamp = ( - # now + timedelta(seconds=(i * 0.04)) - timedelta(seconds=1) - # ).isoformat(sep=" ", timespec="microseconds") - # if timestamp not 
in time_list: - # print(timestamp, time_list) - # # data.add(pd.DataFrame()) + extended_data = fill_empty_timestamp( + (now - timedelta(seconds=1)), now, data + ) log.debug(now) log.debug(data) - json_data = data.to_json() + json_data = extended_data.to_json() now += timedelta(seconds=1) await manager.broadcast(json_data) diff = (time.monotonic_ns() - start) / 10**9 await asyncio.sleep(1 - diff) - except WebSocketDisconnect: + except Exception: + log.warning(f"Client {websocket} has been disconnected") manager.disconnect(websocket) @@ -338,7 +333,9 @@ def save_mseed(contents: bytes, filename: str): log.debug("Start producing events") for i in range(len(events)): - producer.produce_message(events[i]) + producer.produce_message( + events[i], f"{events[i]['channel']}_{events[i]['station']}" + ) log.debug( f"Finished process mseed with {len(records)} data for {(time.monotonic_ns() - start) / 10**9}s with rate of {len(records)/((time.monotonic_ns() - start) / 10**9)}" diff --git a/eews_backend/stream/main.py b/eews_backend/stream/main.py index acd0bd7..18d2da4 100644 --- a/eews_backend/stream/main.py +++ b/eews_backend/stream/main.py @@ -4,7 +4,7 @@ from confluent_kafka import KafkaError from confluent_kafka import Message from utils.helper_functions import get_current_utc_datetime -from typing import Dict +from typing import Any, Dict from typing import Optional import logging import os @@ -17,14 +17,14 @@ raise Exception("BOOTSTRAP_SERVER env is required") DEFAULT_CONSUMER_CONFIG = { - 'bootstrap.servers': BOOTSTRAP_SERVER, - 'default.topic.config': { - 'auto.offset.reset': 'latest', # if there's no initial offset, use latest + "bootstrap.servers": BOOTSTRAP_SERVER, + "default.topic.config": { + "auto.offset.reset": "latest", # if there's no initial offset, use latest }, } DEFAULT_PRODUCER_CONFIG = { - 'bootstrap.servers': BOOTSTRAP_SERVER, + "bootstrap.servers": BOOTSTRAP_SERVER, # 'compression.type': 'lz4', # 'linger.ms': 100, # 'batch.size': 131072, # 128 KB @@ -33,18 +33,20 @@ logging.debug(f"Consumer config: {DEFAULT_CONSUMER_CONFIG}") logging.info(f"Producer config: {DEFAULT_PRODUCER_CONFIG}") + def serialize_json(obj): - return json.dumps(obj).encode('utf-8') + return json.dumps(obj).encode("utf-8") def deserialize_json(obj: bytes): - decoded = obj.decode('utf-8') + decoded = obj.decode("utf-8") try: return json.loads(decoded) except: return decoded + class KafkaProducer: def __init__( self, @@ -69,6 +71,7 @@ def __init__( def produce_message( self, value: object, + key: Optional[Any] = None, callback_function: Optional[Callable[[str, str], None]] = None, ): value[f"injected_to_{self.topic_name}_at"] = get_current_utc_datetime() @@ -77,18 +80,25 @@ def produce_message( topic=self.topic_name, value=self.value_serializer(value), on_delivery=self.get_on_delivery_function(callback_function), + key=key, ) self.producer.poll(0) def log_on_kafka_message_delivery(self, error: Optional[str], message: str): if error is not None: - logging.error(f"Failed to produce message: {message.value()}, topic: {self.topic_name} error: {error}") + logging.error( + f"Failed to produce message: {message.value()}, topic: {self.topic_name} error: {error}" + ) else: - logging.debug(f"Successfully produced message: {message.value()}, topic: {self.topic_name}") + logging.debug( + f"Successfully produced message: {message.value()}, topic: {self.topic_name}" + ) - def get_on_delivery_function(self, extra_function: Optional[Callable[[str, str], None]]): + def get_on_delivery_function( + self, extra_function: 
Optional[Callable[[str, str], None]] + ): if extra_function is None: return self.log_on_kafka_message_delivery @@ -97,8 +107,8 @@ def get_on_delivery_function(self, extra_function: Optional[Callable[[str, str], extra_function(error, message), ) -class KafkaConsumer: +class KafkaConsumer: def __init__( self, topic_name: str, @@ -106,11 +116,13 @@ def __init__( extra_config: Dict, value_deserializer: Optional[Callable[[object], bytes]] = None, ): - self.consumer = Consumer({ - **DEFAULT_CONSUMER_CONFIG, - "group.id": group_id, - **extra_config, - }) + self.consumer = Consumer( + { + **DEFAULT_CONSUMER_CONFIG, + "group.id": group_id, + **extra_config, + } + ) self.consumer.subscribe([topic_name]) self.value_deserializer = value_deserializer @@ -141,4 +153,4 @@ def consume( except Exception as e: logging.error("Error: %s", e) - self.consumer.close() \ No newline at end of file + self.consumer.close() diff --git a/eews_backend/utils/helper_functions.py b/eews_backend/utils/helper_functions.py index 07660dd..ffe6941 100644 --- a/eews_backend/utils/helper_functions.py +++ b/eews_backend/utils/helper_functions.py @@ -3,9 +3,12 @@ from obspy import UTCDateTime from datetime import datetime, timedelta from pprint import pprint +import pandas as pd import numpy as np import pytz +from .wrapper import measure_execution_time + def normalizations(array): # res = array/np.amax(np.abs(array)) @@ -200,3 +203,25 @@ def nearest_datetime_rounded(datetime: datetime, step_in_micros: int = 40000): else: rounded += timedelta(microseconds=(step_in_micros - remainder)) return rounded + + +@measure_execution_time +def fill_empty_timestamp( + start: datetime, + end: datetime, + data: pd.DataFrame, + data_key: str = "_time", + step_in_micros: int = 40000, +) -> pd.DataFrame: + diff = end - start + diff_in_micros = (diff.seconds * 10**6) + (diff.microseconds) + time_list = set( + [ + (start + timedelta(microseconds=i)) + for i in range(0, diff_in_micros, step_in_micros) + ] + ) + time_in_data = set([_time for _time in data[data_key].to_list()]) + time_not_in_data = pd.DataFrame(list(time_list - time_in_data), columns=[data_key]) + extended_data = pd.concat([data, time_not_in_data], ignore_index=True) + return extended_data.sort_values(by=["_time"], ignore_index=True).fillna(0) diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 78b4ac2..0000000 --- a/requirements.txt +++ /dev/null @@ -1,59 +0,0 @@ -asgiref==3.7.2 -attrs==23.1.0 -autobahn==23.1.2 -Automat==22.10.0 -backports.zoneinfo==0.2.1 -certifi==2023.5.7 -cffi==1.15.1 -channels==4.0.0 -charset-normalizer==3.2.0 -confluent-kafka==2.2.0 -constantly==15.1.0 -contourpy==1.1.0 -cryptography==41.0.2 -cycler==0.11.0 -daphne==4.0.0 -decorator==5.1.1 -Django==4.2.3 -django-filter==23.2 -django-storages==1.13.2 -djangorestframework==3.14.0 -dnspython==2.3.0 -fonttools==4.41.0 -greenlet==2.0.2 -hyperlink==21.0.0 -idna==3.4 -importlib-metadata==6.8.0 -importlib-resources==6.0.0 -incremental==22.10.0 -kiwisolver==1.4.4 -lxml==4.9.3 -Markdown==3.4.3 -matplotlib==3.7.2 -numpy==1.24.4 -obspy==1.4.0 -packaging==23.1 -Pillow==10.0.0 -pyasn1==0.5.0 -pyasn1-modules==0.3.0 -pycparser==2.21 -pymongo==4.4.0 -pyOpenSSL==23.2.0 -pyparsing==3.0.9 -python-dateutil==2.8.2 -python-dotenv==1.0.0 -pytz==2023.3 -requests==2.31.0 -scipy==1.10.1 -service-identity==23.1.0 -six==1.16.0 -SQLAlchemy==2.0.18 -sqlparse==0.4.4 -Twisted==22.10.0 -twisted-iocpsupport==1.0.3 -txaio==23.1.1 -typing_extensions==4.7.1 -tzdata==2023.3 -urllib3==2.0.3 -zipp==3.16.1 -zope.interface==6.0 
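
A minimal usage sketch of the fill_empty_timestamp() helper added above, assuming it can be imported as utils.helper_functions (its home inside eews_backend); the column name and sample values are made up and the snippet is not part of the patch itself:

    from datetime import datetime, timedelta, timezone

    import pandas as pd

    from utils.helper_functions import fill_empty_timestamp  # assumed import path

    start = datetime(2015, 8, 20, 15, 11, 46, tzinfo=timezone.utc)
    end = start + timedelta(seconds=1)

    # Pretend the Flux query returned only 2 of the 25 expected 40 ms samples
    # for one pivoted channel/station column (the column name is hypothetical).
    partial = pd.DataFrame(
        {
            "_time": [start, start + timedelta(microseconds=80_000)],
            "BHZ_JAGI": [0.12, -0.07],
        }
    )

    filled = fill_empty_timestamp(start, end, partial)
    # The helper appends the missing timestamps, sorts by "_time", and fills the
    # value columns with 0, giving one row per 40 ms step in [start, end).
    print(len(filled))  # expected: 25

Keying each produced record by f"{channel}_{station}", as the patch now does, sends all samples of one seismometer channel to the same Kafka partition, which preserves their ordering for downstream consumers.
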
From f210c4e07d908b48ec94082e10325fee7ac2c8bd Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Thu, 3 Aug 2023 10:40:24 +0700 Subject: [PATCH 09/16] fix time format for kafka event --- eews_backend/rest/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eews_backend/rest/main.py b/eews_backend/rest/main.py index 9250b23..6c28321 100644 --- a/eews_backend/rest/main.py +++ b/eews_backend/rest/main.py @@ -321,7 +321,7 @@ def save_mseed(contents: bytes, filename: str): event = { "station": station, "channel": channel, - "time": str(starttime), + "time": f"{first_starttime.isoformat()}Z", "data": data_point, } events.append(event) From c6d823665abdf703414eb87ea3a37f5562d0812d Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Tue, 15 Aug 2023 16:43:37 +0700 Subject: [PATCH 10/16] add nginx conf --- .gitignore | 2 + docker-compose.kafka.yml | 87 ++++++++++++++++++++++++++++++++++++++++ docker-compose.yml | 41 ++++++++++++++++++- nginx/nginx.conf | 23 +++++++++++ 4 files changed, 152 insertions(+), 1 deletion(-) create mode 100644 docker-compose.kafka.yml create mode 100644 nginx/nginx.conf diff --git a/.gitignore b/.gitignore index 3ba3cdd..8560c64 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,8 @@ db.sqlite3 /data/ /mseed/ *.mseed +*.zip +*.pick # See https://help.github.com/articles/ignoring-files/ for more about ignoring files. diff --git a/docker-compose.kafka.yml b/docker-compose.kafka.yml new file mode 100644 index 0000000..138265a --- /dev/null +++ b/docker-compose.kafka.yml @@ -0,0 +1,87 @@ +version: "3.3" + +services: + zookeeper: + restart: always + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + image: confluentinc/cp-zookeeper:latest + ports: + - "2181:2181/tcp" + volumes: + - ./zookeeper/data:/data + - ./zookeeper/data/datalog:/datalog + + kafka: + restart: always + environment: + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://34.128.127.171:9092 + KAFKA_BROKER_ID: 1 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_JMX_HOSTNAME: 34.128.127.171 + KAFKA_JMX_PORT: 9999 + image: confluentinc/cp-kafka:latest + user: root + ports: + - "9092:9092" + - "9999:9999" + volumes: + - ./kafka/data/kafka-1:/var/lib/kafka/data + depends_on: + - zookeeper + + kafka-2: + restart: always + environment: + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,PLAINTEXT_HOST://34.128.127.171:9093 + KAFKA_BROKER_ID: 2 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_JMX_HOSTNAME: 34.128.127.171 + KAFKA_JMX_PORT: 9999 + image: confluentinc/cp-kafka:latest + user: root + ports: + - "9093:9093" + - "9998:9999" + volumes: + - ./kafka/data/kafka-2:/var/lib/kafka/data + depends_on: + - zookeeper + + kafka-3: + restart: always + environment: + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:29092,PLAINTEXT_HOST://34.128.127.171:9094 + KAFKA_BROKER_ID: 3 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_JMX_HOSTNAME: 34.128.127.171 + KAFKA_JMX_PORT: 9999 + image: confluentinc/cp-kafka:latest + user: root + ports: + - "9094:9094" + - "9997:9999" + volumes: + - ./kafka/data/kafka-3:/var/lib/kafka/data + depends_on: + - zookeeper + - 
./cassandra-sink/kafka-connect/plugins:/data/connectors + + # Utility + kafka-ui: + image: provectuslabs/kafka-ui + container_name: kafka-ui + ports: + - "9080:8080" + restart: always + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: PLAINTEXT://kafka:29092,PLAINTEXT://kafka-2:29092,PLAINTEXT://kafka-3:29092 + KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://kafka-schema-registry:8081/ diff --git a/docker-compose.yml b/docker-compose.yml index 4f613fd..0870023 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,18 @@ version: '3' services: + nginx_lb: + image: nginx + ports: + - 8080:8080 + volumes: + - ./nginx/nginx.conf:/etc/nginx/nginx.conf + mongodb: image: mongo restart: always ports: - - 27018:27017 + - 27017:27017 volumes: - ./data/mongodb:/data/db @@ -35,6 +42,36 @@ services: - ./data/zookeeper/data:/data - ./data/zookeeper/data/datalog:/datalog + spark-master: + image: risw/spark-python:v-1 + ports: + - "8080:8080" + environment: + - SPARK_MODE=master + volumes: + - ./data/spark/master:/data + + spark-worker-1: + image: risw/spark-python:v-1 + environment: + - SPARK_MODE=worker + - SPARK_MASTER_URL=spark://spark-master:7077 + depends_on: + - spark-master + volumes: + - ./data/spark/worker-1:/data + + spark-worker-2: + image: risw/spark-python:v-1 + environment: + - SPARK_MODE=worker + - SPARK_MASTER_URL=spark://spark-master:7077 + depends_on: + - spark-master + volumes: + - ./data/spark/worker-2:/data + + kafka: restart: always environment: @@ -65,3 +102,5 @@ services: environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + KAFKA_CLUSTERS_1_NAME: cloud + KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: 34.101.119.214:9092,34.101.119.214:9093,34.101.119.214:9094 diff --git a/nginx/nginx.conf b/nginx/nginx.conf new file mode 100644 index 0000000..5cb38ff --- /dev/null +++ b/nginx/nginx.conf @@ -0,0 +1,23 @@ +worker_processes auto; + +events { + worker_connections 1024; +} + +http { + client_max_body_size 10M; + upstream backend { + ip_hash; + server host.docker.internal:8000; + server host.docker.internal:8001; + } + + server { + listen 8080; + server_name my_load_balancer; + + location / { + proxy_pass http://backend; + } + } +} \ No newline at end of file From bf0ec4ff7fe0ad4d696e2fea419706be3ecb5f1c Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Tue, 15 Aug 2023 16:45:43 +0700 Subject: [PATCH 11/16] add seedlink realtime query and change window data on kafka --- eews_backend/rest/main.py | 156 +++++++++++++++----------- eews_backend/seedlink/__init__.py | 0 eews_backend/seedlink/main.py | 29 +++++ eews_backend/seedlink/seedlink.py | 177 ++++++++++++++++++++++++++++++ eews_backend/stream/topics.py | 7 +- 5 files changed, 301 insertions(+), 68 deletions(-) create mode 100644 eews_backend/seedlink/__init__.py create mode 100644 eews_backend/seedlink/main.py create mode 100644 eews_backend/seedlink/seedlink.py diff --git a/eews_backend/rest/main.py b/eews_backend/rest/main.py index 6c28321..ca90552 100644 --- a/eews_backend/rest/main.py +++ b/eews_backend/rest/main.py @@ -15,9 +15,10 @@ from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware +from influxdb_client.client.flux_table import TableList from typing import Dict, List from influxdb_client import Point, WritePrecision -from obspy import read +from obspy import Stream, Trace, read from logging.config import dictConfig from database.mongodb import mongo_client @@ -42,6 +43,7 @@ MODULE_DIR 
= "./rest/" STATIC_DIR = "static/" SIMULATE_REALTIME = True if os.getenv("SIMULATE_REALTIME") == "True" else False +MSEED_RANGE_IN_SECONDS = 30 origins = [ "http://localhost", @@ -110,19 +112,24 @@ async def test(): query_api = client.query_api() now = datetime(2015, 8, 20, 15, 11, 47, tzinfo=timezone.utc) + now = datetime.now(tz=timezone.utc) first_starttime = now query = f""" from(bucket: "eews") - |> range(start: {(now - timedelta(seconds=1)).isoformat()}, stop: {now.isoformat()}) + |> range(start: {(now - timedelta(seconds=60)).isoformat()}, stop: {now.isoformat()}) |> filter(fn: (r) => r["_measurement"] == "seismograf") |> pivot(rowKey: ["_time"], columnKey: ["channel", "station"], valueColumn: "_value")""" start = time.monotonic_ns() data: pd.DataFrame = query_api.query_data_frame(query=query) - log.debug(data) - extended_data = fill_empty_timestamp((now - timedelta(seconds=1)), now, data) - print(extended_data) + data2: TableList = query_api.query(query=query) + # TODO: Update result for easier handling in frontend + result = {} + # log.debug(data) + # extended_data = fill_empty_timestamp((now - timedelta(seconds=1)), now, data) + # print(extended_data) log.debug(f"{(time.monotonic_ns() - start)/10**9}s") - return extended_data.to_dict() + return data2.to_json() + # return extended_data.to_dict() @app.get("/") @@ -137,7 +144,9 @@ async def websocket_endpoint(websocket: WebSocket): query_api = client.query_api() now = datetime(2015, 8, 20, 15, 12, 1, tzinfo=timezone.utc) if SIMULATE_REALTIME: - now = datetime.now(tz=timezone.utc) - timedelta(seconds=10) + now = datetime.now(tz=timezone.utc) - timedelta( + seconds=MSEED_RANGE_IN_SECONDS + ) while True: start = time.monotonic_ns() query = f""" @@ -149,6 +158,8 @@ async def websocket_endpoint(websocket: WebSocket): extended_data = fill_empty_timestamp( (now - timedelta(seconds=1)), now, data ) + # TODO: Update result for easier handling in frontend + result = {} log.debug(now) log.debug(data) json_data = extended_data.to_json() @@ -291,84 +302,99 @@ def calculate_closest_station( @measure_execution_time def save_mseed(contents: bytes, filename: str): log.info("Saving mseed on the background") + st = read(io.BytesIO(contents)) + + log.debug(f"Stream {st}") + + if len(st) > 0: + first_starttime = min([trace.stats["starttime"] for trace in st]) + first_endtime = min([trace.stats["endtime"] for trace in st]) + + processed = process_data(st) + produce_windowed_data(processed, first_starttime, first_endtime) + save_to_influx(st) + + +@measure_execution_time +def produce_windowed_data(stream: Stream, first_starttime, first_endtime): + dt = UTCDateTime(first_starttime) + + while dt + 8 <= first_endtime: + windowed_data = [None, None, None] + trimmed = stream.slice(dt, dt + 8, keep_empty_traces=True) + if len(trimmed) > 0: + event = { + "station": trimmed[0].stats["station"], + } + for detail in trimmed: + if detail.stats["channel"] == "BHE": + windowed_data[0] = detail.data + event["BHE"] = { + "starttime": str(detail.stats.starttime), + "endtime": str(detail.stats.endtime), + "data": detail.data.tolist(), + } + elif detail.stats["channel"] == "BHN": + windowed_data[1] = detail.data + event["BHN"] = { + "starttime": str(detail.stats.starttime), + "endtime": str(detail.stats.endtime), + "data": detail.data.tolist(), + } + elif detail.stats["channel"] == "BHZ": + windowed_data[2] = detail.data + event["BHZ"] = { + "starttime": str(detail.stats.starttime), + "endtime": str(detail.stats.endtime), + "data": detail.data.tolist(), + } + 
producer.produce_message(event, event["station"]) + dt += 0.04 + return dt + +@measure_execution_time +def save_to_influx(stream: Stream): + trace: Trace records = [] - events = [] - traces = process_data(io.BytesIO(contents)) - start = time.monotonic_ns() - for mseed_data in traces: - starttime: datetime = UTCDateTime(mseed_data["starttime"]).datetime - if SIMULATE_REALTIME: - starttime = (UTCDateTime().now() - 10).datetime - delta = 1 / int(mseed_data["sampling_rate"]) - channel = mseed_data["channel"] - station = mseed_data["station"] - first_starttime = nearest_datetime_rounded(starttime, delta * 10**6) - - log.debug( - f"Processing {station}_{channel} from {filename} with len {len(mseed_data['data_interpolated'])}" - ) + for trace in stream: + starttime: datetime = UTCDateTime(trace.stats.starttime).datetime + delta = 1 / int(trace.stats.sampling_rate) + channel = trace.stats.channel + station = trace.stats.station - for data_point in mseed_data["data_interpolated"]: + for data_point in trace.data: point = ( Point("seismograf") - .time(first_starttime, write_precision=WritePrecision.MS) + .time(starttime, write_precision=WritePrecision.MS) .tag("channel", channel) .tag("station", station) .field("data", data_point) ) records.append(point) - event = { - "station": station, - "channel": channel, - "time": f"{first_starttime.isoformat()}Z", - "data": data_point, - } - events.append(event) - first_starttime += timedelta(seconds=delta) + starttime += timedelta(seconds=delta) with client.write_api() as writer: - log.debug("Start batch save to InfluxDB") + log.info(f"Start batch save of {len(records)} data to InfluxDB") writer.write(bucket="eews", record=records) - log.debug("Start producing events") - for i in range(len(events)): - producer.produce_message( - events[i], f"{events[i]['channel']}_{events[i]['station']}" - ) - - log.debug( - f"Finished process mseed with {len(records)} data for {(time.monotonic_ns() - start) / 10**9}s with rate of {len(records)/((time.monotonic_ns() - start) / 10**9)}" - ) - @measure_execution_time -def process_data(mseed_filename: str): - mseed_data = read(mseed_filename) - traces = [] +def process_data(stream: Stream): + mseed_data = stream + new_stream = Stream() + detail: Trace for detail in mseed_data: - preprocessed = {} + trace = detail.copy() fs = detail.stats.sampling_rate lowcut = 1.0 highcut = 5.0 order = 5 - preprocessed["network"] = detail.stats.network - preprocessed["station"] = detail.stats.station - preprocessed["channel"] = detail.stats.channel - preprocessed["location"] = detail.stats.location - preprocessed["starttime"] = str(detail.stats.starttime) - preprocessed["endtime"] = str(detail.stats.endtime) - preprocessed["delta"] = detail.stats.delta - preprocessed["npts"] = detail.stats.npts - preprocessed["calib"] = detail.stats.calib - preprocessed["data"] = detail.data data_before = detail.data data_processed = butter_bandpass_filter(data_before, lowcut, highcut, fs, order) - data_to = list(data_processed) - data_to = letInterpolate( - data_to, int(ceil(len(data_to) * 25 / detail.stats.sampling_rate)) - ) - preprocessed["sampling_rate"] = 25.0 - preprocessed["data_interpolated"] = data_to - traces.append(preprocessed) - return traces + trace.data = data_processed + trace.interpolate(25) + trace.stats["delta"] = 1 / 25 + trace.stats["sampling_rate"] = 25 + new_stream.append(trace) + return new_stream diff --git a/eews_backend/seedlink/__init__.py b/eews_backend/seedlink/__init__.py new file mode 100644 index 0000000..e69de29 diff --git 
a/eews_backend/seedlink/main.py b/eews_backend/seedlink/main.py new file mode 100644 index 0000000..6953c67 --- /dev/null +++ b/eews_backend/seedlink/main.py @@ -0,0 +1,29 @@ +from typing import List +from .seedlink import Seedlink +import multiprocessing + +global process_list +process_list: List[multiprocessing.Process] = [] + + +def seedlink_process(station: str): + client = Seedlink(station) + client.start() + + +def main(): + stations = ["JAGI", "BNDI"] + for station in stations: + process = multiprocessing.Process(target=seedlink_process, args=(station,)) + process.name = f"seedlink_{station}" + process_list.append(process) + for process in process_list: + process.start() + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + for process in process_list: + process.join() diff --git a/eews_backend/seedlink/seedlink.py b/eews_backend/seedlink/seedlink.py new file mode 100644 index 0000000..653f5ce --- /dev/null +++ b/eews_backend/seedlink/seedlink.py @@ -0,0 +1,177 @@ +from math import ceil +from typing import List +from influxdb_client import Point, WritePrecision +from obspy import Stream, Trace, UTCDateTime +from obspy.clients.fdsn import Client +from obspy.clients.seedlink import Client as SeedlinkClient +from stream.main import KafkaProducer +from stream.topics import RAW_TOPIC, PREPROCESSED_TOPIC +from database.influxdb import influx_client +from utils import * +from pprint import pprint + +import logging +import logging +import time +import pandas as pd +import os + +logger = logging.getLogger(__name__) +logger.setLevel("DEBUG") + + +class Seedlink: + def __init__( + self, + stations: str, + network: str = "GE", + channels: str = "BH*", + buffer_size: int = 9, + poll_interval: int = 30, + ) -> None: + self.buffer = [] + self.buffer_size = buffer_size + self.poll_interval = poll_interval + self.stations = stations + self.channels = channels + self.network = network + + self.geofon_client = client = Client("GEOFON") + self.inventory = client.get_stations( + network="*", + station="*", + starttime="2011-03-11T00:00:00", + endtime="2018-03-11T00:00:00", + ) + logger.info(f"Geofon inventory \n {self.inventory}") + self.seedlink_client = SeedlinkClient("geofon.gfz-potsdam.de", 18000) + self.influx_client = influx_client() + self.producer = KafkaProducer(PREPROCESSED_TOPIC) + + def start(self): + # Set the start and end times for the plot + endtime = UTCDateTime.now() # now + starttime = endtime - self.poll_interval + + while True: + start = time.monotonic_ns() + diff = 0 + logger.debug("Getting waveform data") + st = self.seedlink_client.get_waveforms( + self.network, self.stations, "*", self.channels, starttime, endtime + ) + logger.debug(f"Stream {st}") + + # Append the new data to the buffer + self.buffer.append(st) + logger.debug(f"Buffer {self.buffer}") + + # If the buffer has grown larger than the buffer size, remove the oldest data + if len(self.buffer) > self.buffer_size: + self.buffer.pop(0) + + if len(st) > 0: + first_starttime = min([trace.stats["starttime"] for trace in st]) + first_endtime = min([trace.stats["endtime"] for trace in st]) + + processed = self.process_data(st) + last_windowed_endtime = self.produce_windowed_data( + processed, first_starttime, first_endtime + ) + self.save_to_influx(st) + + diff = (time.monotonic_ns() - start) / 10**9 + logger.debug(diff) + + # Update starttime for next iteration + starttime = last_windowed_endtime + endtime = UTCDateTime().now() + else: + starttime = endtime + endtime = UTCDateTime().now() + + 
time.sleep(max(self.poll_interval - diff, 0)) + + @measure_execution_time + def produce_windowed_data(self, stream: Stream, first_starttime, first_endtime): + dt = UTCDateTime(first_starttime) + + while dt + 8 <= first_endtime: + windowed_data = [None, None, None] + trimmed = stream.slice(dt, dt + 8, keep_empty_traces=True) + if len(trimmed) > 0: + event = { + "station": trimmed[0].stats["station"], + } + for detail in trimmed: + if detail.stats["channel"] == "BHE": + windowed_data[0] = detail.data + event["BHE"] = { + "starttime": str(detail.stats.starttime), + "endtime": str(detail.stats.endtime), + "data": detail.data.tolist(), + } + elif detail.stats["channel"] == "BHN": + windowed_data[1] = detail.data + event["BHN"] = { + "starttime": str(detail.stats.starttime), + "endtime": str(detail.stats.endtime), + "data": detail.data.tolist(), + } + elif detail.stats["channel"] == "BHZ": + windowed_data[2] = detail.data + event["BHZ"] = { + "starttime": str(detail.stats.starttime), + "endtime": str(detail.stats.endtime), + "data": detail.data.tolist(), + } + self.producer.produce_message(event, event["station"]) + dt += 0.04 + return dt + + @measure_execution_time + def save_to_influx(self, stream: Stream): + trace: Trace + records = [] + for trace in stream: + starttime: datetime = UTCDateTime(trace.stats.starttime).datetime + delta = 1 / int(trace.stats.sampling_rate) + channel = trace.stats.channel + station = trace.stats.station + + for data_point in trace.data: + point = ( + Point("seismograf") + .time(starttime, write_precision=WritePrecision.MS) + .tag("channel", channel) + .tag("station", station) + .field("data", data_point) + ) + records.append(point) + starttime += timedelta(seconds=delta) + + with self.influx_client.write_api() as writer: + logger.info(f"Start batch save of {len(records)} data to InfluxDB") + writer.write(bucket="eews", record=records) + + @measure_execution_time + def process_data(self, stream: Stream): + mseed_data = stream + new_stream = Stream() + detail: Trace + for detail in mseed_data: + trace = detail.copy() + fs = detail.stats.sampling_rate + lowcut = 1.0 + highcut = 5.0 + order = 5 + data_before = detail.data + data_processed = butter_bandpass_filter( + data_before, lowcut, highcut, fs, order + ) + trace.data = data_processed + trace.interpolate(25) + trace.stats["delta"] = 1 / 25 + trace.stats["sampling_rate"] = 25 + new_stream.append(trace) + return new_stream diff --git a/eews_backend/stream/topics.py b/eews_backend/stream/topics.py index 5cfbe91..e142f41 100644 --- a/eews_backend/stream/topics.py +++ b/eews_backend/stream/topics.py @@ -1,3 +1,4 @@ -PREPROCESSED_TOPIC = 'preprocessed' -P_ARRIVAL_TOPIC = 'p-arrival' -PREDICTION_TOPIC = 'prediction' +RAW_TOPIC = "raw" +PREPROCESSED_TOPIC = "preprocessed" +P_ARRIVAL_TOPIC = "p-arrival" +PREDICTION_TOPIC = "prediction" From 7e5823aef51bd2eb2f5d5e237f62992df191a1a5 Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Wed, 16 Aug 2023 09:35:52 +0700 Subject: [PATCH 12/16] refactor event time --- eews_backend/rest/main.py | 34 ++++++--------------- eews_backend/seedlink/seedlink.py | 42 +++++++++++--------------- eews_backend/utils/helper_functions.py | 10 ++++-- 3 files changed, 36 insertions(+), 50 deletions(-) diff --git a/eews_backend/rest/main.py b/eews_backend/rest/main.py index ca90552..53a6f0f 100644 --- a/eews_backend/rest/main.py +++ b/eews_backend/rest/main.py @@ -42,7 +42,7 @@ MODULE_DIR = "./rest/" STATIC_DIR = "static/" -SIMULATE_REALTIME = True if os.getenv("SIMULATE_REALTIME") == "True" else False 
+SIMULATE_REALTIME = False if os.getenv("SIMULATE_REALTIME") == "False" else True MSEED_RANGE_IN_SECONDS = 30 origins = [ @@ -317,37 +317,23 @@ def save_mseed(contents: bytes, filename: str): @measure_execution_time def produce_windowed_data(stream: Stream, first_starttime, first_endtime): - dt = UTCDateTime(first_starttime) + rounded_starttime = nearest_datetime_rounded(first_starttime, 0.04 * 10**6) + dt = UTCDateTime(rounded_starttime) + + log.info("Producing windowed events to kafka") while dt + 8 <= first_endtime: - windowed_data = [None, None, None] trimmed = stream.slice(dt, dt + 8, keep_empty_traces=True) if len(trimmed) > 0: event = { "station": trimmed[0].stats["station"], } for detail in trimmed: - if detail.stats["channel"] == "BHE": - windowed_data[0] = detail.data - event["BHE"] = { - "starttime": str(detail.stats.starttime), - "endtime": str(detail.stats.endtime), - "data": detail.data.tolist(), - } - elif detail.stats["channel"] == "BHN": - windowed_data[1] = detail.data - event["BHN"] = { - "starttime": str(detail.stats.starttime), - "endtime": str(detail.stats.endtime), - "data": detail.data.tolist(), - } - elif detail.stats["channel"] == "BHZ": - windowed_data[2] = detail.data - event["BHZ"] = { - "starttime": str(detail.stats.starttime), - "endtime": str(detail.stats.endtime), - "data": detail.data.tolist(), - } + event[detail.stats["channel"]] = { + "starttime": str(detail.stats.starttime), + "endtime": str(detail.stats.endtime), + "data": detail.data.tolist(), + } producer.produce_message(event, event["station"]) dt += 0.04 return dt diff --git a/eews_backend/seedlink/seedlink.py b/eews_backend/seedlink/seedlink.py index 653f5ce..7210b77 100644 --- a/eews_backend/seedlink/seedlink.py +++ b/eews_backend/seedlink/seedlink.py @@ -28,6 +28,7 @@ def __init__( channels: str = "BH*", buffer_size: int = 9, poll_interval: int = 30, + override_station: str | None = None, ) -> None: self.buffer = [] self.buffer_size = buffer_size @@ -38,7 +39,7 @@ def __init__( self.geofon_client = client = Client("GEOFON") self.inventory = client.get_stations( - network="*", + network="GE", station="*", starttime="2011-03-11T00:00:00", endtime="2018-03-11T00:00:00", @@ -47,6 +48,7 @@ def __init__( self.seedlink_client = SeedlinkClient("geofon.gfz-potsdam.de", 18000) self.influx_client = influx_client() self.producer = KafkaProducer(PREPROCESSED_TOPIC) + self.override_station = override_station def start(self): # Set the start and end times for the plot @@ -60,6 +62,12 @@ def start(self): st = self.seedlink_client.get_waveforms( self.network, self.stations, "*", self.channels, starttime, endtime ) + + if self.override_station: + trace: Trace + for trace in st: + trace.stats["station"] = self.override_station + logger.debug(f"Stream {st}") # Append the new data to the buffer @@ -94,37 +102,23 @@ def start(self): @measure_execution_time def produce_windowed_data(self, stream: Stream, first_starttime, first_endtime): - dt = UTCDateTime(first_starttime) + rounded_starttime = nearest_datetime_rounded(first_starttime, 0.04 * 10**6) + dt = UTCDateTime(rounded_starttime) + + logger.info("Producing windowed events to kafka") while dt + 8 <= first_endtime: - windowed_data = [None, None, None] trimmed = stream.slice(dt, dt + 8, keep_empty_traces=True) if len(trimmed) > 0: event = { "station": trimmed[0].stats["station"], } for detail in trimmed: - if detail.stats["channel"] == "BHE": - windowed_data[0] = detail.data - event["BHE"] = { - "starttime": str(detail.stats.starttime), - "endtime": 
str(detail.stats.endtime), - "data": detail.data.tolist(), - } - elif detail.stats["channel"] == "BHN": - windowed_data[1] = detail.data - event["BHN"] = { - "starttime": str(detail.stats.starttime), - "endtime": str(detail.stats.endtime), - "data": detail.data.tolist(), - } - elif detail.stats["channel"] == "BHZ": - windowed_data[2] = detail.data - event["BHZ"] = { - "starttime": str(detail.stats.starttime), - "endtime": str(detail.stats.endtime), - "data": detail.data.tolist(), - } + event[detail.stats["channel"]] = { + "starttime": str(detail.stats.starttime), + "endtime": str(detail.stats.endtime), + "data": detail.data.tolist(), + } self.producer.produce_message(event, event["station"]) dt += 0.04 return dt diff --git a/eews_backend/utils/helper_functions.py b/eews_backend/utils/helper_functions.py index ffe6941..b32aa7c 100644 --- a/eews_backend/utils/helper_functions.py +++ b/eews_backend/utils/helper_functions.py @@ -2,6 +2,7 @@ from obspy.signal.trigger import recursive_sta_lta, trigger_onset from obspy import UTCDateTime from datetime import datetime, timedelta +from dateutil import parser from pprint import pprint import pandas as pd import numpy as np @@ -194,7 +195,13 @@ def get_current_utc_datetime(): return datetime.now(pytz.utc).strftime("%Y-%m-%d %H:%M:%S.%f") -def nearest_datetime_rounded(datetime: datetime, step_in_micros: int = 40000): +def nearest_datetime_rounded( + datetime: str | datetime | UTCDateTime, step_in_micros: int = 40000 +): + if type(datetime) == str: + datetime = parser.parse(datetime) + if type(datetime) == UTCDateTime: + datetime = datetime.datetime microsecond = datetime.time().microsecond remainder = microsecond % step_in_micros rounded = datetime @@ -205,7 +212,6 @@ def nearest_datetime_rounded(datetime: datetime, step_in_micros: int = 40000): return rounded -@measure_execution_time def fill_empty_timestamp( start: datetime, end: datetime, From f08ec337a1d324311c1481429d691670b922d815 Mon Sep 17 00:00:00 2001 From: Natasya Zahra Date: Wed, 16 Aug 2023 09:38:07 +0700 Subject: [PATCH 13/16] Update station model --- eews_backend/rest/main.py | 19 +++---------------- eews_backend/rest/model.py | 15 ++++++--------- frontend/src/components/Map/Map2.js | 2 +- 3 files changed, 10 insertions(+), 26 deletions(-) diff --git a/eews_backend/rest/main.py b/eews_backend/rest/main.py index 6c28321..734d4c9 100644 --- a/eews_backend/rest/main.py +++ b/eews_backend/rest/main.py @@ -178,9 +178,7 @@ async def get_seismometer(name: str): @app.put("/station/{name}", response_model=StationModel) -async def update_seismometer( - name: str, background_task: BackgroundTasks, data: UpdateStationModel = Body(...) -): +async def update_seismometer(name: str, data: StationModel = Body(...)): data = data.model_dump() if len(data) >= 1: @@ -190,11 +188,9 @@ async def update_seismometer( if ( updated_data := await db["station"].find_one({"name": name}) ) is not None: - background_task.add_task(adjust_closest_stations) return updated_data if (existing_data := await db["station"].find_one({"name": name})) is not None: - await adjust_closest_stations() return existing_data raise HTTPException( @@ -203,9 +199,7 @@ async def update_seismometer( @app.post("/station", response_model=StationModel, status_code=status.HTTP_201_CREATED) -async def create_seismometer( - background_task: BackgroundTasks, data: UpdateStationModel = Body(...) 
-): +async def create_seismometer(data: StationModel = Body(...)): data = data.model_dump() if ( existing_data := await db["station"].find_one({"name": data["name"]}) @@ -215,25 +209,18 @@ async def create_seismometer( detail=f"Seismometer with name {data['name']} already exists", ) - all_stations = await db["station"].find().to_list(1000000000) - calculated = dict() - - data["closest_stations"] = calculate_closest_station(data, all_stations, calculated) - new_data = await db["station"].insert_one(data) if ( existing_data := await db["station"].find_one({"_id": new_data.inserted_id}) ) is not None: - await adjust_closest_stations() return existing_data @app.delete("/station/{name}") -async def delete_seismometer(name: str, background_task: BackgroundTasks): +async def delete_seismometer(name: str): delete_result = await db["station"].delete_one({"name": name}) if delete_result.deleted_count == 1: - await adjust_closest_stations() return Response(status_code=status.HTTP_204_NO_CONTENT) raise HTTPException( diff --git a/eews_backend/rest/model.py b/eews_backend/rest/model.py index e00dead..038d32c 100644 --- a/eews_backend/rest/model.py +++ b/eews_backend/rest/model.py @@ -1,17 +1,14 @@ -from typing import List, Optional +from typing import List, Literal, Optional from pydantic import BaseModel, Field, ConfigDict +class Location(BaseModel): + coordinates: list[float] | None = [None, None] + type: Literal['Point'] -class UpdateStationModel(BaseModel): +class StationModel(BaseModel): name: str = Field() description: Optional[str] = Field() - x: float = Field() - y: float = Field() - - -class StationModel(UpdateStationModel): - closest_stations: List[str] = Field() - + location: Location class MSeedData(BaseModel): network: Optional[str] = Field() diff --git a/frontend/src/components/Map/Map2.js b/frontend/src/components/Map/Map2.js index b9edd5c..06508a4 100644 --- a/frontend/src/components/Map/Map2.js +++ b/frontend/src/components/Map/Map2.js @@ -31,7 +31,7 @@ const Map = (props) => { const jsonData = await response.json() setStations(jsonData) jsonData.forEach(element => { - L.marker([element["x"], element["y"]], {icon: customMarkerIcon(element["name"])}).addTo(mapRef.current).bindPopup(popupContent(element["name"], element["description"])); + L.marker([element["location"]["coordinates"]["1"], element["location"]["coordinates"]["0"]], {icon: customMarkerIcon(element["name"])}).addTo(mapRef.current).bindPopup(popupContent(element["name"], element["description"])); }); } if (mapRef.current == undefined) { From 050d92a778c6e829d9be207ddfd689215d0082e6 Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Wed, 16 Aug 2023 19:58:40 +0700 Subject: [PATCH 14/16] dockerize seedlink and rest api --- .../requirements.txt | 0 eews_backend/requirements2.txt | 72 ------------------- eews_backend/rest/Dockerfile | 11 +++ eews_backend/seedlink/Dockerfile | 9 +++ eews_backend/seedlink/main.py | 10 ++- 5 files changed, 29 insertions(+), 73 deletions(-) rename requirements2.txt => eews_backend/requirements.txt (100%) delete mode 100644 eews_backend/requirements2.txt create mode 100644 eews_backend/rest/Dockerfile create mode 100644 eews_backend/seedlink/Dockerfile diff --git a/requirements2.txt b/eews_backend/requirements.txt similarity index 100% rename from requirements2.txt rename to eews_backend/requirements.txt diff --git a/eews_backend/requirements2.txt b/eews_backend/requirements2.txt deleted file mode 100644 index ec0b524..0000000 --- a/eews_backend/requirements2.txt +++ /dev/null @@ -1,72 +0,0 @@ 
-aiohttp==3.8.4 -aiohttp-cors==0.7.0 -aiokafka==0.8.1 -aiosignal==1.3.1 -annotated-types==0.5.0 -anyio==3.7.1 -async-timeout==4.0.2 -attrs==23.1.0 -certifi==2023.5.7 -charset-normalizer==3.2.0 -click==8.1.4 -colorama==0.4.6 -colorlog==6.7.0 -confluent-kafka==2.2.0 -contourpy==1.1.0 -croniter==1.4.1 -cycler==0.11.0 -decorator==5.1.1 -dnspython==2.3.0 -exceptiongroup==1.1.2 -fastapi==0.100.0 -faust-streaming==0.10.14 -fonttools==4.41.0 -frozenlist==1.4.0 -greenlet==2.0.2 -h11==0.14.0 -haversine==2.8.0 -httptools==0.6.0 -idna==3.4 -importlib-resources==6.0.0 -influxdb-client==1.36.1 -intervaltree==3.1.0 -Jinja2==3.1.2 -kafka-python==2.0.2 -kiwisolver==1.4.4 -lxml==4.9.3 -MarkupSafe==2.1.3 -matplotlib==3.7.2 -mode-streaming==0.3.5 -motor==3.2.0 -multidict==6.0.4 -mypy-extensions==1.0.0 -numpy==1.24.4 -obspy==1.4.0 -opentracing==2.4.0 -packaging==23.1 -Pillow==10.0.0 -pydantic==2.0.2 -pydantic_core==2.1.2 -pymongo==4.4.0 -pyparsing==3.0.9 -python-dateutil==2.8.2 -python-dotenv==1.0.0 -python-multipart==0.0.6 -PyYAML==6.0 -reactivex==4.0.4 -requests==2.31.0 -scipy==1.10.1 -six==1.16.0 -sniffio==1.3.0 -sortedcontainers==2.4.0 -SQLAlchemy==2.0.18 -starlette==0.27.0 -terminaltables==3.1.10 -typing_extensions==4.7.1 -urllib3==2.0.3 -uvicorn==0.22.0 -venusian==3.0.0 -watchfiles==0.19.0 -websockets==11.0.3 -yarl==1.9.2 -zipp==3.16.1 diff --git a/eews_backend/rest/Dockerfile b/eews_backend/rest/Dockerfile new file mode 100644 index 0000000..6d909bb --- /dev/null +++ b/eews_backend/rest/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.10-slim + +ENV APP_HOME /app +WORKDIR $APP_HOME +COPY . ./ + +ENV PORT 8000 +EXPOSE $PORT +RUN pip install --no-cache-dir -r requirements.txt + +CMD exec uvicorn rest.main:app --host 0.0.0.0 --port ${PORT} --workers 8 \ No newline at end of file diff --git a/eews_backend/seedlink/Dockerfile b/eews_backend/seedlink/Dockerfile new file mode 100644 index 0000000..b9c3d4a --- /dev/null +++ b/eews_backend/seedlink/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.10-slim + +ENV APP_HOME /app +WORKDIR $APP_HOME +COPY . 
./ + +RUN pip install --no-cache-dir -r requirements.txt + +CMD python -m seedlink.main \ No newline at end of file diff --git a/eews_backend/seedlink/main.py b/eews_backend/seedlink/main.py index 6953c67..f24d79e 100644 --- a/eews_backend/seedlink/main.py +++ b/eews_backend/seedlink/main.py @@ -1,6 +1,14 @@ from typing import List from .seedlink import Seedlink import multiprocessing +import os +from dotenv import load_dotenv + +load_dotenv() + +station_list = os.getenv("STATION_LIST") +if type(station_list) == str: + station_list = station_list.split(",") global process_list process_list: List[multiprocessing.Process] = [] @@ -12,7 +20,7 @@ def seedlink_process(station: str): def main(): - stations = ["JAGI", "BNDI"] + stations = station_list if station_list else ["JAGI", "BNDI"] for station in stations: process = multiprocessing.Process(target=seedlink_process, args=(station,)) process.name = f"seedlink_{station}" From c1f627018c86c6b3daef932f41452ef606c16bda Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Thu, 24 Aug 2023 19:58:18 +0700 Subject: [PATCH 15/16] finish frontend and add prettier config --- .prettierrc | 4 + frontend/src/App.js | 37 +-- frontend/src/components/Charts/lineChart.js | 125 ++++------ .../src/components/Checklist/checklist.js | 67 +++-- frontend/src/components/Map/Map2.js | 156 ++++++------ frontend/src/pages/AdminMap.js | 233 ++++++++++++------ 6 files changed, 334 insertions(+), 288 deletions(-) create mode 100644 .prettierrc diff --git a/.prettierrc b/.prettierrc new file mode 100644 index 0000000..222861c --- /dev/null +++ b/.prettierrc @@ -0,0 +1,4 @@ +{ + "tabWidth": 2, + "useTabs": false +} diff --git a/frontend/src/App.js b/frontend/src/App.js index 4913efa..f14fe05 100644 --- a/frontend/src/App.js +++ b/frontend/src/App.js @@ -1,26 +1,29 @@ -import React,{useState} from "react"; -import './App.css'; +import React, { useState } from "react"; +import "./App.css"; import Navibar from "./components/Navbar/Navbar"; -import 'bootstrap/dist/css/bootstrap.min.css'; +import "bootstrap/dist/css/bootstrap.min.css"; import UserMap from "./pages/UserMap"; import AdminMap from "./pages/AdminMap"; import LogIn from "./pages/Login"; import Help from "./pages/Help"; import About from "./pages/About"; -import { BrowserRouter as Router, Routes, Route} from 'react-router-dom' +import { BrowserRouter as Router, Routes, Route } from "react-router-dom"; const App = () => { - return ( - - - - } /> - } /> - } /> - } /> - } /> - - - ); + return ( + + + + } /> + } /> + } + /> + } /> + } /> + + + ); }; -export default App; \ No newline at end of file +export default App; diff --git a/frontend/src/components/Charts/lineChart.js b/frontend/src/components/Charts/lineChart.js index ca4db1f..f32a2b8 100644 --- a/frontend/src/components/Charts/lineChart.js +++ b/frontend/src/components/Charts/lineChart.js @@ -1,91 +1,66 @@ -import { useEffect, useState,useRef } from 'react'; -import React from 'react'; +import { useEffect, useState, useRef } from "react"; +import React from "react"; import { - Label, Line, LineChart, XAxis, YAxis, - Legend, ReferenceLine, - CartesianGrid, - Tooltip, - ResponsiveContainer -} from 'recharts' -import 'react-dropdown/style.css'; -import '../style.css' + ResponsiveContainer, +} from "recharts"; +import "react-dropdown/style.css"; +import "../style.css"; -const RealtimeChart = (props) => { - const [data, setData] = useState(Array.from({length: 100}, (v, k) => {})); - const [name, setName] = useState(null) - const [p, setP] = useState(null); - - useEffect(() => { 
- - // if(props.json!==null && props.json[99]!==undefined){ - // setName(props.json[99].mseed_name); - // if(props.json[99].p_Arrival !== 0 && props.json[99].p_Arrival !== null){ - // setP(props.json[99].p_Arrival); - // if (props.json[25].p_Arrival === props.json[25].x){ - // lst.current = props.json.slice(0,50); - // console.log(lst.current); - // } - // if (lst.current === null){ - // setData(props.json); - // } - // else { - // setData(lst.current.concat(props.json.slice(50,100))) - // } - // } - // else { - // setData(props.json); - // } - // } - // else { - - // } +const strokeColor = { + BHN: "#8884d8", + BHE: "#ffc658", + BHZ: "#82ca9d", +}; - let mappedData = [] - Object.entries(props.json).forEach(([key, value]) => { - Object.entries(value).forEach(([index, point]) => { - let points = mappedData[index] != undefined ? mappedData[index] : {} - points[key] = point - points["time"] = new Date(props.time[index]).toLocaleTimeString() - mappedData[index] = points - }) - }) +const RealtimeChart = (props) => { + const [data, setData] = useState([]); - setData((prev) => [...prev.slice(25), ...mappedData]) - + useEffect(() => { + setData(props.data); }, [props]); - function yAxisFormatter(y) { - if (y > 0) { - return y/y - } else if (y == 0) { - return 0 - } else { - return -(y/y) - } - } - return ( -
-

Stasiun {props.stasiun}

-
- - - - - - - - - - - +
+

{props.channel}

+
+ + + val ? new Date(val).toLocaleTimeString() : "" + } + /> + + {Object.values(props.p).map((value) => { + return ( + + ); + })} + +
); }; -export default RealtimeChart +export default RealtimeChart; diff --git a/frontend/src/components/Checklist/checklist.js b/frontend/src/components/Checklist/checklist.js index c790f08..a6a5e54 100644 --- a/frontend/src/components/Checklist/checklist.js +++ b/frontend/src/components/Checklist/checklist.js @@ -1,15 +1,10 @@ import React, { Component, useState } from "react"; import ReactDOM from "react-dom"; import { default as ReactSelect } from "react-select"; -import Select from 'react-select'; +import Select from "react-select"; import "../style.css"; import { components } from "react-select"; -const stations = [ - { value: "GMJI", label: "GMJI" }, - { value: "JAGI", label: "JAGI" }, - { value: "PWJI", label: "PWJI" } -]; const Option = (props) => { return (
@@ -26,48 +21,42 @@ const Option = (props) => { }; const Checklist = (props) => { - - - const handleChange = (selected) => { - let selectList = [] - for(var i=0;i { console.log(selected); - } + }; return ( -
- - +
+ ); +}; export default Checklist; diff --git a/frontend/src/components/Map/Map2.js b/frontend/src/components/Map/Map2.js index 06508a4..b621243 100644 --- a/frontend/src/components/Map/Map2.js +++ b/frontend/src/components/Map/Map2.js @@ -1,95 +1,95 @@ -import React, {useRef,useEffect,useState} from "react"; -import '../style.css'; -import 'bootstrap/dist/css/bootstrap.min.css'; -import { divIcon } from 'leaflet'; -import L from 'leaflet' - +import React, { useRef, useEffect, useState } from "react"; +import "../style.css"; +import "bootstrap/dist/css/bootstrap.min.css"; +import { divIcon } from "leaflet"; +import L from "leaflet"; const Map = (props) => { - const [found,setFound] = useState(false); - const [coord,setCoord] = useState(new L.circle()); - const [stations, setStations] = useState([]) - const map = useRef(); + const [stations, setStations] = useState([]); + const [predictions, setPredictions] = useState([]); const mapRef = useRef(); let lst = useRef([]); - function customMarkerIcon(station, iconAnchor = [25, 41], popupAnchor = [-11, -40]) { + function customMarkerIcon( + station, + iconAnchor = [25, 41], + popupAnchor = [-11, -40] + ) { return divIcon({ html: `
${station}
`, iconAnchor: iconAnchor, - popupAnchor: popupAnchor - }) + popupAnchor: popupAnchor, + }); } function popupContent(station, description) { - return `${station}
${description}
` - } - - useEffect(() =>{ - async function getStations() { - const response = await fetch(`http://${props.url}/station`) - const jsonData = await response.json() - setStations(jsonData) - jsonData.forEach(element => { - L.marker([element["location"]["coordinates"]["1"], element["location"]["coordinates"]["0"]], {icon: customMarkerIcon(element["name"])}).addTo(mapRef.current).bindPopup(popupContent(element["name"], element["description"])); - }); + return `${station}
${description}
`; } - if (mapRef.current == undefined) { - mapRef.current = L.map('map', { - layers: [ - L.tileLayer("https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}") - ], - zoomControl: false - - }) - .setView([-8.0219, 111.8], 6.5) - - L.control.zoom({ - position: 'topright' - }).addTo(mapRef.current); + + function setSource(stasiun, koord, mag, depth) { + var magnitude = Math.round((mag + Number.EPSILON) * 100) / 100; + var depths = Math.round((depth + Number.EPSILON) * 100) / 100; + let text = `Koordinat: ${koord} Magnitude: ${magnitude} Kedalaman: ${depths} KM Stasiun Pendeteksi: ${stasiun}`; + var circle = new L.circle(koord, 50000, { + color: "red", + opacity: 0.5, + }).bindPopup(text); + mapRef.current.addLayer(circle); + setPredictions((prev) => [...prev, circle]); } - getStations() - - }, []); - useEffect(()=>{ - - if(props.jsonGMJI !== undefined && props.jsonPWJI !== undefined && props.jsonJAGI !== undefined){ - if(props.jsonGMJI[99].data_prediction.lat===undefined && props.jsonJAGI[99].data_prediction.lat===undefined && props.jsonPWJI[99].data_prediction.lat===undefined){ - setFound(false); - mapRef.current.removeLayer(coord); - } - else { - if(props.jsonGMJI[99].data_prediction.lat!==null){ - setSource('GMJI',[props.jsonGMJI[99].data_prediction.lat,props.jsonGMJI[99].data_prediction.long],props.jsonGMJI[99].data_prediction.magnitude,props.jsonGMJI[99].data_prediction.depth) - } - if(props.jsonPWJI[99].data_prediction.lat!==null){ - setSource('PWJI',[props.jsonPWJI[99].data_prediction.lat,props.jsonPWJI[99].data_prediction.long],props.jsonPWJI[99].data_prediction.magnitude,props.jsonPWJI[99].data_prediction.depth) - } - if(props.jsonJAGI[99].data_prediction.lat!==null){ - setSource('JAGI',[props.jsonJAGI[99].data_prediction.lat,props.jsonJAGI[99].data_prediction.long],props.jsonJAGI[99].data_prediction.magnitude,props.jsonJAGI[99].data_prediction.depth) - } - } + useEffect(() => { + async function getStations() { + const response = await fetch(`http://${props.url}/station`); + const jsonData = await response.json(); + setStations(jsonData); + jsonData.forEach((element) => { + L.marker( + [ + element["location"]["coordinates"][0], + element["location"]["coordinates"][1], + ], + { icon: customMarkerIcon(element["name"]) } + ) + .addTo(mapRef.current) + .bindPopup(popupContent(element["name"], element["description"])); + }); } - - function setSource(stasiun, koord, mag, depth){ - if(found===false){ - var magnitude = Math.round((mag + Number.EPSILON) * 100) / 100; - var lat = Math.round((koord[0] + Number.EPSILON) * 100) / 100; - var long= Math.round((koord[1] + Number.EPSILON) * 100) / 100; - var depths = Math.round((depth + Number.EPSILON) * 100) / 100; - let text = `Koordinat: ${lat},${long} Magnitude: ${magnitude} Kedalaman: ${depths} KM Stasiun Pendeteksi: ${stasiun}`; - var circle = new L.circle (koord, 30000, {color: "red", opacity:.5}).bindPopup(text) - setCoord(circle); - mapRef.current.addLayer(circle); - mapRef.current.setView(koord,7.5); - setFound(true); - } + if (mapRef.current == undefined) { + mapRef.current = L.map("map", { + layers: [ + L.tileLayer( + "https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}" + ), + ], + zoomControl: false, + }).setView([-8.0219, 111.8], 6.5); + + L.control + .zoom({ + position: "topright", + }) + .addTo(mapRef.current); } - },[props]) + getStations(); + }, []); - return ( -
- ); - } -export default Map; \ No newline at end of file + useEffect(() => { + predictions.forEach((value) => { + mapRef.current.removeLayer(value); + }); + setPredictions([]); + + props.prediction.forEach((value) => { + setSource( + value["station"], + [value["lat"], value["long"]], + value["magnitude"], + value["depth"] + ); + }); + }, [props]); + + return
; +}; +export default Map; diff --git a/frontend/src/pages/AdminMap.js b/frontend/src/pages/AdminMap.js index 3522927..bf21e69 100644 --- a/frontend/src/pages/AdminMap.js +++ b/frontend/src/pages/AdminMap.js @@ -1,102 +1,177 @@ -import React,{useEffect, useState, useRef} from "react"; +import React, { useEffect, useState, useRef, useMemo } from "react"; import "chartjs-plugin-streaming"; -import '../App.css'; +import "../App.css"; import RealtimeChart from "../components/Charts/lineChart"; import Ptable from "../components/Tabel"; -import MyTime from "../components/Timer/time" -import 'bootstrap/dist/css/bootstrap.min.css'; -import Map from "../components/Map/Map2" +import MyTime from "../components/Timer/time"; +import "bootstrap/dist/css/bootstrap.min.css"; +import Map from "../components/Map/Map2"; import Checklist from "../components/Checklist/checklist"; import Legend from "../components/Legend/Legend-Button"; -// import Performance from "../components/Legend/Performance"; import Button from "react-bootstrap/Button"; import Dropdown from "../components/Charts/dropdown"; -const RESERVED_FIELD = ["result", "table", "_field", "_measurement", "_start", "_stop", "_time"] +const channelOptions = [ + { value: "BHE", label: "BHE" }, + { value: "BHN", label: "BHN" }, + { value: "BHZ", label: "BHZ" }, +]; const AdminMap = (props) => { - const [stations, setStations] = useState([]); - const [optionSelected, setOptionSelected] = useState(null); - const [showTable, setShowTable] = useState(false); - const [checklistDisabled, setChecklistDisabled] = useState(true); - const [mseed, setMseed] = useState([]); - const [socket, setSocket] = useState(null); - const [data, setData] = useState(null); - const [time, setTime] = useState(null); - const [reset,setReset] = useState(false); - const [showTab,setShowTab] = useState(true); - const [showLegend, setShowLegend] = useState(false); + const [data, setData] = useState({}); + const [prediction, setPrediction] = useState([]); + const [showTab, setShowTab] = useState(true); + const [stationOptions, setStationOptions] = useState(new Set()); + const [stations, setStations] = useState(Object.keys(data)); + const [optionStationSelected, setOptionStationSelected] = useState(stations); + const [channels, setChannels] = useState(["BHE", "BHN", "BHZ"]); + const [optionChannelSelected, setOptionChannelSelected] = + useState(channelOptions); - - const handleClick = () => setShowTable(!showTable); - - useEffect(()=>{ - const socket = new WebSocket(`ws://${props.url}/ws`) - console.log(props.url) - socket.onmessage = function(e) { - const jsonData = JSON.parse(e.data) - let stations = {} - Object.entries(jsonData).forEach(([key, value]) => { - if (!RESERVED_FIELD.includes(key)) { - const [channel, station] = key.split("_") - let channels = stations[station] != undefined ? 
stations[station] : {} - channels[channel] = Object.values(value) - stations[station] = channels - } - }) - setTime(jsonData["_time"]) - setData(stations); - } - socket.onclose = console.log('Socket connection closed') - setSocket(socket) - }, []) + function functionMapper(data) { + let options = []; + if (data !== undefined) { + let arr = Array.from(data); + Object.values(arr).map((station) => { + options.push({ + value: station, + label: station, + }); + }); + } + return options; + } useEffect(() => { - // if(stations.includes("GMJI")){ - // setShowGMJI(true); - // setShowLegend(true); - // } + const socket = new WebSocket(`ws://${props.url}/ws`); + socket.onmessage = function (e) { + const res = JSON.parse(e.data); + const jsonData = res["data"]; + const prediction = res["prediction"]; - if (stations.length == 0) setShowLegend(false); + let newOptions = new Set(); + Object.keys(jsonData).forEach((key) => { + if (!stationOptions.has(key)) { + newOptions.add(key); + } + }); - }, [stations]); + setData((prev) => { + let newData = structuredClone(prev); + Object.entries(jsonData).forEach(([station, value]) => { + let stationData = newData[station] + ? newData[station] + : { + BHE: Array(100).fill(null), + BHN: Array(100).fill(null), + BHZ: Array(100).fill(null), + p_arrival: [], + }; + Object.entries(value).forEach(([key, array]) => { + let arrayData = stationData[key]; + + if (key == "p_arrival") { + Object.values(array).forEach((point) => { + arrayData.push(new Date(point).getTime()); + }); + stationData[key] = arrayData; + } else { + Object.values(array).forEach((point) => { + point["time"] = new Date(point["time"]).getTime(); + arrayData.push(point); + }); + } + stationData[key] = arrayData; + if (stationData[key].length > array.length * 4) { + stationData[key] = stationData[key].slice(array.length); + } else if (stationData[key].length > 200) { + stationData[key] = stationData[key].slice(50); + } + newData[station] = stationData; + }); + }); + return newData; + }); + setPrediction(prediction); + setStationOptions((prev) => new Set([...prev, ...newOptions])); + }; + }, []); return ( -
- -
- -
-
- -
- {showTab ? -
- {/* */} - -
- {data != null && Object.entries(data).map(([key, value]) => { - return ( - //
- - /*
- -o- BHN-o- BHZ-o- BHE| P-Arrival -
*/ - //
- )})} -
- {/* { showTable ? : null} */} -
-
-
: null} + {showTab ? ( +
+ + + +
+ {data != null && + Object.entries(data).map(([stasiun, value]) => { + if (stations.includes(stasiun)) { + return ( +
+

Stasiun {stasiun}

+ {channels.includes("BHE") && ( + + )} + {channels.includes("BHN") && ( + + )} + {channels.includes("BHZ") && ( + + )} +
+ ); + } + })} +
+
+ ) : null} +
-
); }; -export default AdminMap; \ No newline at end of file +export default AdminMap; From 1b1693c80444dd866bde20d9ac203407977458aa Mon Sep 17 00:00:00 2001 From: Zsaschz Date: Thu, 24 Aug 2023 19:59:46 +0700 Subject: [PATCH 16/16] update rest api and seedlink --- eews_backend/database/__init__.py | 2 - eews_backend/database/influxdb.py | 2 +- eews_backend/rest/main.py | 221 +++++++++++++++++-------- eews_backend/rest/websocket.py | 3 +- eews_backend/seedlink/Dockerfile | 2 +- eews_backend/seedlink/main.py | 25 ++- eews_backend/seedlink/requirements.txt | 30 ++++ eews_backend/seedlink/seedlink.py | 42 +++++ 8 files changed, 247 insertions(+), 80 deletions(-) create mode 100644 eews_backend/seedlink/requirements.txt diff --git a/eews_backend/database/__init__.py b/eews_backend/database/__init__.py index 53094a2..e69de29 100644 --- a/eews_backend/database/__init__.py +++ b/eews_backend/database/__init__.py @@ -1,2 +0,0 @@ -from .mongodb import * -from .influxdb import * \ No newline at end of file diff --git a/eews_backend/database/influxdb.py b/eews_backend/database/influxdb.py index 7e5e550..0f1be03 100644 --- a/eews_backend/database/influxdb.py +++ b/eews_backend/database/influxdb.py @@ -1,5 +1,5 @@ from influxdb_client.client.write_api import SYNCHRONOUS -from influxdb_client import InfluxDBClient, WriteOptions +from influxdb_client import InfluxDBClient from dotenv import load_dotenv import os diff --git a/eews_backend/rest/main.py b/eews_backend/rest/main.py index cca8c5e..2b735b6 100644 --- a/eews_backend/rest/main.py +++ b/eews_backend/rest/main.py @@ -1,5 +1,8 @@ from datetime import timedelta, timezone +import random +from dateutil import parser from math import ceil +import uuid from dotenv import load_dotenv from fastapi import ( FastAPI, @@ -15,15 +18,16 @@ from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware -from influxdb_client.client.flux_table import TableList +from influxdb_client.client.flux_table import TableList, FluxRecord from typing import Dict, List from influxdb_client import Point, WritePrecision from obspy import Stream, Trace, read +from pprint import pprint from logging.config import dictConfig from database.mongodb import mongo_client from database.influxdb import influx_client -from stream import KafkaProducer, PREPROCESSED_TOPIC +from stream import KafkaProducer, KafkaConsumer, PREPROCESSED_TOPIC, RAW_TOPIC from utils import * from .model import * from .websocket import ConnectionManager @@ -36,6 +40,7 @@ import pandas as pd import io import os +import json load_dotenv() dictConfig(logging_config) @@ -52,7 +57,6 @@ app = FastAPI() log = logging.getLogger("rest") -app.mount("/static", StaticFiles(directory=f"{MODULE_DIR}{STATIC_DIR}"), name="static") app.add_middleware( @@ -65,8 +69,9 @@ log.info(f"{SIMULATE_REALTIME=}") -producer = KafkaProducer(PREPROCESSED_TOPIC) manager = ConnectionManager() +# consumer = KafkaConsumer(RAW_TOPIC, uuid.uuid4()) +producer = KafkaProducer(PREPROCESSED_TOPIC) _, db = mongo_client() client = influx_client() @@ -108,27 +113,106 @@ """ +# @threaded +# def listen(consumer: KafkaConsumer, manager: ConnectionManager): +# log.info("Listening for messages") +# try: +# consumer.consume(on_message=lambda message: manager.broadcast(message)) +# except Exception as e: +# log.error(e) + + +# @app.on_event("startup") +# async def startup_event(): +# try: +# listen(consumer, manager) +# except Exception as e: +# log.error(e) + + +@app.get("/post", 
status_code=201) +async def post(): + pass + # data = { + # "lat": random.uniform(5.626791, -9), + # "long": random.uniform(95.429725, 140.884050), + # "depth": 8889.901495235445, + # "magnitude": 42.41240072250366, + # "time": 1548.9176885528566, + # "p-arrival": parser.parse("2015-08-20T15:11:00.000Z"), + # "expired": parser.parse("2015-08-20T15:12:00.000Z"), + # "station": "ABJI", + # } + # print(data) + # await db["prediction"].insert_one(data) + + # time = datetime(2015, 8, 20, 15, 11, 47, tzinfo=timezone.utc) + # records = [] + # with client.write_api() as writer: + # for i in range(10): + # records.append( + # Point("p_arrival") + # .time(time + timedelta(seconds=i), write_precision=WritePrecision.S) + # .tag("station", "KHK") + # .field("time_data", (time + timedelta(seconds=i)).isoformat()) + # ) + # print(records) + # writer.write(bucket="eews", record=records) + + @app.get("/test") async def test(): query_api = client.query_api() now = datetime(2015, 8, 20, 15, 11, 47, tzinfo=timezone.utc) - now = datetime.now(tz=timezone.utc) - first_starttime = now query = f""" - from(bucket: "eews") - |> range(start: {(now - timedelta(seconds=60)).isoformat()}, stop: {now.isoformat()}) - |> filter(fn: (r) => r["_measurement"] == "seismograf") - |> pivot(rowKey: ["_time"], columnKey: ["channel", "station"], valueColumn: "_value")""" - start = time.monotonic_ns() - data: pd.DataFrame = query_api.query_data_frame(query=query) - data2: TableList = query_api.query(query=query) - # TODO: Update result for easier handling in frontend + from(bucket: "eews") + |> range(start: {(now - timedelta(seconds=1)).isoformat()}, stop: {now.isoformat()}) + |> filter(fn: (r) => r["_measurement"] == "p_arrival" or r["_measurement"] == "seismograf")""" + data: TableList = query_api.query(query=query) result = {} - # log.debug(data) - # extended_data = fill_empty_timestamp((now - timedelta(seconds=1)), now, data) - # print(extended_data) - log.debug(f"{(time.monotonic_ns() - start)/10**9}s") - return data2.to_json() + for records in data: + row: FluxRecord + for row in records: + station = row.values["station"] + _time = row.values["_time"] + + current_station = result.get( + station, {"BHE": [], "BHN": [], "BHZ": [], "p_arrival": []} + ) + + if row.values["_measurement"] == "seismograf": + channel = row.values["channel"] + data = row.values["_value"] + current_channel = current_station[channel] + current_channel.append( + { + "time": _time.isoformat(), + "data": data, + } + ) + current_station[channel] = current_channel + result[station] = current_station + + elif row.values["_measurement"] == "p_arrival": + current_station["p_arrival"].append(_time.isoformat()) + result[station] = current_station + + prediction = ( + await db["prediction"] + .find( + { + "p-arrival": {"$lte": now - timedelta(seconds=1)}, + "expired": {"$gte": now}, + } + ) + .to_list(1000000000) + ) + for p in prediction: + del p["_id"] + + print(now - timedelta(seconds=1)) + print(now) + return {"data": result, "prediction": prediction} # return extended_data.to_dict() @@ -142,7 +226,7 @@ async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) try: query_api = client.query_api() - now = datetime(2015, 8, 20, 15, 12, 1, tzinfo=timezone.utc) + now = datetime(2015, 8, 20, 15, 11, 47, tzinfo=timezone.utc) if SIMULATE_REALTIME: now = datetime.now(tz=timezone.utc) - timedelta( seconds=MSEED_RANGE_IN_SECONDS @@ -152,22 +236,58 @@ async def websocket_endpoint(websocket: WebSocket): query = f""" from(bucket: "eews") |> 
range(start: {(now - timedelta(seconds=1)).isoformat()}, stop: {now.isoformat()}) - |> filter(fn: (r) => r["_measurement"] == "seismograf") - |> pivot(rowKey: ["_time"], columnKey: ["channel", "station"], valueColumn: "_value")""" - data: pd.DataFrame = query_api.query_data_frame(query=query) - extended_data = fill_empty_timestamp( - (now - timedelta(seconds=1)), now, data - ) - # TODO: Update result for easier handling in frontend + |> filter(fn: (r) => r["_measurement"] == "p_arrival" or r["_measurement"] == "seismograf")""" + data: TableList = query_api.query(query=query) result = {} - log.debug(now) - log.debug(data) - json_data = extended_data.to_json() + for records in data: + row: FluxRecord + for row in records: + station = row.values["station"] + _time = row.values["_time"] + + current_station = result.get( + station, {"BHE": [], "BHN": [], "BHZ": [], "p_arrival": []} + ) + + if row.values["_measurement"] == "seismograf": + channel = row.values["channel"] + data = row.values["_value"] + current_channel = current_station[channel] + current_channel.append( + { + "time": _time.isoformat(), + "data": data, + } + ) + current_station[channel] = current_channel + result[station] = current_station + + elif row.values["_measurement"] == "p_arrival": + current_station["p_arrival"].append(_time.isoformat()) + result[station] = current_station + + prediction = ( + await db["prediction"] + .find( + { + "p-arrival": {"$lte": now - timedelta(seconds=1)}, + "expired": {"$gte": now}, + } + ) + .to_list(1000000000) + ) + for p in prediction: + del p["_id"] + del p["p-arrival"] + del p["expired"] + + json_data = json.dumps({"data": result, "prediction": prediction}) now += timedelta(seconds=1) await manager.broadcast(json_data) diff = (time.monotonic_ns() - start) / 10**9 await asyncio.sleep(1 - diff) - except Exception: + except Exception as e: + log.error(e) log.warning(f"Client {websocket} has been disconnected") manager.disconnect(websocket) @@ -248,44 +368,6 @@ async def upload_mseed(file: UploadFile, background_tasks: BackgroundTasks): return {"file_size": file.size, "filename": filename} -@measure_execution_time -async def adjust_closest_stations(all_stations=None): - log.info("Adjusting closest stations") - if not all_stations: - all_stations = await db["station"].find().to_list(1000000000) - - calculated = dict() - - for station in all_stations: - station["closest_stations"] = calculate_closest_station( - station, all_stations, calculated - ) - await db["station"].update_one({"name": station["name"]}, {"$set": station}) - - -def calculate_closest_station( - curr_station: List[Dict], all_stations: List[Dict], calculated: Dict = None -): - distances = [] - - for other_station in all_stations: - if other_station["name"] == curr_station["name"]: - continue - distance = float("inf") - if f"{other_station['name']}-{curr_station['name']}" in calculated: - distance = calculated[f"{other_station['name']}-{curr_station['name']}"] - else: - distance = hs.haversine( - (curr_station["x"], curr_station["y"]), - (other_station["x"], other_station["y"]), - ) - calculated[f"{curr_station['name']}-{other_station['name']}"] = distance - distances.append((other_station["name"], distance)) - - distances.sort(key=lambda x: x[1]) - return [i[0] for i in distances[:3]] - - @measure_execution_time def save_mseed(contents: bytes, filename: str): log.info("Saving mseed on the background") @@ -335,6 +417,7 @@ def save_to_influx(stream: Stream): delta = 1 / int(trace.stats.sampling_rate) channel = trace.stats.channel 
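# Editor's note, not part of the patch: the line added just below snaps the trace
# start time onto the sampling grid before each sample point is written, which is
# presumably how this series addresses the "missing timestamp from influx" problem
# (samples from consecutive packets otherwise land on slightly offset timestamps).
# `nearest_datetime_rounded` is imported from `utils` and its body is not shown in
# this series; a minimal sketch of the assumed behaviour, rounding a datetime to the
# nearest multiple of `step_us` microseconds, could look like this:
from datetime import datetime, timedelta

def nearest_datetime_rounded(dt: datetime, step_us: float) -> datetime:
    """Round dt to the nearest multiple of step_us microseconds since the epoch."""
    step = timedelta(microseconds=step_us)
    epoch = datetime(1970, 1, 1, tzinfo=dt.tzinfo)  # same naive/aware handling as dt
    steps_from_epoch = round((dt - epoch) / step)   # timedelta / timedelta -> float
    return epoch + steps_from_epoch * step

# With delta = 1 / sampling_rate, `delta * 10**6` is the sample period in
# microseconds, so a 25 Hz trace is snapped to 40 ms boundaries -- consistent with
# the 0.04 * 10**6 step used by seedlink's produce_windowed_data later in this patch.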
station = trace.stats.station + starttime = nearest_datetime_rounded(starttime, delta * 10**6) for data_point in trace.data: point = ( diff --git a/eews_backend/rest/websocket.py b/eews_backend/rest/websocket.py index 2631ea7..f213e58 100644 --- a/eews_backend/rest/websocket.py +++ b/eews_backend/rest/websocket.py @@ -1,5 +1,6 @@ from fastapi import WebSocket + class ConnectionManager: def __init__(self): self.active_connections: list[WebSocket] = [] @@ -17,4 +18,4 @@ async def send_personal_message(self, message: str, websocket: WebSocket): async def broadcast(self, message: str): for connection in self.active_connections: - await connection.send_text(message) \ No newline at end of file + await connection.send_text(message) diff --git a/eews_backend/seedlink/Dockerfile b/eews_backend/seedlink/Dockerfile index b9c3d4a..8aebbb5 100644 --- a/eews_backend/seedlink/Dockerfile +++ b/eews_backend/seedlink/Dockerfile @@ -4,6 +4,6 @@ ENV APP_HOME /app WORKDIR $APP_HOME COPY . ./ -RUN pip install --no-cache-dir -r requirements.txt +RUN pip install --no-cache-dir -r ./seedlink/requirements.txt CMD python -m seedlink.main \ No newline at end of file diff --git a/eews_backend/seedlink/main.py b/eews_backend/seedlink/main.py index f24d79e..cc81604 100644 --- a/eews_backend/seedlink/main.py +++ b/eews_backend/seedlink/main.py @@ -1,28 +1,41 @@ from typing import List +from dotenv import load_dotenv + from .seedlink import Seedlink + import multiprocessing import os -from dotenv import load_dotenv +import random +import string load_dotenv() station_list = os.getenv("STATION_LIST") if type(station_list) == str: station_list = station_list.split(",") +num_of_stations = int(os.getenv("NUM_OF_STATIONS")) global process_list process_list: List[multiprocessing.Process] = [] -def seedlink_process(station: str): - client = Seedlink(station) +def seedlink_process(station: str, override: str): + client = Seedlink(station, override_station=override) client.start() def main(): - stations = station_list if station_list else ["JAGI", "BNDI"] - for station in stations: - process = multiprocessing.Process(target=seedlink_process, args=(station,)) + stations = station_list if station_list else ["JAGI"] + station_set = set() + alphabet = string.ascii_uppercase + while len(station_set) < num_of_stations: + random_combination = "".join(random.choice(alphabet) for _ in range(4)) + station_set.add(random_combination) + + for index, station in enumerate(list(station_set)): + process = multiprocessing.Process( + target=seedlink_process, args=(stations[index % len(stations)], station) + ) process.name = f"seedlink_{station}" process_list.append(process) for process in process_list: diff --git a/eews_backend/seedlink/requirements.txt b/eews_backend/seedlink/requirements.txt new file mode 100644 index 0000000..19f8d56 --- /dev/null +++ b/eews_backend/seedlink/requirements.txt @@ -0,0 +1,30 @@ +certifi==2023.7.22 +charset-normalizer==3.2.0 +confluent-kafka==2.2.0 +contourpy==1.1.0 +cycler==0.11.0 +decorator==5.1.1 +fonttools==4.42.1 +greenlet==2.0.2 +idna==3.4 +influxdb-client==1.37.0 +kiwisolver==1.4.4 +lxml==4.9.3 +matplotlib==3.7.2 +numpy==1.25.2 +obspy==1.4.0 +packaging==23.1 +pandas==2.0.3 +Pillow==10.0.0 +pyparsing==3.0.9 +python-dateutil==2.8.2 +python-dotenv==1.0.0 +pytz==2023.3 +reactivex==4.0.4 +requests==2.31.0 +scipy==1.11.2 +six==1.16.0 +SQLAlchemy==2.0.20 +typing_extensions==4.7.1 +tzdata==2023.3 +urllib3==2.0.4 diff --git a/eews_backend/seedlink/seedlink.py b/eews_backend/seedlink/seedlink.py index 7210b77..5c5ed16 
100644 --- a/eews_backend/seedlink/seedlink.py +++ b/eews_backend/seedlink/seedlink.py @@ -48,7 +48,10 @@ def __init__( self.seedlink_client = SeedlinkClient("geofon.gfz-potsdam.de", 18000) self.influx_client = influx_client() self.producer = KafkaProducer(PREPROCESSED_TOPIC) + # self.raw_producer = KafkaProducer(RAW_TOPIC) self.override_station = override_station + # self.raw_producer_threads = [] + # self.produce_raw() def start(self): # Set the start and end times for the plot @@ -100,6 +103,44 @@ def start(self): time.sleep(max(self.poll_interval - diff, 0)) + # @threaded + # def produce_raw(self): + # while True: + # if len(self.buffer) > 0 and self.all_threads_finished(): + # logger.info("Scheduling raw events to kafka") + # current_stream: Stream = self.buffer.pop(0) + # for trace in current_stream: + # thread = threading.Thread( + # target=self.produce_raw_trace, args=(trace,) + # ) + # self.raw_producer_threads.append(thread) + # thread.start() + + # def all_threads_finished(self): + # thread: threading.Thread + # for thread in self.raw_producer_threads: + # if thread.is_alive(): + # return False + # self.raw_producer_threads = [] + # return True + + # def produce_raw_trace(self, trace: Trace): + # starttime = trace.stats["starttime"].datetime + # sampling_rate = int(trace.stats["sampling_rate"]) + # delta = 1 / sampling_rate + # for data in trace.data: + # event = { + # "station": trace.stats["station"], + # "channel": trace.stats["channel"], + # "data": float(data), + # "time": str(starttime), + # } + # self.raw_producer.produce_message( + # event, f"{event['channel']}_{event['station']}" + # ) + # starttime += timedelta(seconds=delta) + # time.sleep(delta) + @measure_execution_time def produce_windowed_data(self, stream: Stream, first_starttime, first_endtime): rounded_starttime = nearest_datetime_rounded(first_starttime, 0.04 * 10**6) @@ -132,6 +173,7 @@ def save_to_influx(self, stream: Stream): delta = 1 / int(trace.stats.sampling_rate) channel = trace.stats.channel station = trace.stats.station + starttime = nearest_datetime_rounded(starttime, delta * 10**6) for data_point in trace.data: point = (