Prediction #4

Open · wants to merge 33 commits into base: main

Commits (33)
4e765e4
add spark docker
natsyz Jul 24, 2023
cd6ed50
refactor and add function
natsyz Jul 24, 2023
d376e40
update spark image
natsyz Jul 26, 2023
e80e7ae
Merge branch 'main' into main
natsyz Jul 26, 2023
e77878e
init predict module
natsyz Jul 26, 2023
279b22e
attempt to fix missing timestamp from influx
Zsaschz Jul 27, 2023
26b93b3
local predict
natsyz Jul 27, 2023
11bfaa5
refactor
natsyz Jul 28, 2023
fbe3a43
add generator to simulate realtime data
Zsaschz Jul 30, 2023
a477f14
refactor db module
Zsaschz Jul 31, 2023
43bb8b5
refactor to use multiprocess
Zsaschz Jul 31, 2023
94ab505
add option to simulate realtime
Zsaschz Jul 31, 2023
39fe5dd
map to fetch stations data & refactor
Zsaschz Jul 31, 2023
f16f512
fix generator file
Zsaschz Jul 31, 2023
166b3a2
add fill empty timestamp and send key to kafka
Zsaschz Aug 3, 2023
f210c4e
fix time format for kafka event
Zsaschz Aug 3, 2023
0d7b91e
add predict script
natsyz Aug 4, 2023
3a547b5
Add pause time
natsyz Aug 7, 2023
c6d8236
add nginx conf
Zsaschz Aug 15, 2023
bf0ec4f
add seedlink realtime query and change window data on kafka
Zsaschz Aug 15, 2023
7e5823a
refactor event time
Zsaschz Aug 16, 2023
f08ec33
Update station model
natsyz Aug 16, 2023
636ff23
Merge branch 'rest' of https://github.com/Zsaschz/eews-event-driven i…
natsyz Aug 16, 2023
3e14168
Update flow: without read from db
natsyz Aug 16, 2023
050d92a
dockerize seedlink and rest api
Zsaschz Aug 16, 2023
98f5f8a
Merge pull request #7 from natsyz/rest
Zsaschz Aug 21, 2023
c1f6270
finish frontend and add prettier config
Zsaschz Aug 24, 2023
1b1693c
update rest api and seedlink
Zsaschz Aug 24, 2023
62308de
Merge branch 'rest' of https://github.com/Zsaschz/eews-event-driven i…
natsyz Oct 8, 2023
a203f71
merging p arrival prediction
natsyz Oct 8, 2023
ed52ba8
add prediction script
natsyz Oct 8, 2023
1980538
update code from cloud
natsyz Oct 8, 2023
4025597
syntax fixing
natsyz Oct 18, 2023
2 changes: 2 additions & 0 deletions .gitignore
@@ -11,6 +11,8 @@ db.sqlite3
/data/
/mseed/
*.mseed
*.zip
*.pick

# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.

4 changes: 4 additions & 0 deletions .prettierrc
@@ -0,0 +1,4 @@
{
  "tabWidth": 2,
  "useTabs": false
}
87 changes: 87 additions & 0 deletions docker-compose.kafka.yml
@@ -0,0 +1,87 @@
version: "3.3"

services:
zookeeper:
restart: always
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
image: confluentinc/cp-zookeeper:latest
ports:
- "2181:2181/tcp"
volumes:
- ./zookeeper/data:/data
- ./zookeeper/data/datalog:/datalog

kafka:
restart: always
environment:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://34.128.127.171:9092
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_JMX_HOSTNAME: 34.128.127.171
KAFKA_JMX_PORT: 9999
image: confluentinc/cp-kafka:latest
user: root
ports:
- "9092:9092"
- "9999:9999"
volumes:
- ./kafka/data/kafka-1:/var/lib/kafka/data
depends_on:
- zookeeper

kafka-2:
restart: always
environment:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,PLAINTEXT_HOST://34.128.127.171:9093
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_JMX_HOSTNAME: 34.128.127.171
KAFKA_JMX_PORT: 9999
image: confluentinc/cp-kafka:latest
user: root
ports:
- "9093:9093"
- "9998:9999"
volumes:
- ./kafka/data/kafka-2:/var/lib/kafka/data
depends_on:
- zookeeper

kafka-3:
restart: always
environment:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:29092,PLAINTEXT_HOST://34.128.127.171:9094
KAFKA_BROKER_ID: 3
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_JMX_HOSTNAME: 34.128.127.171
KAFKA_JMX_PORT: 9999
image: confluentinc/cp-kafka:latest
user: root
ports:
- "9094:9094"
- "9997:9999"
    volumes:
      - ./kafka/data/kafka-3:/var/lib/kafka/data
      - ./cassandra-sink/kafka-connect/plugins:/data/connectors
    depends_on:
      - zookeeper

  # Utility
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "9080:8080"
    restart: always
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: PLAINTEXT://kafka:29092,PLAINTEXT://kafka-2:29092,PLAINTEXT://kafka-3:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://kafka-schema-registry:8081/
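
Since each broker advertises both an in-network listener (kafka:29092) and a host-facing one, a quick way to verify the three-broker cluster is a send/receive round trip. A minimal sketch, assuming the kafka-python package and that the advertised listeners are reachable from the test machine (swap in your own host or IP as needed):

# Smoke test for the cluster above (illustrative sketch, not part of the PR).
from kafka import KafkaProducer, KafkaConsumer

BOOTSTRAP = ["34.128.127.171:9092", "34.128.127.171:9093", "34.128.127.171:9094"]

producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
producer.send("healthcheck", b"ping")  # assumes topic auto-creation is enabled, or create it first
producer.flush()

consumer = KafkaConsumer(
    "healthcheck",
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # give up after 5 s of silence
)
for message in consumer:
    print(message.topic, message.value)
    break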
120 changes: 119 additions & 1 deletion docker-compose.yml
@@ -1,11 +1,18 @@
version: '3'

services:
  nginx_lb:
    image: nginx
    ports:
      - 8080:8080
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf

  mongodb:
    image: mongo
    restart: always
    ports:
      - 27018:27017
      - 27017:27017
    volumes:
      - ./data/mongodb:/data/db
@@ -23,6 +30,85 @@ services:
      DOCKER_INFLUXDB_INIT_ORG: eews
      DOCKER_INFLUXDB_INIT_BUCKET: eews

  # NOTE: spark services are defined again further below with the custom
  # risw/spark-python image; this earlier bitnami block is disabled so each
  # service name is declared only once.
  # spark-master:
  #   image: bitnami/spark:latest
  #   ports:
  #     - "8080:8080"
  #   environment:
  #     - SPARK_MODE=master
  #   volumes:
  #     - ./data:/data

  # spark-worker-1:
  #   image: bitnami/spark:latest
  #   environment:
  #     - SPARK_MODE=worker
  #     - SPARK_MASTER_URL=spark://spark-master:7077
  #   depends_on:
  #     - spark-master
  #   volumes:
  #     - ./data:/data

  # spark-worker-2:
  #   image: bitnami/spark:latest
  #   environment:
  #     - SPARK_MODE=worker
  #     - SPARK_MASTER_URL=spark://spark-master:7077
  #   depends_on:
  #     - spark-master
  #   volumes:
  #     - ./data:/data

  # spark-master:
  #   image: cluster-apache-spark:3.4.1
  #   ports:
  #     - "9095:9090"
  #     - "7077:7077"
  #   volumes:
  #     - ./apps/spark:/opt/spark-apps
  #     - ./data/spark:/opt/spark-data
  #   environment:
  #     - SPARK_LOCAL_IP=spark-master
  #     - SPARK_WORKLOAD=master

  # spark-worker-a:
  #   image: cluster-apache-spark:3.4.1
  #   ports:
  #     - "9096:9090"
  #     - "7000:7000"
  #   depends_on:
  #     - spark-master
  #   environment:
  #     - SPARK_MASTER=spark://spark-master:7077
  #     - SPARK_WORKER_CORES=1
  #     - SPARK_WORKER_MEMORY=1G
  #     - SPARK_DRIVER_MEMORY=1G
  #     - SPARK_EXECUTOR_MEMORY=1G
  #     - SPARK_WORKLOAD=worker
  #     - SPARK_LOCAL_IP=spark-worker-a
  #   volumes:
  #     - ./apps/spark:/opt/spark-apps
  #     - ./data/spark:/opt/spark-data

  # spark-worker-b:
  #   image: cluster-apache-spark:3.4.1
  #   ports:
  #     - "9097:9090"
  #     - "7001:7000"
  #   depends_on:
  #     - spark-master
  #   environment:
  #     - SPARK_MASTER=spark://spark-master:7077
  #     - SPARK_WORKER_CORES=1
  #     - SPARK_WORKER_MEMORY=1G
  #     - SPARK_DRIVER_MEMORY=1G
  #     - SPARK_EXECUTOR_MEMORY=1G
  #     - SPARK_WORKLOAD=worker
  #     - SPARK_LOCAL_IP=spark-worker-b
  #   volumes:
  #     - ./apps:/opt/spark-apps
  #     - ./data:/opt/spark-data

  zookeeper:
    restart: always
    environment:
@@ -35,6 +121,36 @@ services:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/data/datalog:/datalog

  spark-master:
    image: risw/spark-python:v-1
    ports:
      - "8080:8080"
    environment:
      - SPARK_MODE=master
    volumes:
      - ./data/spark/master:/data

  spark-worker-1:
    image: risw/spark-python:v-1
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
    depends_on:
      - spark-master
    volumes:
      - ./data/spark/worker-1:/data

  spark-worker-2:
    image: risw/spark-python:v-1
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
    depends_on:
      - spark-master
    volumes:
      - ./data/spark/worker-2:/data


  kafka:
    restart: always
    environment:
@@ -65,3 +181,5 @@ services:
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_1_NAME: cloud
      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: 34.101.119.214:9092,34.101.119.214:9093,34.101.119.214:9094
2 changes: 0 additions & 2 deletions eews_backend/database/__init__.py
@@ -1,2 +0,0 @@
from .mongodb import *
from .influxdb import *
10 changes: 4 additions & 6 deletions eews_backend/database/influxdb.py
@@ -1,5 +1,5 @@
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client import InfluxDBClient, WriteOptions
from influxdb_client import InfluxDBClient
from dotenv import load_dotenv
import os

@@ -12,8 +12,6 @@
INFLUXDB_URL = os.getenv("INFLUXDB_URL")
INFLUXDB_TOKEN = os.getenv("INFLUXDB_TOKEN")

client = InfluxDBClient(
    url=INFLUXDB_URL,
    org=INFLUXDB_ORG,
    token=INFLUXDB_TOKEN
)

def influx_client():
    return InfluxDBClient(url=INFLUXDB_URL, org=INFLUXDB_ORG, token=INFLUXDB_TOKEN)
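
This hunk replaces the module-level InfluxDB client with a factory, so each process or request can own and close its connection. A usage sketch, assuming INFLUXDB_URL, INFLUXDB_ORG, and INFLUXDB_TOKEN are set in the environment or .env file:

# Illustrative usage of the new factory (not part of the diff).
from eews_backend.database.influxdb import influx_client

with influx_client() as client:  # InfluxDBClient supports the context-manager protocol
    print("reachable:", client.ping())  # ping() returns True when the server responds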
20 changes: 18 additions & 2 deletions eews_backend/database/mongodb.py
@@ -1,20 +1,36 @@
from dotenv import load_dotenv
from motor import motor_asyncio
from typing import Optional
from pymongo import MongoClient
import os

load_dotenv()

DEFAULT_MONGO_DATABASE = "db"
MONGO_URL = os.getenv("MONGO_URL")
MONGO_DATABASE = os.getenv("MONGO_DATABASE") if os.getenv("MONGO_DATABASE") else DEFAULT_MONGO_DATABASE
MONGO_DATABASE = (
    os.getenv("MONGO_DATABASE")
    if os.getenv("MONGO_DATABASE")
    else DEFAULT_MONGO_DATABASE
)

def mongo_client():
    mongo_url = MONGO_URL
    mongo_db = MONGO_DATABASE
    client = motor_asyncio.AsyncIOMotorClient(mongo_url)
    db = client[mongo_db]
    return client, db

def mongo_client_sync():
    mongo_url = MONGO_URL
    mongo_db = MONGO_DATABASE
    client = MongoClient(mongo_url)
    db = client[mongo_db]
    return client, db
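
Both factories return a (client, db) pair: mongo_client() yields a motor client for use inside an event loop, while mongo_client_sync() wraps pymongo for plain synchronous scripts. A usage sketch (the "station" collection name is an assumption for illustration, not taken from this diff):

# Illustrative usage of the two factories above.
import asyncio

from eews_backend.database.mongodb import mongo_client, mongo_client_sync

async def main():
    client, db = mongo_client()  # motor: queries must be awaited inside an event loop
    print(await db["station"].count_documents({}))
    client.close()

asyncio.run(main())

client, db = mongo_client_sync()  # pymongo: fine outside an event loop
print(db["station"].count_documents({}))
client.close()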
85 changes: 85 additions & 0 deletions eews_backend/generator/main.py
@@ -0,0 +1,85 @@
from typing import List
import requests
from obspy import read, Stream, UTCDateTime, Trace
import time
import os
import shutil
import threading
import multiprocessing

MSEED_FOLDER = "eews_backend/mseed/"
SOURCE_MSEED = "eews_backend/generator/20150920_151412.mseed"
REST_URL = "http://127.0.0.1:8000"
MSEED_RANGE_IN_SECONDS = 10

process_list = []


def main():
    split_mseed()
    print("Start sending file to", REST_URL)
    for station in os.listdir(MSEED_FOLDER):
        send_process = multiprocessing.Process(target=send, args=(station,))
        send_process.name = station
        process_list.append(send_process)
    for process in process_list:
        process.start()


def send(station: str):
    folder = f"{MSEED_FOLDER}/{station}/"
    BHE: List[str] = os.listdir(f"{folder}BHE/")
    BHN: List[str] = os.listdir(f"{folder}BHN/")
    BHZ: List[str] = os.listdir(f"{folder}BHZ/")

    for index in range(len(BHE)):
        # open() returns binary file handles, which requests uploads as multipart files
        bhe_mseed = open(f"{MSEED_FOLDER}/{station}/BHE/{BHE[index]}", "rb")
        bhn_mseed = open(f"{MSEED_FOLDER}/{station}/BHN/{BHN[index]}", "rb")
        bhz_mseed = open(f"{MSEED_FOLDER}/{station}/BHZ/{BHZ[index]}", "rb")

        threading.Thread(
            target=post, args=(f"{REST_URL}/mseed", {"file": bhe_mseed})
        ).start()
        threading.Thread(
            target=post, args=(f"{REST_URL}/mseed", {"file": bhn_mseed})
        ).start()
        threading.Thread(
            target=post, args=(f"{REST_URL}/mseed", {"file": bhz_mseed})
        ).start()

        time.sleep(10)


def post(url, files):
    requests.post(url, files=files)


def split_mseed():
    print("Mseed will be saved to folder", MSEED_FOLDER)
    print("Splitting mseed")
    st: Stream = read(SOURCE_MSEED)
    first_starttime = min([trace.stats["starttime"] for trace in st])
    dt = UTCDateTime(first_starttime)
    last_endtime = max([trace.stats["endtime"] for trace in st])
    trace: Trace
    # ignore_errors so a missing output folder on the first run does not crash
    shutil.rmtree(MSEED_FOLDER, ignore_errors=True)
    while dt <= last_endtime:
        trimmed = st.slice(dt, dt + MSEED_RANGE_IN_SECONDS)
        for trace in trimmed:
            stats = trace.stats
            filename = f"{MSEED_FOLDER}{stats['station']}/{stats['channel']}/{dt.strftime('%Y%m%d')}_{stats['starttime'].strftime('%H%M%S')}.mseed"
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            trace.write(filename=filename, format="MSEED")
        dt += MSEED_RANGE_IN_SECONDS
    print("Finished splitting mseed")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        for process in process_list:
            process.terminate()
            process.join()
process.join()