-
Notifications
You must be signed in to change notification settings - Fork 322
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add online offline consistency validation script (#3974)
* Add validation scripts
- Loading branch information
Showing
7 changed files
with
84 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
The idea of this solution is to run the same query in both online and offline modes in OpenMLDB and compare their results to ensure they produce the same outcome. | ||
|
||
Run the test_consistency.sh script in /tmp/ and ensure that the paths for OPENMLDB_BIN_PATH, zk_cluster, and zk_root_path are set correctly. |
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
SET @@execute_mode='offline'; | ||
SET @@sync_job = "true"; | ||
USE demo_db; | ||
SELECT c1, c2, sum(c3) OVER w1 AS w1_c3_sum FROM demo_table1 WINDOW w1 AS (PARTITION BY demo_table1.c1 ORDER BY demo_table1.c6 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) INTO outfile "file:///tmp/offline.csv" OPTIONS (header = false, mode = 'overwrite'); | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
USE demo_db; | ||
SELECT c1, c2, sum(c3) OVER w1 AS w1_c3_sum FROM demo_table1 WINDOW w1 AS (PARTITION BY demo_table1.c1 ORDER BY demo_table1.c6 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) CONFIG (execute_mode = 'request', values = ("aaa", 11, 22, 1.2, 1.3, 1635247427000, "2021-05-20")); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
CREATE DATABASE demo_db; | ||
USE demo_db; | ||
CREATE TABLE demo_table1(c1 string, c2 int, c3 bigint, c4 float, c5 double, c6 timestamp, c7 date); | ||
SET @@execute_mode='offline'; | ||
SET @@sync_job = "true"; | ||
LOAD DATA INFILE 'file:///tmp/data.parquet' INTO TABLE demo_table1 options(format='parquet', mode='append'); | ||
SET @@execute_mode='offline'; | ||
SELECT c1, c2, sum(c3) OVER w1 AS w1_c3_sum FROM demo_table1 WINDOW w1 AS (PARTITION BY demo_table1.c1 ORDER BY demo_table1.c6 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) INTO OUTFILE '/tmp/feature_data' OPTIONS(mode='overwrite'); | ||
SET @@execute_mode='online'; | ||
DEPLOY demo_data_service SELECT c1, c2, sum(c3) OVER w1 AS w1_c3_sum FROM demo_table1 WINDOW w1 AS (PARTITION BY demo_table1.c1 ORDER BY demo_table1.c6 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW); | ||
SET @@execute_mode='online'; | ||
LOAD DATA INFILE 'file:///tmp/data.parquet' INTO TABLE demo_table1 options(format='parquet', header=true, mode='append'); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import subprocess | ||
import os | ||
import csv | ||
import sys | ||
|
||
openmldb_binary_path = sys.argv[1] | ||
# Execute the offline SQL command | ||
subprocess.run(f"{openmldb_binary_path} --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < offline.sql", | ||
shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | ||
|
||
# Define the directory containing the CSV files | ||
csv_dir = "/tmp/offline.csv/" | ||
|
||
# List all files in the directory | ||
files = os.listdir(csv_dir) | ||
|
||
# Filter out non-CSV files (and avoid reading .crc files) | ||
csv_files = [file for file in files if file.endswith('.csv')] | ||
|
||
# Initialize an empty list to store the combined data | ||
combined_data = [] | ||
|
||
# Read and concatenate all CSV files | ||
for file in csv_files: | ||
with open(os.path.join(csv_dir, file), newline='') as csvfile: | ||
reader = csv.reader(csvfile) | ||
for row in reader: | ||
combined_data.append(row) | ||
|
||
# Define the command to be executed | ||
command = f"{openmldb_binary_path} --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < online.sql" | ||
|
||
# Execute the command | ||
try: | ||
result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | ||
output = result.stdout | ||
|
||
# Extract the relevant line containing the data | ||
lines = output.splitlines() | ||
for i, line in enumerate(lines): | ||
if "c1" in line and "c2" in line and "w1_c3_sum" in line: | ||
data_line = lines[i + 2] # The line containing the data is two lines below the header | ||
break | ||
|
||
# Split the line into an array | ||
data_array = data_line.split() | ||
|
||
# Check if the specific row exists | ||
row_exists = any(row == data_array for row in combined_data) | ||
|
||
if row_exists: | ||
print("Online and offline data consistent") | ||
else: | ||
print("Online and offline data not consistent") | ||
|
||
# Print the resulting array | ||
except subprocess.CalledProcessError as e: | ||
print("An error occurred while executing the command:", e) | ||
print("Error Output:\n", e.stderr) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
OPENMLDB_BIN_PATH=/work/openmldb/bin/openmldb | ||
$OPENMLDB_BIN_PATH --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /tmp/prepare_data.sql | ||
python3 test.py $OPENMLDB_BIN_PATH |