diff --git a/docs/en/deploy/install_deploy.md b/docs/en/deploy/install_deploy.md
index 3eec28627b8..e436579d724 100644
--- a/docs/en/deploy/install_deploy.md
+++ b/docs/en/deploy/install_deploy.md
@@ -22,6 +22,10 @@ Generally, ldd version should be >= 2.17, and GLIBC_2.17 should be present in li
If you need to deploy ZooKeeper and TaskManager, you need a Java runtime environment.
+Servers needs Java 1.8 or above.
+Zookeeper Client 3.4.14 requires `Java 1.7` - `Java 13`. Java SDK depends on it, so it should use the same Java version, don't run in higher version. If you wish to use zkCli, please use `Java 1.8` or `Java 11`.
### Hardware
Regarding hardware requirements:
diff --git a/docs/en/quickstart/sdk/python_sdk.md b/docs/en/quickstart/sdk/python_sdk.md
index eb69f3b1a5e..491a3c81812 100644
--- a/docs/en/quickstart/sdk/python_sdk.md
+++ b/docs/en/quickstart/sdk/python_sdk.md
@@ -116,6 +116,8 @@ cursor.close()
This section demonstrates the use of the Python SDK through OpenMLDB SQLAlchemy. Similarly, if any of the DBAPI interfaces fail, they will raise a `DatabaseError` exception. Users can catch and handle this exception as needed. The handling of return values should follow the SQLAlchemy standard.
+The integrated SQLAlchemy defaults to version 2.0 while remaining compatible with the old version 1.4. If a user's SQLAlchemy version is 1.4, they can adjust the interface names according to the [version differences](python_sdk.md#sqlalchemy-version-differences). OpenMLDB SDK only supports version 1.4 in version 0.8.5 and before. Starting from version 0.8.5 (excluding 0.8.5), it begins to support version 2.0.
### Create Connection
@@ -134,98 +136,135 @@ connection = engine.connect()
### Create Database
-Use the `connection.execute()` interface to create database `db1`:
+Use the `connection.exec_driver_sql()` interface to create database `db1`:
- connection.execute("CREATE DATABASE db1")
+ connection.exec_driver_sql("CREATE DATABASE db1")
except Exception as e:
-connection.execute("USE db1")
+connection.exec_driver_sql("USE db1")
### Create Table
-Use the `connection.execute()` interface to create table `t1`:
+Use the `connection.exec_driver_sql()` interface to create table `t1`:
- connection.execute("CREATE TABLE t1 ( col1 bigint, col2 date, col3 string, col4 string, col5 int, index(key=col3, ts=col1))")
+ connection.exec_driver_sql("CREATE TABLE t1 ( col1 bigint, col2 date, col3 string, col4 string, col5 int, index(key=col3, ts=col1))")
except Exception as e:
### Insert Data into Table
-Use the `connection.execute (ddl)` interface to execute the SQL insert statement, and you can insert data into the table:
+Use the `connection.exec_driver_sql (ddl)` interface to execute the SQL insert statement, and you can insert data into the table:
- connection.execute("INSERT INTO t1 VALUES(1000, '2020-12-25', 'guangdon', 'shenzhen', 1);")
+ connection.exec_driver_sql("INSERT INTO t1 VALUES(1000, '2020-12-25', 'guangdon', 'shenzhen', 1);")
except Exception as e:
-Use the `connection.execute (ddl, data)` interface to execute the insert statement of SQL with placeholder. You can specify the insert data dynamically or insert multiple rows:
+Use the `connection.exec_driver_sql (ddl, data)` interface to execute the insert statement of SQL with placeholder. You can specify the insert data dynamically or insert multiple rows:
insert = "INSERT INTO t1 VALUES(1002, '2020-12-27', ?, ?, 3);"
- connection.execute(insert, ({"col3":"fujian", "col4":"fuzhou"}))
- connection.execute(insert, [{"col3":"jiangsu", "col4":"nanjing"}, {"col3":"zhejiang", "col4":"hangzhou"}])
+ connection.exec_driver_sql(insert, ({"col3":"fujian", "col4":"fuzhou"}))
+ connection.exec_driver_sql(insert, [{"col3":"jiangsu", "col4":"nanjing"}, {"col3":"zhejiang", "col4":"hangzhou"}])
except Exception as e:
### Execute SQL Batch Query
-Use the `connection.execute (sql)` interface to execute SQL batch query statements:
+Use the `connection.exec_driver_sql (sql)` interface to execute SQL batch query statements:
- rs = connection.execute("SELECT * FROM t1")
+ rs = connection.exec_driver_sql("SELECT * FROM t1")
for row in rs:
- rs = connection.execute("SELECT * FROM t1 WHERE col3 = ?;", ('hefei'))
- rs = connection.execute("SELECT * FROM t1 WHERE col3 = ?;",[('hefei'), ('shanghai')])
+ rs = connection.exec_driver_sql("SELECT * FROM t1 WHERE col3 = ?;", tuple(['hefei']))
except Exception as e:
### Execute SQL Query
-Use the `connection.execute (sql, request)` interface to execute the SQL request query. You can put the input data into the second parameter of the execute function:
+Use the `connection.exec_driver_sql (sql, request)` interface to execute the SQL request query. You can put the input data into the second parameter of the execute function:
- rs = connection.execute("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-27', "col3":'zhejiang', "col4":'hangzhou', "col5":100}))
+ rs = connection.exec_driver_sql("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-27', "col3":'zhejiang', "col4":'hangzhou', "col5":100}))
except Exception as e:
### Delete Table
-Use the `connection.execute (ddl)` interface to delete table `t1`:
+Use the `connection.exec_driver_sql (ddl)` interface to delete table `t1`:
- connection.execute("DROP TABLE t1")
+ connection.exec_driver_sql("DROP TABLE t1")
except Exception as e:
### Delete Database
-Use the connection.execute(ddl)interface to delete database `db1`:
+Use the connection.exec_driver_sql(ddl)interface to delete database `db1`:
- connection.execute("DROP DATABASE db1")
+ connection.exec_driver_sql("DROP DATABASE db1")
except Exception as e:
+### SQLAlchemy Version Differences
+Differences in Native SQL Usage: In SQLAlchemy 1.4, the method `connection.execute()` is used, while in SQLAlchemy 2.0, the method `connection.exec_driver_sql()` is used. The general differences between these two methods are as follows, for more details, refer to the official documentation.
+# DDL Example1 - [SQLAlchemy 1.4]
+connection.execute("CREATE TABLE t1 (col1 bigint, col2 date)")
+# DDL Example1 - [SQLAlchemy 2.0]
+connection.exec_driver_sql("CREATE TABLE t1 (col1 bigint, col2 date)")
+# Insert Example1 - [SQLAlchemy 1.4]
+connection.execute("INSERT INTO t1 VALUES(1000, '2020-12-25');")
+connection.execute("INSERT INTO t1 VALUES(?, ?);", ({"col1":1001, "col2":"2020-12-26"}))
+connection.execute("INSERT INTO t1 VALUES(?, ?);", [{"col1":1002, "col2":"2020-12-27"}])
+# Insert Example1 - [SQLAlchemy 2.0]
+connection.exec_driver_sql("INSERT INTO t1 VALUES(1000, '2020-12-25');")
+connection.exec_driver_sql("INSERT INTO t1 VALUES(?, ?);", ({"col1":1001, "col2":"2020-12-26"}))
+connection.exec_driver_sql("INSERT INTO t1 VALUES(?, ?);", [{"col1":1002, "col2":"2020-12-27"}])
+# Query Example1 - [SQLAlchemy 1.4] - Native SQL Query
+connection.execute("select * from t1 where col3 = ?;", 'hefei')
+connection.execute("select * from t1 where col3 = ?;", ['hefei'])
+connection.execute("select * from t1 where col3 = ?;", [('hefei')])
+# Query Example1 - [SQLAlchemy 2.0] - Native SQL Query
+connection.exec_driver_sql("select * from t1 where col3 = ?;", tuple(['hefei']))
+# Query Example2 - [SQLAlchemy 1.4] - ORM Query
+# Query Example2 - [SQLAlchemy 2.0] - ORM Query
+# Query Example3 - [SQLAlchemy 1.4] - SQL Request Query
+connection.execute("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-28'}))
+# Query Example3 - [SQLAlchemy 2.0] - SQL Request Query
+connection.exec_driver_sql("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-28'}))
## Notebook Magic Function
The OpenMLDB Python SDK supports the expansion of Notebook magic function. Use the following statement to register the function.
diff --git a/docs/zh/deploy/conf.md b/docs/zh/deploy/conf.md
index 7c4bae64335..500455a64a4 100644
--- a/docs/zh/deploy/conf.md
+++ b/docs/zh/deploy/conf.md
@@ -273,6 +273,7 @@ batchjob.jar.path=
@@ -280,6 +281,7 @@ hadoop.conf.dir=
Spark Config中重点关注的配置如下:
@@ -307,7 +309,21 @@ TaskManager只接受`local`及其变种、`yarn`、`yarn-cluster`、`yarn-client
- 离线表的存储地址`offline.data.prefix`,默认为`file:///tmp/openmldb_offline_storage/`,即TaskManager所在主机的`/tmp`目录,你可以修改该配置为其他目录。
- - 可以配置为HDFS路径,需要在**启动TaskManager前**配置环境变量`HADOOP_CONF_DIR`为Hadoop配置文件所在目录(注意是环境变量,不是TaskManager的配置项),文件目录中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`等配置文件,更多见[Spark官方文档](https://spark.apache.org/docs/3.2.1/configuration.html#inheriting-hadoop-cluster-configuration)。
+ - 可以配置为HDFS路径,如果配置为HDFS路径,需要正确配置变量 `hadoop.conf.dir` 和 `hadoop.user.name`,其中 `hadoop.conf.dir` 表示Hadoop配置文件所在目录(注意该目录是TaskManager节点目录;文件目录中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`等配置文件,更多见[Spark官方文档](https://spark.apache.org/docs/3.2.1/configuration.html#inheriting-hadoop-cluster-configuration)),`hadoop.user.name` 表示hadoop运行用户,可以通过以下三种方式之一配置这两个变量:
+ 1. 在 `conf/taskmanager.properties` 配置文件中配置变量 `hadoop.conf.dir`, `hadoop.user.name`
+ 2. 在(TaskManager节点)**启动TaskManager前**配置环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME`
+ 3. 拷贝Hadoop配置文件(`core-site.xml`、`hdfs-site.xml`等)到 `{spark.home}/conf` 目录中
+ > sbin部署不能传递非指定的变量,目前TaskManager只会传递环境变量 `SPARK_HOME` 和 `RUNNER_JAVA_HOME`。所以如果是sbin部署,尽量使用第一种方法。
+ >
+ > 如果使用第二种方式,配置的环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` 最好是永久生效的,如果不希望环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` 永久生效,可以在一个session里,先临时配置环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` ,然后启动TaskManager,例如
+ > ```bash
+ > cd
+ > export HADOOP_CONF_DIR=<这里替换为Hadoop配置目录>
+ > export HADOOP_USER_NAME=<这里替换为Hadoop用户名>
+ > bash bin/start.sh start taskmanager
+ > ```
+ >
+ > 环境变量生效范围参考 理解配置项与环境变量的关系
HDFS路径目前需要配置`namenode.uri`,删除离线表时会连接HDFS FileSystem`namenode.uri`,并删除离线表的存储目录(Offline Table Path)。未来将废弃此配置项。
@@ -319,9 +335,22 @@ local模式即Spark任务运行在本地(TaskManager所在主机),该模
##### yarn/yarn-cluster模式
-- 在**启动TaskManager前**配置环境变量`HADOOP_CONF_DIR`为Hadoop和Yarn的配置文件所在目录,文件目录中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`、Yarn的`yarn-site.xml`等配置文件,参考[Spark官方文档](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#launching-spark-on-yarn)。
+- 正确配置变量 `hadoop.conf.dir` 和 `hadoop.user.name`,其中 `hadoop.conf.dir` 表示Hadoop和Yarn配置文件所在目录(注意该目录是TaskManager节点目录;文件目录中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`, `yarn-site.xml`等配置文件,参考[Spark官方文档](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#launching-spark-on-yarn)),`hadoop.user.name` 表示hadoop运行用户,可以通过以下三种方式之一配置这两个变量:
+ 1. 在 `conf/taskmanager.properties` 配置文件中配置变量 `hadoop.conf.dir`, `hadoop.user.name`
+ 2. 在(TaskManager节点)**启动TaskManager前**配置环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME`
+ 3. 拷贝Hadoop和Yarn配置文件(`core-site.xml`、`hdfs-site.xml`等)到 `{spark.home}/conf` 目录中
+ > sbin部署不能传递非指定的变量,目前TaskManager只会传递环境变量 `SPARK_HOME` 和 `RUNNER_JAVA_HOME`。所以如果是sbin部署,尽量使用第一种方法。
+ >
+ > 如果使用第二种方式,配置的环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` 最好是永久生效的,如果不希望环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` 永久生效,可以在一个session里,先临时配置环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` ,然后启动TaskManager,例如
+ > ```bash
+ > cd
+ > export HADOOP_CONF_DIR=<这里替换为Hadoop配置目录>
+ > export HADOOP_USER_NAME=<这里替换为Hadoop用户名>
+ > bash bin/start.sh start taskmanager
+ > ```
+ >
+ > 环境变量生效范围参考 理解配置项与环境变量的关系
- `spark.yarn.jars`配置Yarn需要读取的Spark运行jar包地址,必须是`hdfs://`地址。可以上传[OpenMLDB Spark 发行版](../../tutorial/openmldbspark_distribution.md)解压后的`jars`目录到HDFS上,并配置为`hdfs:///jars/*`(注意通配符)。[如果不配置该参数,Yarn会将`$SPARK_HOME/jars`打包上传分发,并且每次离线任务都要分发](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#preparations),效率较低,所以推荐配置。
- `batchjob.jar.path`必须是HDFS路径(具体到包名),上传batchjob jar包到HDFS上,并配置为对应地址,保证Yarn集群上所有Worker可以获得batchjob包。
- `offline.data.prefix`必须是HDFS路径,保证Yarn集群上所有Worker可读写数据。应使用前面配置的环境变量`HADOOP_CONF_DIR`中的Hadoop集群地址。
diff --git a/docs/zh/deploy/install_deploy.md b/docs/zh/deploy/install_deploy.md
index a7a6200651a..bcc30919eae 100644
--- a/docs/zh/deploy/install_deploy.md
+++ b/docs/zh/deploy/install_deploy.md
@@ -21,6 +21,9 @@ strings /lib64/libc.so.6 | grep ^GLIBC_
如果需要部署 ZooKeeper 和 TaskManager,则需要有 Java 运行环境。
+> 两个Server需要`Java 1.8`及以上版本。
+> Zookeeper Client 3.4.14 需要 `Java 1.7` - `Java 13` 版本。Java SDK也使用这个Client,所以同样要求不能使用较高版本Java。如果希望使用zkCli,推荐使用 `Java 1.8` 或 `Java 11` 版本。
### 硬件
* CPU:
diff --git a/docs/zh/quickstart/sdk/python_sdk.md b/docs/zh/quickstart/sdk/python_sdk.md
index da70edd9daa..64cdcb22ddf 100644
--- a/docs/zh/quickstart/sdk/python_sdk.md
+++ b/docs/zh/quickstart/sdk/python_sdk.md
@@ -117,6 +117,8 @@ cursor.close()
本节演示通过 OpenMLDB SQLAlchemy 使用 Python SDK。同样的,所有dbapi接口如果执行失败,会抛出异常`DatabaseError`,用户可自行捕获异常并处理。返回值处理参考SQLAlchemy标准。
+集成的SQLAlchemy默认版本为2.0,同时兼容旧版本1.4。若用户的SQLAlchemy版本为1.4,可以根据[版本差异](python_sdk.md#sqlalchemy-版本差异)调整接口名称。OpenMLDB SDK在0.8.5版本及之前仅支持1.4版本,从0.8.5版本之后(不包括0.8.5)才开始支持2.0版本。
### 创建连接
@@ -135,98 +137,135 @@ connection = engine.connect()
### 创建数据库
-使用 `connection.execute()` 接口创建数据库 `db1`:
+使用 `connection.exec_driver_sql()` 接口创建数据库 `db1`:
- connection.execute("CREATE DATABASE db1")
+ connection.exec_driver_sql("CREATE DATABASE db1")
except Exception as e:
-connection.execute("USE db1")
+connection.exec_driver_sql("USE db1")
### 创建表
-使用 `connection.execute()` 接口创建表 `t1`:
+使用 `connection.exec_driver_sql()` 接口创建表 `t1`:
- connection.execute("CREATE TABLE t1 ( col1 bigint, col2 date, col3 string, col4 string, col5 int, index(key=col3, ts=col1))")
+ connection.exec_driver_sql("CREATE TABLE t1 ( col1 bigint, col2 date, col3 string, col4 string, col5 int, index(key=col3, ts=col1))")
except Exception as e:
### 插入数据到表中
-使用 `connection.execute(ddl)` 接口执行 SQL 的插入语句,可以向表中插入数据:
+使用 `connection.exec_driver_sql(ddl)` 接口执行 SQL 的插入语句,可以向表中插入数据:
- connection.execute("INSERT INTO t1 VALUES(1000, '2020-12-25', 'guangdon', 'shenzhen', 1);")
+ connection.exec_driver_sql("INSERT INTO t1 VALUES(1000, '2020-12-25', 'guangdon', 'shenzhen', 1);")
except Exception as e:
-使用 `connection.execute(ddl, data)` 接口执行带 planceholder 的 SQL 的插入语句,可以动态指定插入数据,也可插入多行:
+使用 `connection.exec_driver_sql(ddl, data)` 接口执行带 planceholder 的 SQL 的插入语句,可以动态指定插入数据,也可插入多行:
insert = "INSERT INTO t1 VALUES(1002, '2020-12-27', ?, ?, 3);"
- connection.execute(insert, ({"col3":"fujian", "col4":"fuzhou"}))
- connection.execute(insert, [{"col3":"jiangsu", "col4":"nanjing"}, {"col3":"zhejiang", "col4":"hangzhou"}])
+ connection.exec_driver_sql(insert, ({"col3":"fujian", "col4":"fuzhou"}))
+ connection.exec_driver_sql(insert, [{"col3":"jiangsu", "col4":"nanjing"}, {"col3":"zhejiang", "col4":"hangzhou"}])
except Exception as e:
### 执行 SQL 批式查询
-使用 `connection.execute(sql)` 接口执行 SQL 批式查询语句:
+使用 `connection.exec_driver_sql(sql)` 接口执行 SQL 批式查询语句:
- rs = connection.execute("SELECT * FROM t1")
+ rs = connection.exec_driver_sql("SELECT * FROM t1")
for row in rs:
- rs = connection.execute("SELECT * FROM t1 WHERE col3 = ?;", ('hefei'))
- rs = connection.execute("SELECT * FROM t1 WHERE col3 = ?;", [('hefei'), ('shanghai')])
+ rs = connection.exec_driver_sql("SELECT * FROM t1 WHERE col3 = ?;", tuple(['hefei']))
except Exception as e:
### 执行 SQL 请求式查询
-使用 `connection.execute(sql, request)` 接口执行 SQL 请求式查询,可以把输入数据放到 execute 函数的第二个参数中:
+使用 `connection.exec_driver_sql(sql, request)` 接口执行 SQL 请求式查询,可以把输入数据放到 execute 函数的第二个参数中:
- rs = connection.execute("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-27', "col3":'zhejiang', "col4":'hangzhou', "col5":100}))
+ rs = connection.exec_driver_sql("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-27', "col3":'zhejiang', "col4":'hangzhou', "col5":100}))
except Exception as e:
### 删除表
-使用 `connection.execute(ddl)` 接口删除表 `t1`:
+使用 `connection.exec_driver_sql(ddl)` 接口删除表 `t1`:
- connection.execute("DROP TABLE t1")
+ connection.exec_driver_sql("DROP TABLE t1")
except Exception as e:
### 删除数据库
-使用 `connection.execute(ddl)` 接口删除数据库 `db1`:
+使用 `connection.exec_driver_sql(ddl)` 接口删除数据库 `db1`:
- connection.execute("DROP DATABASE db1")
+ connection.exec_driver_sql("DROP DATABASE db1")
except Exception as e:
+### SQLAlchemy 版本差异
+原生SQL使用差异,SQLAlchemy 1.4 版本使用`connection.execute()`方法,SQLAlchemy 2.0 版本使用`connection.exec_driver_sql()`方法,两个方法的常规差异如下,详细可参考官方文档。
+# DDL案例1-[SQLAlchemy 1.4]
+connection.execute("CREATE TABLE t1 (col1 bigint, col2 date)")
+# DDL案例1-[SQLAlchemy 2.0]
+connection.exec_driver_sql("CREATE TABLE t1 (col1 bigint, col2 date)")
+# 插入案例1-[SQLAlchemy 1.4]
+connection.execute("INSERT INTO t1 VALUES(1000, '2020-12-25');")
+connection.execute("INSERT INTO t1 VALUES(?, ?);", ({"col1":1001, "col2":"2020-12-26"}))
+connection.execute("INSERT INTO t1 VALUES(?, ?);", [{"col1":1002, "col2":"2020-12-27"}])
+# 插入案例1-[SQLAlchemy 2.0]
+connection.exec_driver_sql("INSERT INTO t1 VALUES(1000, '2020-12-25');")
+connection.exec_driver_sql("INSERT INTO t1 VALUES(?, ?);", ({"col1":1001, "col2":"2020-12-26"}))
+connection.exec_driver_sql("INSERT INTO t1 VALUES(?, ?);", [{"col1":1002, "col2":"2020-12-27"}])
+# 查询案例1-[SQLAlchemy 1.4]-原生SQL查询
+connection.execute("select * from t1 where col3 = ?;", 'hefei')
+connection.execute("select * from t1 where col3 = ?;", ['hefei'])
+connection.execute("select * from t1 where col3 = ?;", [('hefei')])
+# 查询案例1-[SQLAlchemy 2.0]-原生SQL查询
+connection.exec_driver_sql("select * from t1 where col3 = ?;", tuple(['hefei']))
+# 查询案例2-[SQLAlchemy 1.4]-ORM查询
+# 查询案例2-[SQLAlchemy 2.0]-ORM查询
+# 查询案例3-[SQLAlchemy 1.4]-请求式查询
+connection.execute("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-28'}))
+# 查询案例3-[SQLAlchemy 2.0]-请求式查询
+connection.exec_driver_sql("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-28'}))
## 使用 Notebook Magic Function
OpenMLDB Python SDK 支持了 Notebook magic function 拓展,使用以下语句注册函数。
diff --git a/python/openmldb_sdk/setup.py b/python/openmldb_sdk/setup.py
index 5403de0c367..fa92ff71911 100644
--- a/python/openmldb_sdk/setup.py
+++ b/python/openmldb_sdk/setup.py
@@ -29,7 +29,7 @@
"importlib-metadata < 5.0",
- "sqlalchemy <= 1.4.50",
+ "sqlalchemy <= 2.0.27",
"IPython <= 7.30.1",
"prettytable <= 3.1.0",
diff --git a/python/openmldb_sdk/tests/openmldb_client_test.py b/python/openmldb_sdk/tests/openmldb_client_test.py
index 54dc8963d55..e4fef405acb 100644
--- a/python/openmldb_sdk/tests/openmldb_client_test.py
+++ b/python/openmldb_sdk/tests/openmldb_client_test.py
@@ -43,9 +43,8 @@ class TestOpenMLDBClient:
def setup_class(cls):
cls.engine = db.create_engine('openmldb:///?zk={}&zkPath={}'.format(OpenMLDB_ZK_CLUSTER, OpenMLDB_ZK_PATH))
cls.connection = cls.engine.connect()
- cls.connection.execute("create database if not exists {};".format(
- cls.db))
- cls.connection.execute(f"use {cls.db}")
+ cls.connection.exec_driver_sql("create database if not exists {};".format(cls.db))
+ cls.connection.exec_driver_sql(f"use {cls.db}")
def has_table(connection, table_name):
@@ -53,12 +52,10 @@ def has_table(connection, table_name):
def recreate_table(self, table, schema):
if self.has_table(self.connection, table):
- self.connection.execute("drop table {}".format(table))
+ self.connection.exec_driver_sql("drop table {}".format(table))
assert not self.has_table(self.connection, table)
# key is col3, partitionnum==1
- self.connection.execute(
- "create table {}({}) OPTIONS(partitionnum=1);".format(
- table, schema))
+ self.connection.exec_driver_sql("create table {}({}) OPTIONS(partitionnum=1);".format(table, schema))
assert self.has_table(self.connection, table)
@@ -120,21 +117,20 @@ def test_basic(self):
# 1004, ?, 'hubei', 'wuhan', 5 - 2020-11-29
insert4 = "insert into {} values({});".format(
table, self.stringify_join(test_rows[4], [1]))
- self.connection.execute(insert0)
- self.connection.execute(insert1, ({
+ self.connection.exec_driver_sql(insert0)
+ self.connection.exec_driver_sql(insert1, ({
"col4": test_rows[1][3],
"col5": test_rows[1][4]
- self.connection.execute(insert2, ({
+ self.connection.exec_driver_sql(insert2, ({
"col3": test_rows[2][2],
"col4": test_rows[2][3]
- self.connection.execute(
- insert3, self.convert_to_dicts(column_names, [test_rows[3]]))
- self.connection.execute(insert4, [{"col2": test_rows[4][1]}])
+ self.connection.exec_driver_sql(insert3, self.convert_to_dicts(column_names, [test_rows[3]]))
+ self.connection.exec_driver_sql(insert4, [{"col2": test_rows[4][1]}])
# order by is not supported now
- result = self.connection.execute("select * from {};".format(table))
+ result = self.connection.exec_driver_sql("select * from {};".format(table))
# fetch many times
result_list = result.fetchmany()
assert len(result_list) == 1
@@ -148,52 +144,45 @@ def test_basic(self):
# insert invalid type
with pytest.raises(TypeError):
- self.connection.execute(insert1, (2, 2))
+ self.connection.exec_driver_sql(insert1, (2, 2))
# insert many
new_rows = [(1005, "2020-12-29", "shandong", 'jinan', 6),
(1006, "2020-12-30", "fujian", 'fuzhou', 7)]
# insert3 is all ?
- self.connection.execute(insert3,
- self.convert_to_dicts(column_names, new_rows))
+ self.connection.exec_driver_sql(insert3, self.convert_to_dicts(column_names, new_rows))
test_rows += new_rows
# test fetch all
- rs = self.connection.execute("select * from {};".format(table))
+ rs = self.connection.exec_driver_sql("select * from {};".format(table))
result = sorted(rs.fetchall(), key=lambda x: x[0])
assert result == test_rows
# test convert to list
- rs = self.connection.execute("select * from {};".format(table))
+ rs = self.connection.exec_driver_sql("select * from {};".format(table))
rs = sorted(list(rs), key=lambda x: x[0])
assert rs == test_rows
# test condition select
- rs = self.connection.execute(
- "select * from {} where col3 = 'hefei';".format(table))
+ rs = self.connection.exec_driver_sql("select * from {} where col3 = 'hefei';".format(table))
# hefei row idx == 1
assert list(rs) == [test_rows[1]]
# test request mode, select sql with dict parameters(one dict or list of dict)
request_row = (9999, "2020-12-27", "zhejiang", "hangzhou", 100)
- rs = self.connection.execute(
- "select * from {};".format(table),
- self.convert_to_dict(column_names, request_row))
+ rs = self.connection.exec_driver_sql("select * from {};".format(table),
+ self.convert_to_dict(column_names, request_row))
assert list(rs) == [request_row]
- rs = self.connection.execute(
- "select * from {};".format(table),
- self.convert_to_dicts(column_names, [request_row]))
+ rs = self.connection.exec_driver_sql("select * from {};".format(table),
+ self.convert_to_dicts(column_names, [request_row]))
assert list(rs) == [request_row]
# test parameterized query in batch mode, select sql with tuple or tuple list
- rs = self.connection.execute(
- "select * from {} where col3 = ?;".format(table), 'hefei')
- assert list(rs) == [test_rows[1]]
- rs = self.connection.execute(
- "select * from {} where col3 = ?;".format(table), ['hefei'])
- assert list(rs) == [test_rows[1]]
- rs = self.connection.execute(
- "select * from {} where col3 = ?;".format(table), [('hefei')])
+ rs = self.connection.exec_driver_sql("select * from {} where col3 = ?;".format(table), tuple(['hefei']))
assert list(rs) == [test_rows[1]]
+ # rs = self.connection.exec_driver_sql("select * from {} where col3 = ?;".format(table), ['hefei'])
+ # assert list(rs) == [test_rows[1]]
+ # rs = self.connection.exec_driver_sql("select * from {} where col3 = ?;".format(table), [('hefei')])
+ # assert list(rs) == [test_rows[1]]
def test_procedure(self):
# TODO(hw): creating procedure is not recommended, test deploy
@@ -204,13 +193,13 @@ def test_procedure(self):
# try to delete sp before recreate, cause table may have associated deployment 'sp'
- self.connection.execute("drop procedure sp;")
+ self.connection.exec_driver_sql("drop procedure sp;")
except DatabaseError as e:
assert "not found" in str(e)
self.recreate_table(table, schema_str)
- self.connection.execute(
+ self.connection.exec_driver_sql(
"create procedure sp (col1 bigint, col2 date, col3 string, col4 string, col5 int) "
"begin select * from {}; end;".format(table))
@@ -221,7 +210,7 @@ def test_procedure(self):
self.convert_to_dict(column_names, test_rows[0]))
assert list(rs.fetchall()) == test_rows
- self.connection.execute("drop procedure sp;")
+ self.connection.exec_driver_sql("drop procedure sp;")
# test batch request mode
mouse2 = self.engine.raw_connection().cursor()
@@ -248,43 +237,40 @@ def test_more_parameterized_query(self):
'city' + str(i), i, (1590738990 + i) * 1000)
for i in range(1, 10)]
- self.connection.execute(
+ self.connection.exec_driver_sql(
"insert into {} values (?, ?, ?, ?, ?, ?);".format(table),
self.convert_to_dicts(schema, test_rows))
- rs = self.connection.execute(
- "select * from {} where col3 = ?;".format(table), 'province1')
+ rs = self.connection.exec_driver_sql(
+ "select * from {} where col3 = ?;".format(table), tuple(['province1']))
rs = sorted(list(rs), key=lambda x: x[0])
assert rs == test_rows[0::4]
# test parameterized query in batch mode case 2
- rs = self.connection.execute(
- "select * from {} where col3 = ?;".format(table), 'province2')
+ rs = self.connection.exec_driver_sql(
+ "select * from {} where col3 = ?;".format(table), tuple(['province2']))
rs = sorted(list(rs), key=lambda x: x[0])
assert rs == test_rows[1::4]
# test parameterized query in batch mode case 3
- rs = self.connection.execute(
- "select * from {} where col3 = ?;".format(table), 'province3')
+ rs = self.connection.exec_driver_sql(
+ "select * from {} where col3 = ?;".format(table), tuple(['province3']))
rs = sorted(list(rs), key=lambda x: x[0])
assert rs == test_rows[2::4]
# test parameterized query in batch mode case 3 and col1 < 1004, only one row[2]
- rs = self.connection.execute(
- "select * from {} where col3 = ? and col1 < ?;".format(table),
- ('province3', 1004))
+ rs = self.connection.exec_driver_sql("select * from {} where col3 = ? and col1 < ?;".format(table),
+ ('province3', 1004))
assert list(rs) == [test_rows[2]]
# test parameterized query in batch mode case 4
- rs = self.connection.execute(
- "select * from {} where col3 = ? and col1 < ? and col2 < ?;".format(
- table), ('province3', 2000, date.fromisoformat('2022-05-04')))
+ rs = self.connection.exec_driver_sql("select * from {} where col3 = ? and col1 < ? and col2 < ?;".format(table),
+ ('province3', 2000, date.fromisoformat('2022-05-04')))
assert list(rs) == [test_rows[2]]
# test parameterized query in batch mode case 5
- rs = self.connection.execute(
- "select * from {} where col3 = ? and col6 < ?;".format(table),
- ('province3', datetime.fromtimestamp(1590738997.000)))
+ rs = self.connection.exec_driver_sql("select * from {} where col3 = ? and col6 < ?;".format(table),
+ ('province3', datetime.fromtimestamp(1590738997.000)))
assert list(rs) == [test_rows[2]]
diff --git a/python/openmldb_sdk/tests/sqlalchemy_api_test.py b/python/openmldb_sdk/tests/sqlalchemy_api_test.py
index b5c4dbd7b50..c970c2aba3b 100644
--- a/python/openmldb_sdk/tests/sqlalchemy_api_test.py
+++ b/python/openmldb_sdk/tests/sqlalchemy_api_test.py
@@ -38,8 +38,8 @@ def setup_class(self):
self.connection = self.engine.connect()
- self.connection.execute('create database if not exists db_test')
- self.connection.execute('use db_test')
+ self.connection.exec_driver_sql('create database if not exists db_test')
+ self.connection.exec_driver_sql('use db_test')
self.metadata = MetaData()
self.test_table = Table('test_table', self.metadata,
Column('x', String), Column('y', Integer))
@@ -53,13 +53,13 @@ def test_insert(self):
def test_select(self):
- for row in self.connection.execute(select([self.test_table])):
+ for row in self.connection.execute(select(self.test_table)):
assert 'first' in list(row)
assert 100 in list(row)
@pytest.mark.skip(reason="test may fail to init")
def test_request_timeout(self):
- self.connection.execute(
+ self.connection.exec_driver_sql(
"insert into test_table (y, x) values(400, 'a'),(401,'b'),(402, 'c');"
@@ -69,7 +69,7 @@ def test_request_timeout(self):
connection = engine.connect()
with pytest.raises(DatabaseError) as e:
- connection.execute(
+ connection.exec_driver_sql(
"select * from test_table where x='b'").fetchall()
assert 'select fail' in str(e.value)
@@ -79,17 +79,17 @@ def test_zk_log(self):
connection = engine.connect()
- connection.execute("select 1;")
+ connection.exec_driver_sql("select 1;")
# redirect to /tmp/test_openmldb_zk.log, may core dump when client close
# engine = db.create_engine(
# 'openmldb:///db_test?zk={}&zkPath={}&zkLogFile=/tmp/test_openmldb_zk.log'.format(
# connection = engine.connect()
- # connection.execute("select 1;")
+ # connection.exec_driver_sql("select 1;")
def teardown_class(self):
- self.connection.execute("drop table test_table;")
+ self.connection.exec_driver_sql("drop table test_table;")
diff --git a/python/openmldb_tool/diagnostic_tool/connector.py b/python/openmldb_tool/diagnostic_tool/connector.py
index 5c69ad9a1d4..1466da15a71 100644
--- a/python/openmldb_tool/diagnostic_tool/connector.py
+++ b/python/openmldb_tool/diagnostic_tool/connector.py
@@ -54,10 +54,10 @@ def get_conn(self):
def execute(self, sql):
"""ddl won't return resultset, can not fetchall"""
- return self.conn.execute(sql)
+ return self.conn.exec_driver_sql(sql)
def execfetch(self, sql, show=False):
- cr = self.conn.execute(sql)
+ cr = self.conn.exec_driver_sql(sql)
res = cr.fetchall()
if show:
t = PrettyTable(cr.keys())