Add tree support
yamelsenih committed Jun 17, 2024
1 parent 89b1a59 commit 31853f9
Showing 9 changed files with 800 additions and 21 deletions.
4 changes: 2 additions & 2 deletions .env
@@ -2,8 +2,8 @@ RUST_LOG=info
 PORT=7878
 ALLOWED_ORIGIN="*"
 KAFKA_ENABLED="Y"
-KAFKA_QUEUES="menu browser form process window"
-KAFKA_HOST="0.0.0.0:29092"
+KAFKA_QUEUES="menu browser form process window menu_item menu_tree"
+KAFKA_HOST="localhost:29092"
 KAFKA_GROUP="default"
 OPENSEARCH_URL="http://localhost:9200"
 VERSION="1.0.0-dev"
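The two new topics, menu_item and menu_tree, arrive through the whitespace-separated KAFKA_QUEUES list, and KAFKA_HOST now points at localhost rather than 0.0.0.0 (a bind address, not a sensible target for a connecting client). A minimal sketch of how a consumer might split that variable into subscription topics, assuming the whitespace-separated format shown above (the parsing helper is illustrative, not code from this commit):

    use std::env;

    fn main() {
        // KAFKA_QUEUES is a whitespace-separated topic list, per the .env entry above.
        let queues = env::var("KAFKA_QUEUES")
            .unwrap_or_else(|_| "menu browser form process window menu_item menu_tree".to_string());
        let topics: Vec<&str> = queues.split_whitespace().collect();
        // Each entry becomes one Kafka topic subscription for the dictionary consumer.
        println!("subscribing to: {:?}", topics);
    }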
27 changes: 12 additions & 15 deletions docker-compose/.env
@@ -6,53 +6,50 @@ GENERIC_RESTART="on-failure"
 
 # OpenSearch https://opensearch.org/
 OPENSEARCH_IMAGE="opensearchproject/opensearch:2.13.0"
-OPENSEARCH_HOST="default-stack.opensearch-service"
-OPENSEARCH_VOLUME="default-stack.volume_opensearch"
+OPENSEARCH_HOST="${COMPOSE_PROJECT_NAME}.opensearch-service"
+OPENSEARCH_VOLUME="${COMPOSE_PROJECT_NAME}.volume_opensearch"
 OPENSEARCH_PORT="9200"
 OPENSEARCH_PERFORMANCE_PORT="9600"
 
 # OpenSearch restore db
-OPENSEARCH_SETUP_HOST="default-stack.opensearch-setup"
+OPENSEARCH_SETUP_HOST="${COMPOSE_PROJECT_NAME}.opensearch-setup"
 OPENSEARCH_SETUP_NODE_HOSTNAME="http://${OPENSEARCH_HOST}:${OPENSEARCH_PORT}"
 
 # OpenSearch Dashboards UI
 OPENSEARCH_DASHBOARDS_IMAGE="opensearchproject/opensearch-dashboards:2.13.0"
-OPENSEARCH_DASHBOARDS_HOST="default-stack.opensearch-dashboards"
+OPENSEARCH_DASHBOARDS_HOST="${COMPOSE_PROJECT_NAME}.opensearch-dashboards"
 OPENSEARCH_DASHBOARDS_PORT=5601 # Do not change, not parameterizable
 OPENSEARCH_DASHBOARDS_EXTERNAL_PORT=5601
 OPENSEARCH_DASHBOARDS_OPENSEARCH_HOSTS="[\"http://${OPENSEARCH_HOST}:${OPENSEARCH_PORT}\"]"
 
 
 # Zookeeper to manage kafka brokers
 ZOOKEEPER_IMAGE="confluentinc/cp-zookeeper:7.6.1"
-ZOOKEEPER_HOST="default-stack.zookeeper"
+ZOOKEEPER_HOST="${COMPOSE_PROJECT_NAME}.zookeeper"
 ZOOKEEPER_PORT=2181
 ZOOKEEPER_TICK_TIME=2000
 
 # Kafka https://www.confluent.io/home/
 KAFKA_IMAGE="confluentinc/cp-kafka:latest"
-KAFKA_HOST="default-stack.kafka"
-KAFKA_PORT=9092
-KAFKA_EXTERNAL_PORT=9092
-KAFKA_BROKERCONNECT="kafka:${KAFKA_PORT}"
-KAFKA_BROKER_PORT=29092
-KAFKA_BROKER_EXTERNAL_PORT=29092
-KAFKA_EXTERNAL_BROKERCONNECT="${KAFKA_HOST}:${KAFKA_BROKER_EXTERNAL_PORT}"
+KAFKA_HOST="${COMPOSE_PROJECT_NAME}.kafka"
+KAFKA_EXTERNAL_PORT=29092
+KAFKA_INTERNAL_HOST="kafka:9092"
+KAFKA_BROKER_HOST="localhost:${KAFKA_EXTERNAL_PORT}"
 
 # Kafdrop Kafka Cluster Overview
 KAFDROP_IMAGE="obsidiandynamics/kafdrop:4.0.1"
-KAFDROP_HOST="default-stack.kafdrop"
+KAFDROP_HOST="${COMPOSE_PROJECT_NAME}.kafdrop"
 KAFDROP_PORT=9000
 KAFDROP_EXTERNAL_PORT=19000
 KAFDROP_KAFKA_HOST="${KAFKA_BROKERCONNECT}"
 
 
 # Dictionary gateway with OpenSearch
-DICTIONARY_RS_HOST="default-stack.dictionary-rs"
+DICTIONARY_RS_HOST="${COMPOSE_PROJECT_NAME}.dictionary-rs"
 DICTIONARY_RS_IMAGE="openls/dictionary-rs:1.2.0"
 DICTIONARY_RS_PORT=7878
 
 
 
 # Networks
-DEFAULT_NETWORK="default-stack.adempiere_network"
+DEFAULT_NETWORK="${COMPOSE_PROJECT_NAME}.adempiere_network"
120 changes: 120 additions & 0 deletions docker-compose/docker-compose-dev.yml
@@ -0,0 +1,120 @@
services:

  zookeeper:
    image: $ZOOKEEPER_IMAGE
    container_name: $ZOOKEEPER_HOST
    restart: $GENERIC_RESTART
    healthcheck:
      test: "bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/$ZOOKEEPER_PORT; exit $?;'"
      interval: 10s
      retries: 60
      start_period: 20s
      timeout: 10s
    environment:
      ZOOKEEPER_CLIENT_PORT: ${ZOOKEEPER_PORT}
      ZOOKEEPER_TICK_TIME: ${ZOOKEEPER_TICK_TIME}
    ports:
      - ${ZOOKEEPER_PORT}:2181
    networks:
      - shared_network

  kafka:
    image: ${KAFKA_IMAGE}
    container_name: ${KAFKA_HOST}
    restart: ${GENERIC_RESTART}
    depends_on:
      zookeeper:
        condition: service_healthy
    healthcheck:
      test: "bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/${KAFKA_PORT}; exit $?;'"
      interval: 10s
      retries: 60
      start_period: 20s
      timeout: 10s
    ports:
      - ${KAFKA_EXTERNAL_PORT}:${KAFKA_EXTERNAL_PORT}
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://${KAFKA_BROKER_HOST}
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - shared_network

  kafdrop:
    image: ${KAFDROP_IMAGE}
    container_name: ${KAFDROP_HOST}
    hostname: ${KAFDROP_HOST}
    restart: ${GENERIC_RESTART}
    depends_on:
      kafka:
        condition: service_healthy
    environment:
      KAFKA_BROKERCONNECT: ${KAFDROP_KAFKA_HOST}
    ports:
      - ${KAFDROP_EXTERNAL_PORT}:${KAFDROP_PORT}
    networks:
      - shared_network

  opensearch-service:
    image: ${OPENSEARCH_IMAGE}
    container_name: ${OPENSEARCH_HOST}
    restart: ${GENERIC_RESTART}
    healthcheck:
      test: "bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/9200; exit $?;'"
      interval: 10s
      retries: 60
      start_period: 20s
      timeout: 10s
    environment:
      - node.name=opensearch-service
      - discovery.type=single-node
      - "DISABLE_INSTALL_DEMO_CONFIG=true" # Prevents execution of bundled demo script which installs demo certificates and security configurations to OpenSearch
      - "DISABLE_SECURITY_PLUGIN=true" # Disables Security plugin
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems
        hard: 65536
    volumes:
      - volume_opensearch:/usr/share/opensearch/data
      - ./opensearch/snapshots:/mnt/snapshots
      - ./opensearch/opensearch.yml:/usr/share/opensearch/config/opensearch.yml
    ports:
      - ${OPENSEARCH_PORT}:9200
      - ${OPENSEARCH_PERFORMANCE_PORT}:9600 # required for Performance Analyzer
    networks:
      - shared_network

  opensearch-dashboards:
    image: ${OPENSEARCH_DASHBOARDS_IMAGE}
    container_name: ${OPENSEARCH_DASHBOARDS_HOST}
    hostname: ${OPENSEARCH_DASHBOARDS_HOST}
    healthcheck:
      test: "bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/5601; exit $?;'"
      interval: 10s
      retries: 60
      start_period: 20s
      timeout: 10s
    depends_on:
      opensearch-service:
        condition: service_healthy
    environment:
      OPENSEARCH_HOSTS: ${OPENSEARCH_DASHBOARDS_OPENSEARCH_HOSTS}
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: true
    ports:
      - ${OPENSEARCH_DASHBOARDS_EXTERNAL_PORT}:5601 # Map host port 5601 to container port 5601
    networks:
      - shared_network

networks:
  shared_network:
    name: ${DEFAULT_NETWORK}

volumes:
  volume_opensearch:
    name: ${OPENSEARCH_VOLUME}
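The healthchecks above rely on bash's /dev/tcp redirection to confirm each port is accepting connections. For reference, the same reachability probe as a standalone Rust sketch (not part of the repo; addresses assume the default ports above):

    use std::net::TcpStream;
    use std::time::Duration;

    fn port_is_open(addr: &str) -> bool {
        // Mirrors the /dev/tcp healthcheck: success means something is listening.
        TcpStream::connect_timeout(
            &addr.parse().expect("valid socket address"),
            Duration::from_secs(10),
        )
        .is_ok()
    }

    fn main() {
        for addr in ["127.0.0.1:29092", "127.0.0.1:9200", "127.0.0.1:5601"] {
            println!("{addr}: {}", if port_is_open(addr) { "up" } else { "down" });
        }
    }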
6 changes: 3 additions & 3 deletions docker-compose/docker-compose.yml
@@ -11,7 +11,7 @@ services:
       start_period: 20s
       timeout: 10s
     environment:
-      ZOOKEEPER_CLIENT_PORT: $ZOOKEEPER_PORT
+      ZOOKEEPER_CLIENT_PORT: ${ZOOKEEPER_PORT}
       ZOOKEEPER_TICK_TIME: ${ZOOKEEPER_TICK_TIME}
     # ports:
     #   - ${ZOOKEEPER_PORT}:2181
@@ -32,11 +32,11 @@ services:
       start_period: 20s
       timeout: 10s
     ports:
-      - ${KAFKA_BROKER_EXTERNAL_PORT}:${KAFKA_BROKER_PORT}
+      - ${KAFKA_EXTERNAL_PORT}:${KAFKA_EXTERNAL_PORT}
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${KAFKA_BROKERCONNECT},PLAINTEXT_HOST://${KAFKA_EXTERNAL_BROKERCONNECT}
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://${KAFKA_BROKER_HOST}
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
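The advertised-listener pair is the standard dual-listener setup: containers on the compose network reach the broker as kafka:9092, while processes on the host (such as dictionary-rs run from source) use localhost:29092. A hedged sketch of picking the bootstrap address from the environment, consistent with the KAFKA_HOST entry in the root .env (the fallback value is illustrative):

    use std::env;

    fn bootstrap_servers() -> String {
        // Inside the compose network this resolves to kafka:9092;
        // from the host it should be localhost:29092.
        env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost:29092".to_string())
    }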
36 changes: 35 additions & 1 deletion src/bin/server.rs
@@ -1,5 +1,5 @@
 use std::env;
-use dictionary_rs::{controller::{kafka::create_consumer, opensearch::{create, delete, IndexDocument}}, models::{browser::{browser_from_id, browsers, BrowserDocument}, form::{form_from_id, forms, FormDocument}, menu::{menu_from_id, menus, MenuDocument}, process::{process_from_id, processes, ProcessDocument}, window::{window_from_id, windows, WindowDocument}}};
+use dictionary_rs::{controller::{kafka::create_consumer, opensearch::{create, delete, IndexDocument}}, models::{browser::{browser_from_id, browsers, BrowserDocument}, form::{form_from_id, forms, FormDocument}, menu::{menu_from_id, menus, MenuDocument}, menu_item::MenuItemDocument, menu_tree::MenuTreeDocument, process::{process_from_id, processes, ProcessDocument}, window::{window_from_id, windows, WindowDocument}}};
 use dotenv::dotenv;
 use rdkafka::{Message, consumer::{CommitMode, Consumer}};
 use salvo::{conn::tcp::TcpAcceptor, cors::Cors, http::header, hyper::Method, prelude::*};
@@ -471,6 +471,40 @@ async fn consume_queue() {
                     Err(error) => log::warn!("{}", error)
                 }
             }
+        } else if topic == "menu_item" {
+            let _document = match serde_json::from_str(payload) {
+                Ok(value) => value,
+                Err(error) => {
+                    log::warn!("{}", error);
+                    MenuItemDocument {
+                        document: None
+                    }
+                },
+            };
+            if _document.document.is_some() {
+                let _menu_document: &dyn IndexDocument = &(_document.document.unwrap());
+                match process_index(event_type, _menu_document).await {
+                    Ok(_) => consumer.commit_message(&message, CommitMode::Async).unwrap(),
+                    Err(error) => log::warn!("{}", error)
+                }
+            }
+        } else if topic == "menu_tree" {
+            let _document = match serde_json::from_str(payload) {
+                Ok(value) => value,
+                Err(error) => {
+                    log::warn!("{}", error);
+                    MenuTreeDocument {
+                        document: None
+                    }
+                },
+            };
+            if _document.document.is_some() {
+                let _menu_document: &dyn IndexDocument = &(_document.document.unwrap());
+                match process_index(event_type, _menu_document).await {
+                    Ok(_) => consumer.commit_message(&message, CommitMode::Async).unwrap(),
+                    Err(error) => log::warn!("{}", error)
+                }
+            }
+        } else if topic == "process" {
         } else if topic == "process" {
             let _document = match serde_json::from_str(payload) {
                 Ok(value) => value,
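Both new branches follow the pattern already used for the other topics: deserialize the payload into the topic's wrapper type, fall back to an empty wrapper on a malformed message (logging a warning instead of crashing the consumer loop), and index only when a document is actually present. A self-contained sketch of that pattern, using a simplified stand-in for the crate's real document types:

    use serde::Deserialize;

    // Simplified stand-in for MenuItemDocument / MenuTreeDocument:
    // each wraps an optional payload under a "document" key.
    #[derive(Deserialize, Debug)]
    struct TreeDocument {
        document: Option<serde_json::Value>,
    }

    fn handle_payload(payload: &str) {
        // A malformed payload is logged and mapped to an empty wrapper,
        // mirroring the Err arm in the branches above.
        let wrapper: TreeDocument = match serde_json::from_str(payload) {
            Ok(value) => value,
            Err(error) => {
                eprintln!("warn: {}", error);
                TreeDocument { document: None }
            }
        };
        // Index (here: just print) only when a document is present.
        if let Some(document) = wrapper.document {
            println!("would index: {}", document);
        }
    }

    fn main() {
        handle_payload(r#"{"document":{"id":100,"name":"Menu Tree"}}"#);
        handle_payload("not json"); // logs a warning, indexes nothing
    }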
2 changes: 2 additions & 0 deletions src/controller/kafka.rs
@@ -35,6 +35,7 @@ pub fn create_consumer(brokers: &str, group_id: &str, topics: &[&str]) -> Result
         .set("group.id", group_id)
         .set("bootstrap.servers", brokers)
         .set("enable.partition.eof", "false")
+        .set("enable.partition.eof", "false")
         .set("session.timeout.ms", "6000")
         .set("enable.auto.commit", "true")
         .set("message.max.bytes", "1000000000")
@@ -45,6 +46,7 @@
         .set("queued.max.messages.kbytes", "2097151")
         .set("fetch.message.max.bytes", "1000000000")
         .set("max.partition.fetch.bytes", "1000000000")
+        .set("max.poll.interval.ms", "86400000")
         .set("fetch.max.bytes", "2147483135")
         .set("auto.offset.reset", "earliest")
         .set_log_level(RDKafkaLogLevel::Debug)
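The notable addition is max.poll.interval.ms set to 86400000 (24 hours), which keeps the broker from evicting the consumer from its group when indexing a batch takes a long time between polls. A minimal, self-contained sketch of the same rdkafka builder pattern (only a few of the settings above are repeated; the helper name is illustrative, not the crate's create_consumer):

    use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
    use rdkafka::consumer::{Consumer, StreamConsumer};

    fn build_consumer(brokers: &str, group_id: &str, topics: &[&str]) -> StreamConsumer {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", brokers)
            // 24 hours: tolerate very long gaps between polls while indexing.
            .set("max.poll.interval.ms", "86400000")
            .set("auto.offset.reset", "earliest")
            .set_log_level(RDKafkaLogLevel::Debug)
            .create()
            .expect("consumer creation failed");
        consumer.subscribe(topics).expect("subscription failed");
        consumer
    }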