
Prediction #4
Open · wants to merge 33 commits into base: main

Changes shown are from 6 of the 33 commits.

Commits (33)
4e765e4
add spark docker
natsyz Jul 24, 2023
cd6ed50
refactor and add function
natsyz Jul 24, 2023
d376e40
update spark image
natsyz Jul 26, 2023
e80e7ae
Merge branch 'main' into main
natsyz Jul 26, 2023
e77878e
init predict module
natsyz Jul 26, 2023
279b22e
attempt to fix missing timestamp from influx
Zsaschz Jul 27, 2023
26b93b3
local predict
natsyz Jul 27, 2023
11bfaa5
refactor
natsyz Jul 28, 2023
fbe3a43
add generator to simulate realtime data
Zsaschz Jul 30, 2023
a477f14
refactor db module
Zsaschz Jul 31, 2023
43bb8b5
refactor to use multiprocess
Zsaschz Jul 31, 2023
94ab505
add option to simulate realtime
Zsaschz Jul 31, 2023
39fe5dd
map to fetch stations data & refactor
Zsaschz Jul 31, 2023
f16f512
fix generator file
Zsaschz Jul 31, 2023
166b3a2
add fill empty timestamp and send key to kafka
Zsaschz Aug 3, 2023
f210c4e
fix time format for kafka event
Zsaschz Aug 3, 2023
0d7b91e
add predict script
natsyz Aug 4, 2023
3a547b5
Add pause time
natsyz Aug 7, 2023
c6d8236
add nginx conf
Zsaschz Aug 15, 2023
bf0ec4f
add seedlink realtime query and change window data on kafka
Zsaschz Aug 15, 2023
7e5823a
refactor event time
Zsaschz Aug 16, 2023
f08ec33
Update station model
natsyz Aug 16, 2023
636ff23
Merge branch 'rest' of https://github.com/Zsaschz/eews-event-driven i…
natsyz Aug 16, 2023
3e14168
Update flow: without read from db
natsyz Aug 16, 2023
050d92a
dockerize seedlink and rest api
Zsaschz Aug 16, 2023
98f5f8a
Merge pull request #7 from natsyz/rest
Zsaschz Aug 21, 2023
c1f6270
finish frontend and add prettier config
Zsaschz Aug 24, 2023
1b1693c
update rest api and seedlink
Zsaschz Aug 24, 2023
62308de
Merge branch 'rest' of https://github.com/Zsaschz/eews-event-driven i…
natsyz Oct 8, 2023
a203f71
merging p arrival prediction
natsyz Oct 8, 2023
ed52ba8
add prediction script
natsyz Oct 8, 2023
1980538
update code from cloud
natsyz Oct 8, 2023
4025597
syntax fixing
natsyz Oct 18, 2023
79 changes: 79 additions & 0 deletions docker-compose.yml
@@ -23,6 +23,85 @@ services:
DOCKER_INFLUXDB_INIT_ORG: eews
DOCKER_INFLUXDB_INIT_BUCKET: eews

spark-master:
image: bitnami/spark:latest
ports:
- "8080:8080"
environment:
- SPARK_MODE=master
volumes:
- ./data:/data

spark-worker-1:
image: bitnami/spark:latest
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
depends_on:
- spark-master
volumes:
- ./data:/data

spark-worker-2:
image: bitnami/spark:latest
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
depends_on:
- spark-master
volumes:
- ./data:/data

# spark-master:
# image: cluster-apache-spark:3.4.1
# ports:
# - "9095:9090"
# - "7077:7077"
# volumes:
# - ./apps/spark:/opt/spark-apps
# - ./data/spark:/opt/spark-data
# environment:
# - SPARK_LOCAL_IP=spark-master
# - SPARK_WORKLOAD=master

# spark-worker-a:
# image: cluster-apache-spark:3.4.1
# ports:
# - "9096:9090"
# - "7000:7000"
# depends_on:
# - spark-master
# environment:
# - SPARK_MASTER=spark://spark-master:7077
# - SPARK_WORKER_CORES=1
# - SPARK_WORKER_MEMORY=1G
# - SPARK_DRIVER_MEMORY=1G
# - SPARK_EXECUTOR_MEMORY=1G
# - SPARK_WORKLOAD=worker
# - SPARK_LOCAL_IP=spark-worker-a
# volumes:
# - ./apps/spark:/opt/spark-apps
# - ./data/spark:/opt/spark-data

# spark-worker-b:
# image: cluster-apache-spark:3.4.1
# ports:
# - "9097:9090"
# - "7001:7000"
# depends_on:
# - spark-master
# environment:
# - SPARK_MASTER=spark://spark-master:7077
# - SPARK_WORKER_CORES=1
# - SPARK_WORKER_MEMORY=1G
# - SPARK_DRIVER_MEMORY=1G
# - SPARK_EXECUTOR_MEMORY=1G
# - SPARK_WORKLOAD=worker
# - SPARK_LOCAL_IP=spark-worker-b
# volumes:
# - ./apps:/opt/spark-apps
# - ./data:/opt/spark-data

zookeeper:
restart: always
environment:
10 changes: 9 additions & 1 deletion eews_backend/database/mongodb.py
@@ -1,6 +1,7 @@
from dotenv import load_dotenv
from motor import motor_asyncio
from typing import Optional
from pymongo import MongoClient
import os

load_dotenv()
@@ -12,9 +13,16 @@
client = motor_asyncio.AsyncIOMotorClient(MONGO_URL)
db = client[MONGO_DATABASE]

def new_client():
def mongo_client():
mongo_url = MONGO_URL
mongo_db = MONGO_DATABASE
client = motor_asyncio.AsyncIOMotorClient(mongo_url)
db = client[mongo_db]
return client, db

def mongo_client_sync():
mongo_url = MONGO_URL
mongo_db = MONGO_DATABASE
client = MongoClient(mongo_url)
db = client[mongo_db]
return client, db
Empty file.
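
An illustrative usage sketch of the two helpers added above (not part of this diff). It assumes MONGO_URL and MONGO_DATABASE are set in the environment and uses the "seismometer" collection name that appears later in this PR; the station code is hypothetical.

from database.mongodb import mongo_client, mongo_client_sync

# Synchronous access (plain scripts, Spark UDFs)
client, db = mongo_client_sync()
station = db["seismometer"].find_one({"name": "BBJI"})  # "BBJI" is a made-up code

# Asynchronous access (e.g. FastAPI handlers) via the motor client
async def fetch_station(name: str):
    _client, adb = mongo_client()
    return await adb["seismometer"].find_one({"name": name})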
76 changes: 76 additions & 0 deletions eews_backend/predict/main.py
@@ -0,0 +1,76 @@
from database.influxdb import *
from stream_processing.kafka import KafkaConsumer
from stream_processing.topics import P_ARRIVAL_TOPIC
from utils.helper_functions import get_nearest_station, letInterpolate, denormalization
from predict.predictor import Predictor
from dotenv import load_dotenv
import datetime

load_dotenv()

consumer = KafkaConsumer(P_ARRIVAL_TOPIC, "eews", {})
predictor = Predictor()

def main(msg):
stations = get_nearest_station(msg["station"], 200000)

seis_data = []

for station in stations:
data = read_seis_influx(station, datetime.datetime.strptime(msg["time"],'%Y-%m-%dT%H:%M:%S.%fZ'))
if data:
data_preprocessed = preprocess(data)
seis_data.append(data_preprocessed)

if len(seis_data) == 1:
seis_data *= 3
elif len(seis_data) == 2:
seis_data.append(seis_data[0])

preds = predictor.predict(seis_data)
data_mtr = denormalization(preds.iloc[0])

print(f"{data_mtr =}")


def read_seis_influx(station: str, time: datetime.datetime):
client = InfluxDBClient(
url=INFLUXDB_URL,
org=INFLUXDB_ORG,
token=INFLUXDB_TOKEN
)
query_api = client.query_api()
start_time = (time - datetime.timedelta(0,5)).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
stop_time = (time + datetime.timedelta(0,5)).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
data = []

query = f"""from(bucket: "eews")
|> range(start: {start_time}, stop: {stop_time})
|> filter(fn: (r) => r._measurement == "seismograf" and r.station == "{station}")"""

tables = query_api.query(query, org="eews")

data = []
for table in tables:
res = []
for record in table.records:
res.append(record.get_value())
data.append(res)

return data

def preprocess(data):
data_interpolated = list(map(lambda x : letInterpolate(x, 2000), data))

data_interpolated_transformed = []

assert len(data_interpolated[0]) == 2000
assert len(data_interpolated[1]) == 2000
assert len(data_interpolated[2]) == 2000

for i in range(len(data_interpolated[0])):
data_interpolated_transformed.append([data_interpolated[0][i], data_interpolated[1][i], data_interpolated[2][i]])
return data_interpolated_transformed

if __name__ == "__main__":
consumer.consume(on_message=main, on_error=None)
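
For reference, a hypothetical example of the P-arrival event this consumer expects; the field names and timestamp format come from the code above, while the station code and time are invented.

example_msg = {
    "station": "BBJI",                      # hypothetical station code
    "time": "2023-08-16T01:23:45.678000Z",  # must parse with '%Y-%m-%dT%H:%M:%S.%fZ'
}
# main(example_msg) looks up stations within 200 km of the reporting station, pulls a
# +/-5 s window per station from InfluxDB, and prints the denormalized prediction.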
27 changes: 27 additions & 0 deletions eews_backend/predict/predictor.py
@@ -0,0 +1,27 @@
import numpy as np
import pandas as pd
import pickle
import threading

class Predictor():
_instance = None
_lock = threading.Lock()

def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super(Predictor, cls).__new__(cls)

return cls._instance

def __init__(self):
self.model = pickle.load(open('predict/model/model.pkl', 'rb'))  # forward slashes so the path also resolves on Linux

def predict(self, data):
predictions = self.model.predict(np.array(data), batch_size=4)
result = pd.DataFrame(columns=['lat','long','depth','magnitude','time'])

for prediction, col_result in zip(np.array(predictions), ['lat','long','depth','magnitude','time']):
result[col_result] = prediction.squeeze()

return result
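
A short usage sketch of the Predictor singleton above (illustrative, not part of the diff). The input shape, three stations each holding a 2000-sample window of [e, n, z] values, is inferred from main.py; model.pkl is assumed to exist at the configured path.

import numpy as np
from predict.predictor import Predictor

p1, p2 = Predictor(), Predictor()
assert p1 is p2  # the lock-guarded __new__ always returns the same instance

dummy_batch = np.zeros((3, 2000, 3)).tolist()  # placeholder data shaped like main.py's seis_data
preds = p1.predict(dummy_batch)
print(preds)

Note that __init__ still re-runs pickle.load on every Predictor() call even though __new__ returns the cached instance, so a guard there (or loading the model in __new__) would avoid repeated model loads.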
170 changes: 170 additions & 0 deletions eews_backend/stream_processing/sandbox_predict.py
@@ -0,0 +1,170 @@
### SPARK APP
#
## Command in terminal pc
# docker cp sandbox_predict.py [container-id]:/opt/bitnami/spark/sandbox_predict.py
# docker cp model.pkl [container-id]:/opt/bitnami/spark/model.pkl
# docker exec [container-name] ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 --master spark://[host]:[port] sandbox_predict.py
#
## Command in terminal docker (spark)
# pip install influxdb_client motor keras tensorflow
#
## Notes
# this code has not been run yet due to resource limitation on my end :(
#
###

from influxdb_client import InfluxDBClient
from motor import motor_asyncio

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as f

import datetime
import pickle

BOOTSTAP_SERVERS = "$"
MONGO_URL = "$"
MONGO_DATABASE = "$"
INFLUXDB_ORG = "$"
INFLUXDB_URL = "$"
INFLUXDB_TOKEN = "$"

model = pickle.load(open('model.pkl', 'rb'))

def main():
spark, sc = init_spark()
model_broadcast = sc.broadcast(model)

df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", BOOTSTAP_SERVERS) \
.option("subscribe", "p-arrival") \
.load()

schema = StructType([StructField('station', StringType()), StructField('time', StringType())])
schema_output = StructType([StructField('lat', StringType()), \
StructField('long', StringType()), \
StructField('depth', StringType()), \
StructField('magnitude', StringType()), \
StructField('time', StringType())])

df_json = df.selectExpr("CAST(value AS STRING) as json") \
.select(f.from_json('json', schema).alias('data')) \
.select('data.*')

def apply_prediction(station, time):
import json
import datetime

# get data stasiun (mongo)
stations = get_nearest_station(station, 200000)

# get and preprocess data (influx)
seis_data = []

for station in stations:
data = read_seis_influx(station, datetime.datetime.strptime(time,'%Y-%m-%dT%H:%M:%S.%fZ')) # [[e,...,e],[n,...,n],[z,...,z]]
data_preprocessed = preprocess(data)
seis_data.append(data_preprocessed)

if len(stations) == 1:
seis_data *= 3
elif len(stations) == 2:
seis_data.append(seis_data[0])

# predict
preds = model_broadcast.value.predict(seis_data)  # broadcast variables expose the wrapped object via .value
data_mtr = denormalization(preds.iloc[0])

return data_mtr

prediction_udf = f.udf(lambda data1, data2: apply_prediction(data1, data2), StringType())

query = df_json.select(f.from_json(prediction_udf(df_json.station, df_json.time), schema_output).alias('response'))\
.select('response.*') \
.writeStream \
.trigger(once=True) \
.format("console") \
.start()  # start() returns the StreamingQuery that awaitTermination() waits on

query.awaitTermination()


def init_spark():
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
return spark, sc

def read_seis_influx(station: str, time: datetime.datetime):
client = InfluxDBClient(
[Review comment, repository owner]
Move all of the db client initialization to the top of the file so that it is only initialized once. The current code creates a new connection to the db every time you want to query. (A sketch of this change is appended after this file's diff.)

url=INFLUXDB_URL,
org=INFLUXDB_ORG,
token=INFLUXDB_TOKEN
)
query_api = client.query_api()
start_time = (time - datetime.timedelta(0,5)).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
stop_time = (time + datetime.timedelta(0,5)).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
data = []

query = f"""from(bucket: "eews")
|> range(start: {start_time}, stop: {stop_time})
|> filter(fn: (r) => r._measurement == "seismograf" and r.station == "{station}")"""

tables = query_api.query(query, org="eews")

data = []
for table in tables: #ENZ
res = []
for record in table.records:
res.append(record.get_value())
data.append(res)

return data

def get_nearest_station(name, max_distance):
# NOTE: the module-level motor client is asynchronous; a synchronous pymongo client
# is needed here because this helper is called from inside a synchronous UDF.
from pymongo import MongoClient
client = MongoClient(MONGO_URL)
db = client[MONGO_DATABASE]
coordinates = db['seismometer'].find_one({ 'name': name })['location']['coordinates']
stations = db['seismometer'].find({ 'location': {'$nearSphere': {'$geometry': {'type': 'Point', 'coordinates': coordinates}, '$maxDistance': max_distance}}}, {'name': 1, '_id': 0})
return names if len(names:=[station['name'] for station in stations]) <= 3 else names[:3]

def preprocess(data):
data_interpolated = list(map(lambda x : letInterpolate(x, 2000), data)) # interpolate to 2000 data each channel

data_interpolated_transformed = [] # transform to [[e,n,z], ..., [e,n,z]]

for i in range(len(data_interpolated[0])):
data_interpolated_transformed.append([data_interpolated[0][i], data_interpolated[1][i], data_interpolated[2][i]])
return data_interpolated_transformed

def letInterpolate(inp, new_len):
delta = (len(inp)-1) / (new_len-1)
outp = [interpolate(inp, i*delta) for i in range(new_len)]
return outp

def interpolate(lst, fi):
i, f = int(fi // 1), fi % 1 # Split floating-point index into whole & fractional parts.
j = i+1 if f > 0 else i # Avoid index error.
return (1-f) * lst[i] + f * lst[j]
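
# Worked example (illustrative, not in the original file): letInterpolate linearly
# resamples a channel to new_len points, e.g. letInterpolate([0, 10], 5) returns
# [0.0, 2.5, 5.0, 7.5, 10.0]; above it is used to stretch each channel to 2000 samples.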

def denormalization(data):
max,min = {},{}
max['lat'] = -6.64264
min['lat'] = -11.5152
max['long'] = 115.033
min['long'] = 111.532
max['depth'] = 588.426
min['depth'] = 1.16
max['magnitude'] = 6.5
min['magnitude'] = 3.0
max['time'] = 74.122
min['time'] = 4.502

dats = {}
for col in data.index:
dats[col] = data[col]*(max[col] - min[col])+min[col]
return dats

if __name__ == "__main__":
main()
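
A minimal sketch of the change requested in the review comment on read_seis_influx: initialize the InfluxDB client once at module level and reuse it for every query, instead of opening a new connection per call. This is an illustration, not part of the diff; the constant names match those defined at the top of the file.

from influxdb_client import InfluxDBClient
import datetime

influx_client = InfluxDBClient(url=INFLUXDB_URL, org=INFLUXDB_ORG, token=INFLUXDB_TOKEN)
query_api = influx_client.query_api()  # created once, shared by every query

def read_seis_influx(station: str, time: datetime.datetime):
    start_time = (time - datetime.timedelta(seconds=5)).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    stop_time = (time + datetime.timedelta(seconds=5)).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    query = f"""from(bucket: "eews")
        |> range(start: {start_time}, stop: {stop_time})
        |> filter(fn: (r) => r._measurement == "seismograf" and r.station == "{station}")"""
    # one list of raw values per returned table (the E, N and Z channels)
    return [[record.get_value() for record in table.records]
            for table in query_api.query(query, org="eews")]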