From d966a8894e62e44ce79feba582acf69b7323d168 Mon Sep 17 00:00:00 2001
From: yangwucheng
Date: Wed, 3 Apr 2024 10:58:01 +0800
Subject: [PATCH 1/3] docs: hadoop confs in taskmanager (#3761)

* fix: sbin deploy can't set HADOOP_CONF_DIR for taskmanager

* docs: hadoop confs in taskmanager

---------

Co-authored-by: yangwucheng
---
 docs/zh/deploy/conf.md | 35 ++++++++++++++++++++++++++++++++---
 1 file changed, 32 insertions(+), 3 deletions(-)

diff --git a/docs/zh/deploy/conf.md b/docs/zh/deploy/conf.md
index 7c4bae64335..500455a64a4 100644
--- a/docs/zh/deploy/conf.md
+++ b/docs/zh/deploy/conf.md
@@ -273,6 +273,7 @@ batchjob.jar.path=
namenode.uri=
offline.data.prefix=file:///tmp/openmldb_offline_storage/
hadoop.conf.dir=
+hadoop.user.name=
#enable.hive.support=false
```

@@ -280,6 +281,7 @@ hadoop.conf.dir=

Spark Config中重点关注的配置如下:

+
```{note}
理解配置项与环境变量的关系。

@@ -307,7 +309,21 @@ TaskManager只接受`local`及其变种、`yarn`、`yarn-cluster`、`yarn-client

local模式即Spark任务运行在本地(TaskManager所在主机),该模式下不需要太多配置,只需要注意两点:
- 离线表的存储地址`offline.data.prefix`,默认为`file:///tmp/openmldb_offline_storage/`,即TaskManager所在主机的`/tmp`目录,你可以修改该配置为其他目录。
-  - 可以配置为HDFS路径,需要在**启动TaskManager前**配置环境变量`HADOOP_CONF_DIR`为Hadoop配置文件所在目录(注意是环境变量,不是TaskManager的配置项),文件目录中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`等配置文件,更多见[Spark官方文档](https://spark.apache.org/docs/3.2.1/configuration.html#inheriting-hadoop-cluster-configuration)。
+  - 可以配置为HDFS路径。如果配置为HDFS路径,需要正确配置变量 `hadoop.conf.dir` 和 `hadoop.user.name`:`hadoop.conf.dir` 表示Hadoop配置文件所在目录(注意该目录是TaskManager节点上的目录,其中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`等配置文件,更多见[Spark官方文档](https://spark.apache.org/docs/3.2.1/configuration.html#inheriting-hadoop-cluster-configuration)),`hadoop.user.name` 表示Hadoop运行用户。可以通过以下三种方式之一配置这两个变量:
    1. 在 `conf/taskmanager.properties` 配置文件中配置变量 `hadoop.conf.dir`, `hadoop.user.name`
    2. 在(TaskManager节点)**启动TaskManager前**配置环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME`
    3. 拷贝Hadoop配置文件(`core-site.xml`、`hdfs-site.xml`等)到 `{spark.home}/conf` 目录中
    > sbin部署不能传递非指定的变量,目前TaskManager只会传递环境变量 `SPARK_HOME` 和 `RUNNER_JAVA_HOME`。所以如果是sbin部署,尽量使用第一种方式。
    >
    > 如果使用第二种方式,环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` 最好配置为永久生效;若不希望永久生效,也可以在一个session里先临时配置这两个环境变量,再启动TaskManager,例如
    > ```bash
    > cd <TaskManager部署目录>
    > export HADOOP_CONF_DIR=<这里替换为Hadoop配置目录>
    > export HADOOP_USER_NAME=<这里替换为Hadoop用户名>
    > bash bin/start.sh start taskmanager
    > ```
    >
    > 环境变量生效范围参考上文“理解配置项与环境变量的关系”
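    >
    > 例如,若采用第一种方式,可在 `conf/taskmanager.properties` 中直接配置(仅为示意,其中HDFS路径、目录与用户名均为假设值,请按实际环境替换):
    > ```properties
    > # 离线表存储的HDFS路径(示例值)
    > offline.data.prefix=hdfs:///openmldb_offline_storage/
    > # Hadoop配置文件所在目录(TaskManager节点上的目录,示例值)
    > hadoop.conf.dir=/opt/hadoop/etc/hadoop
    > # Hadoop运行用户(示例值)
    > hadoop.user.name=hadoop
    > ```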
```{note}
HDFS路径目前需要配置`namenode.uri`,删除离线表时会连接HDFS FileSystem `namenode.uri`,并删除离线表的存储目录(Offline Table Path)。未来将废弃此配置项。
```
@@ -319,9 +335,22 @@ local模式即Spark任务运行在本地(TaskManager所在主机),该模

##### yarn/yarn-cluster模式

"yarn"和"yarn-cluster"是同一个模式,即Spark任务运行在Yarn集群上,该模式下需要配置的参数较多,主要包括:
-- 在**启动TaskManager前**配置环境变量`HADOOP_CONF_DIR`为Hadoop和Yarn的配置文件所在目录,文件目录中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`、Yarn的`yarn-site.xml`等配置文件,参考[Spark官方文档](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#launching-spark-on-yarn)。
+- 正确配置变量 `hadoop.conf.dir` 和 `hadoop.user.name`:`hadoop.conf.dir` 表示Hadoop和Yarn配置文件所在目录(注意该目录是TaskManager节点上的目录,其中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`、`yarn-site.xml`等配置文件,参考[Spark官方文档](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#launching-spark-on-yarn)),`hadoop.user.name` 表示Hadoop运行用户。可以通过以下三种方式之一配置这两个变量(本节末尾附一个整体配置示意):
  1. 在 `conf/taskmanager.properties` 配置文件中配置变量 `hadoop.conf.dir`, `hadoop.user.name`
  2. 在(TaskManager节点)**启动TaskManager前**配置环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME`
  3. 拷贝Hadoop和Yarn配置文件(`core-site.xml`、`hdfs-site.xml`、`yarn-site.xml`等)到 `{spark.home}/conf` 目录中
  > sbin部署不能传递非指定的变量,目前TaskManager只会传递环境变量 `SPARK_HOME` 和 `RUNNER_JAVA_HOME`。所以如果是sbin部署,尽量使用第一种方式。
  >
  > 如果使用第二种方式,环境变量 `HADOOP_CONF_DIR`, `HADOOP_USER_NAME` 最好配置为永久生效;若不希望永久生效,也可以在一个session里先临时配置这两个环境变量,再启动TaskManager,例如
  > ```bash
  > cd <TaskManager部署目录>
  > export HADOOP_CONF_DIR=<这里替换为Hadoop配置目录>
  > export HADOOP_USER_NAME=<这里替换为Hadoop用户名>
  > bash bin/start.sh start taskmanager
  > ```
  >
  > 环境变量生效范围参考上文“理解配置项与环境变量的关系”
- `spark.yarn.jars`配置Yarn需要读取的Spark运行jar包地址,必须是`hdfs://`地址。可以上传[OpenMLDB Spark 发行版](../../tutorial/openmldbspark_distribution.md)解压后的`jars`目录到HDFS上,并配置为`hdfs:///jars/*`(注意通配符)。[如果不配置该参数,Yarn会将`$SPARK_HOME/jars`打包上传分发,并且每次离线任务都要分发](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#preparations),效率较低,所以推荐配置。
- `batchjob.jar.path`必须是HDFS路径(具体到包名),上传batchjob jar包到HDFS上,并配置为对应地址,保证Yarn集群上所有Worker可以获得batchjob包。
- `offline.data.prefix`必须是HDFS路径,保证Yarn集群上所有Worker可读写数据。应使用前面配置的环境变量`HADOOP_CONF_DIR`中的Hadoop集群地址。
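
综合以上各项,yarn模式下 `conf/taskmanager.properties` 的一个整体配置示意如下(仅为示意:其中HDFS路径、jar包名、目录与用户名均为假设值,请按实际集群替换):

```properties
spark.master=yarn
# Yarn需要读取的Spark运行jar包地址(注意通配符)
spark.yarn.jars=hdfs:///jars/*
# batchjob jar包地址,必须是HDFS路径且具体到包名(文件名为假设值)
batchjob.jar.path=hdfs:///openmldb/batchjob/openmldb-batchjob.jar
# 离线表存储地址,必须是HDFS路径
offline.data.prefix=hdfs:///openmldb_offline_storage/
# Hadoop/Yarn配置文件所在目录与运行用户(假设值)
hadoop.conf.dir=/opt/hadoop/etc/hadoop
hadoop.user.name=hadoop
```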
From f54cbfa9b4f01c1e2a2691069ffb3c27875cd8c9 Mon Sep 17 00:00:00 2001
From: yangwucheng
Date: Wed, 3 Apr 2024 11:43:57 +0800
Subject: [PATCH 2/3] docs: update doc install_deploy.md of zookeeper 3.4.14
 java requirements (#3768)

---------

Co-authored-by: yangwucheng
Co-authored-by: Siqi Wang
Co-authored-by: HuangWei
---
 docs/en/deploy/install_deploy.md | 4 ++++
 docs/zh/deploy/install_deploy.md | 3 +++
 2 files changed, 7 insertions(+)

diff --git a/docs/en/deploy/install_deploy.md b/docs/en/deploy/install_deploy.md
index 3eec28627b8..e436579d724 100644
--- a/docs/en/deploy/install_deploy.md
+++ b/docs/en/deploy/install_deploy.md
@@ -22,6 +22,10 @@

If you need to deploy ZooKeeper and TaskManager, you need a Java runtime environment.

+The servers need Java 1.8 or above.
+
+The ZooKeeper 3.4.14 client requires `Java 1.7` - `Java 13`. The Java SDK depends on this client, so it is bound by the same version range; do not run it on a newer Java version. If you wish to use zkCli, use `Java 1.8` or `Java 11`.
+
### Hardware

Regarding hardware requirements:
diff --git a/docs/zh/deploy/install_deploy.md b/docs/zh/deploy/install_deploy.md
index a7a6200651a..bcc30919eae 100644
--- a/docs/zh/deploy/install_deploy.md
+++ b/docs/zh/deploy/install_deploy.md
@@ -21,6 +21,9 @@ strings /lib64/libc.so.6 | grep ^GLIBC_

如果需要部署 ZooKeeper 和 TaskManager,则需要有 Java 运行环境。

+> 两个Server需要`Java 1.8`及以上版本。
+> Zookeeper Client 3.4.14 需要 `Java 1.7` - `Java 13` 版本。Java SDK也使用这个Client,所以同样要求不能使用较高版本Java。如果希望使用zkCli,推荐使用 `Java 1.8` 或 `Java 11` 版本。
+
### 硬件

* CPU:
From 0cef78a9a6a576e16e6479f38c917ca4c2cd0640 Mon Sep 17 00:00:00 2001
From: yht520100
Date: Sun, 7 Apr 2024 16:03:13 +0800
Subject: [PATCH 3/3] feat: upgrade sqlalchemy to 2.0.27 and updated test cases
 (#3805)

* feat: upgrade sqlalchemy to 2.0.27 and updated test cases

* feat: updated test cases for openmldb_tool

* feat: optimize python-sdk document description

---------

Co-authored-by: Yuan Haitao
---
 docs/en/quickstart/sdk/python_sdk.md          | 79 ++++++++++++----
 docs/zh/quickstart/sdk/python_sdk.md          | 79 ++++++++++++----
 python/openmldb_sdk/setup.py                  |  2 +-
 .../tests/openmldb_client_test.py             | 94 ++++++++-----------
 .../openmldb_sdk/tests/sqlalchemy_api_test.py | 16 ++--
 .../diagnostic_tool/connector.py              |  4 +-
 6 files changed, 169 insertions(+), 105 deletions(-)

diff --git a/docs/en/quickstart/sdk/python_sdk.md b/docs/en/quickstart/sdk/python_sdk.md
index eb69f3b1a5e..491a3c81812 100644
--- a/docs/en/quickstart/sdk/python_sdk.md
+++ b/docs/en/quickstart/sdk/python_sdk.md
@@ -116,6 +116,8 @@ cursor.close()

This section demonstrates the use of the Python SDK through OpenMLDB SQLAlchemy. Similarly, if any of the DBAPI interfaces fail, they will raise a `DatabaseError` exception. Users can catch and handle this exception as needed. The handling of return values should follow the SQLAlchemy standard.

+The integrated SQLAlchemy defaults to version 2.0 while remaining compatible with version 1.4. If your SQLAlchemy version is 1.4, you can adjust the interface names according to the [version differences](python_sdk.md#sqlalchemy-version-differences). OpenMLDB SDK 0.8.5 and earlier only support SQLAlchemy 1.4; versions after 0.8.5 also support 2.0.
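
If your code needs to run on either major version, a minimal sketch like the following can branch on the installed SQLAlchemy version (`run_sql` is an illustrative helper, not an SDK interface):

```python
# Minimal sketch: pick the raw-SQL entry point for the installed
# SQLAlchemy major version. `run_sql` is a hypothetical helper.
import sqlalchemy


def run_sql(connection, sql, params=None):
    if sqlalchemy.__version__.startswith("1."):
        # SQLAlchemy 1.4 accepts plain SQL strings via execute()
        return connection.execute(sql, params) if params is not None else connection.execute(sql)
    # SQLAlchemy 2.0 runs plain SQL strings via exec_driver_sql()
    return connection.exec_driver_sql(sql, params) if params is not None else connection.exec_driver_sql(sql)
```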
+
### Create Connection

```python
import sqlalchemy as db
engine = db.create_engine('openmldb:///db_name?zk=zkcluster&zkPath=zk_path')
connection = engine.connect()
```

### Create Database

Use the `connection.exec_driver_sql()` interface to create database `db1`:

```python
try:
-    connection.execute("CREATE DATABASE db1")
+    connection.exec_driver_sql("CREATE DATABASE db1")
except Exception as e:
    print(e)

-connection.execute("USE db1")
+connection.exec_driver_sql("USE db1")
```

### Create Table

Use the `connection.exec_driver_sql()` interface to create table `t1`:

```python
try:
-    connection.execute("CREATE TABLE t1 ( col1 bigint, col2 date, col3 string, col4 string, col5 int, index(key=col3, ts=col1))")
+    connection.exec_driver_sql("CREATE TABLE t1 ( col1 bigint, col2 date, col3 string, col4 string, col5 int, index(key=col3, ts=col1))")
except Exception as e:
    print(e)
```

### Insert Data into Table

Use the `connection.exec_driver_sql(ddl)` interface to execute the SQL insert statement, and you can insert data into the table:

```python
try:
-    connection.execute("INSERT INTO t1 VALUES(1000, '2020-12-25', 'guangdon', 'shenzhen', 1);")
+    connection.exec_driver_sql("INSERT INTO t1 VALUES(1000, '2020-12-25', 'guangdon', 'shenzhen', 1);")
except Exception as e:
    print(e)
```

Use the `connection.exec_driver_sql(ddl, data)` interface to execute a SQL insert statement with placeholders; you can specify the insert data dynamically or insert multiple rows:

```python
try:
    insert = "INSERT INTO t1 VALUES(1002, '2020-12-27', ?, ?, 3);"
-    connection.execute(insert, ({"col3":"fujian", "col4":"fuzhou"}))
-    connection.execute(insert, [{"col3":"jiangsu", "col4":"nanjing"}, {"col3":"zhejiang", "col4":"hangzhou"}])
+    connection.exec_driver_sql(insert, ({"col3":"fujian", "col4":"fuzhou"}))
+    connection.exec_driver_sql(insert, [{"col3":"jiangsu", "col4":"nanjing"}, {"col3":"zhejiang", "col4":"hangzhou"}])
except Exception as e:
    print(e)
```

### Execute SQL Batch Query

Use the `connection.exec_driver_sql(sql)` interface to execute SQL batch query statements:

```python
try:
-    rs = connection.execute("SELECT * FROM t1")
+    rs = connection.exec_driver_sql("SELECT * FROM t1")
    for row in rs:
        print(row)
-    rs = connection.execute("SELECT * FROM t1 WHERE col3 = ?;", ('hefei'))
-    rs = connection.execute("SELECT * FROM t1 WHERE col3 = ?;",[('hefei'), ('shanghai')])
+    rs = connection.exec_driver_sql("SELECT * FROM t1 WHERE col3 = ?;", tuple(['hefei']))
except Exception as e:
    print(e)
```
### Execute SQL Query

Use the `connection.exec_driver_sql(sql, request)` interface to execute the SQL request query. You can put the input data into the second parameter of the `exec_driver_sql` function:

```python
try:
-    rs = connection.execute("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-27', "col3":'zhejiang', "col4":'hangzhou', "col5":100}))
+    rs = connection.exec_driver_sql("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-27', "col3":'zhejiang', "col4":'hangzhou', "col5":100}))
except Exception as e:
    print(e)
```

### Delete Table

Use the `connection.exec_driver_sql(ddl)` interface to delete table `t1`:

```python
try:
-    connection.execute("DROP TABLE t1")
+    connection.exec_driver_sql("DROP TABLE t1")
except Exception as e:
    print(e)
```

### Delete Database

Use the `connection.exec_driver_sql(ddl)` interface to delete database `db1`:

```python
try:
-    connection.execute("DROP DATABASE db1")
+    connection.exec_driver_sql("DROP DATABASE db1")
except Exception as e:
    print(e)
```

### SQLAlchemy Version Differences

For raw SQL, SQLAlchemy 1.4 uses the `connection.execute()` method, while SQLAlchemy 2.0 uses the `connection.exec_driver_sql()` method. The usual differences between the two methods are shown below; for more details, refer to the official SQLAlchemy documentation.

```python
# DDL Example1 - [SQLAlchemy 1.4]
connection.execute("CREATE TABLE t1 (col1 bigint, col2 date)")
# DDL Example1 - [SQLAlchemy 2.0]
connection.exec_driver_sql("CREATE TABLE t1 (col1 bigint, col2 date)")

# Insert Example1 - [SQLAlchemy 1.4]
connection.execute("INSERT INTO t1 VALUES(1000, '2020-12-25');")
connection.execute("INSERT INTO t1 VALUES(?, ?);", ({"col1":1001, "col2":"2020-12-26"}))
connection.execute("INSERT INTO t1 VALUES(?, ?);", [{"col1":1002, "col2":"2020-12-27"}])
# Insert Example1 - [SQLAlchemy 2.0]
connection.exec_driver_sql("INSERT INTO t1 VALUES(1000, '2020-12-25');")
connection.exec_driver_sql("INSERT INTO t1 VALUES(?, ?);", ({"col1":1001, "col2":"2020-12-26"}))
connection.exec_driver_sql("INSERT INTO t1 VALUES(?, ?);", [{"col1":1002, "col2":"2020-12-27"}])

# Query Example1 - [SQLAlchemy 1.4] - Native SQL Query
connection.execute("select * from t1 where col3 = ?;", 'hefei')
connection.execute("select * from t1 where col3 = ?;", ['hefei'])
connection.execute("select * from t1 where col3 = ?;", [('hefei')])
# Query Example1 - [SQLAlchemy 2.0] - Native SQL Query
connection.exec_driver_sql("select * from t1 where col3 = ?;", tuple(['hefei']))

# Query Example2 - [SQLAlchemy 1.4] - ORM Query
connection.execute(select([self.test_table]))
# Query Example2 - [SQLAlchemy 2.0] - ORM Query
connection.execute(select(self.test_table))

# Query Example3 - [SQLAlchemy 1.4] - SQL Request Query
connection.execute("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-28'}))
# Query Example3 - [SQLAlchemy 2.0] - SQL Request Query
connection.exec_driver_sql("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-28'}))
```

## Notebook Magic Function

The OpenMLDB Python SDK supports Notebook magic function extensions. Use the following statement to register the function.
diff --git a/docs/zh/quickstart/sdk/python_sdk.md b/docs/zh/quickstart/sdk/python_sdk.md
index da70edd9daa..64cdcb22ddf 100644
--- a/docs/zh/quickstart/sdk/python_sdk.md
+++ b/docs/zh/quickstart/sdk/python_sdk.md
@@ -117,6 +117,8 @@ cursor.close()

本节演示通过 OpenMLDB SQLAlchemy 使用 Python SDK。同样的,所有dbapi接口如果执行失败,会抛出异常`DatabaseError`,用户可自行捕获异常并处理。返回值处理参考SQLAlchemy标准。

+集成的SQLAlchemy默认版本为2.0,同时兼容旧版本1.4。若用户的SQLAlchemy版本为1.4,可以根据[版本差异](python_sdk.md#sqlalchemy-版本差异)调整接口名称。OpenMLDB SDK在0.8.5及之前的版本仅支持1.4,0.8.5之后的版本才开始支持2.0。
+
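若代码需要同时兼容两个大版本,可参考如下示意按已安装的SQLAlchemy版本选择调用入口(`run_sql` 仅为示例辅助函数,并非SDK自带接口):

```python
# 示意:按SQLAlchemy大版本选择原生SQL的执行入口,run_sql为假设的辅助函数
import sqlalchemy


def run_sql(connection, sql, params=None):
    if sqlalchemy.__version__.startswith("1."):
        # SQLAlchemy 1.4 可直接用 execute() 执行SQL字符串
        return connection.execute(sql, params) if params is not None else connection.execute(sql)
    # SQLAlchemy 2.0 用 exec_driver_sql() 执行SQL字符串
    return connection.exec_driver_sql(sql, params) if params is not None else connection.exec_driver_sql(sql)
```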
connection.execute("DROP TABLE t1") + connection.exec_driver_sql("DROP TABLE t1") except Exception as e: print(e) ``` ### 删除数据库 -使用 `connection.execute(ddl)` 接口删除数据库 `db1`: +使用 `connection.exec_driver_sql(ddl)` 接口删除数据库 `db1`: ```python try: - connection.execute("DROP DATABASE db1") + connection.exec_driver_sql("DROP DATABASE db1") except Exception as e: print(e) ``` +### SQLAlchemy 版本差异 + +原生SQL使用差异,SQLAlchemy 1.4 版本使用`connection.execute()`方法,SQLAlchemy 2.0 版本使用`connection.exec_driver_sql()`方法,两个方法的常规差异如下,详细可参考官方文档。 + +```python +# DDL案例1-[SQLAlchemy 1.4] +connection.execute("CREATE TABLE t1 (col1 bigint, col2 date)") +# DDL案例1-[SQLAlchemy 2.0] +connection.exec_driver_sql("CREATE TABLE t1 (col1 bigint, col2 date)") + +# 插入案例1-[SQLAlchemy 1.4] +connection.execute("INSERT INTO t1 VALUES(1000, '2020-12-25');") +connection.execute("INSERT INTO t1 VALUES(?, ?);", ({"col1":1001, "col2":"2020-12-26"})) +connection.execute("INSERT INTO t1 VALUES(?, ?);", [{"col1":1002, "col2":"2020-12-27"}]) +# 插入案例1-[SQLAlchemy 2.0] +connection.exec_driver_sql("INSERT INTO t1 VALUES(1000, '2020-12-25');") +connection.exec_driver_sql("INSERT INTO t1 VALUES(?, ?);", ({"col1":1001, "col2":"2020-12-26"})) +connection.exec_driver_sql("INSERT INTO t1 VALUES(?, ?);", [{"col1":1002, "col2":"2020-12-27"}]) + +# 查询案例1-[SQLAlchemy 1.4]-原生SQL查询 +connection.execute("select * from t1 where col3 = ?;", 'hefei') +connection.execute("select * from t1 where col3 = ?;", ['hefei']) +connection.execute("select * from t1 where col3 = ?;", [('hefei')]) +# 查询案例1-[SQLAlchemy 2.0]-原生SQL查询 +connection.exec_driver_sql("select * from t1 where col3 = ?;", tuple(['hefei'])) + +# 查询案例2-[SQLAlchemy 1.4]-ORM查询 +connection.execute(select([self.test_table])) +# 查询案例2-[SQLAlchemy 2.0]-ORM查询 +connection.execute(select(self.test_table)) + +# 查询案例3-[SQLAlchemy 1.4]-请求式查询 +connection.execute("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-28'})) +# 查询案例3-[SQLAlchemy 2.0]-请求式查询 +connection.exec_driver_sql("SELECT * FROM t1", ({"col1":9999, "col2":'2020-12-28'})) + +``` + ## 使用 Notebook Magic Function OpenMLDB Python SDK 支持了 Notebook magic function 拓展,使用以下语句注册函数。 diff --git a/python/openmldb_sdk/setup.py b/python/openmldb_sdk/setup.py index 5403de0c367..fa92ff71911 100644 --- a/python/openmldb_sdk/setup.py +++ b/python/openmldb_sdk/setup.py @@ -29,7 +29,7 @@ ], install_requires=[ "importlib-metadata < 5.0", - "sqlalchemy <= 1.4.50", + "sqlalchemy <= 2.0.27", "IPython <= 7.30.1", "prettytable <= 3.1.0", ], diff --git a/python/openmldb_sdk/tests/openmldb_client_test.py b/python/openmldb_sdk/tests/openmldb_client_test.py index 54dc8963d55..e4fef405acb 100644 --- a/python/openmldb_sdk/tests/openmldb_client_test.py +++ b/python/openmldb_sdk/tests/openmldb_client_test.py @@ -43,9 +43,8 @@ class TestOpenMLDBClient: def setup_class(cls): cls.engine = db.create_engine('openmldb:///?zk={}&zkPath={}'.format(OpenMLDB_ZK_CLUSTER, OpenMLDB_ZK_PATH)) cls.connection = cls.engine.connect() - cls.connection.execute("create database if not exists {};".format( - cls.db)) - cls.connection.execute(f"use {cls.db}") + cls.connection.exec_driver_sql("create database if not exists {};".format(cls.db)) + cls.connection.exec_driver_sql(f"use {cls.db}") @staticmethod def has_table(connection, table_name): @@ -53,12 +52,10 @@ def has_table(connection, table_name): def recreate_table(self, table, schema): if self.has_table(self.connection, table): - self.connection.execute("drop table {}".format(table)) + self.connection.exec_driver_sql("drop table {}".format(table)) assert not 
self.has_table(self.connection, table) # key is col3, partitionnum==1 - self.connection.execute( - "create table {}({}) OPTIONS(partitionnum=1);".format( - table, schema)) + self.connection.exec_driver_sql("create table {}({}) OPTIONS(partitionnum=1);".format(table, schema)) assert self.has_table(self.connection, table) @staticmethod @@ -120,21 +117,20 @@ def test_basic(self): # 1004, ?, 'hubei', 'wuhan', 5 - 2020-11-29 insert4 = "insert into {} values({});".format( table, self.stringify_join(test_rows[4], [1])) - self.connection.execute(insert0) - self.connection.execute(insert1, ({ + self.connection.exec_driver_sql(insert0) + self.connection.exec_driver_sql(insert1, ({ "col4": test_rows[1][3], "col5": test_rows[1][4] })) - self.connection.execute(insert2, ({ + self.connection.exec_driver_sql(insert2, ({ "col3": test_rows[2][2], "col4": test_rows[2][3] })) - self.connection.execute( - insert3, self.convert_to_dicts(column_names, [test_rows[3]])) - self.connection.execute(insert4, [{"col2": test_rows[4][1]}]) + self.connection.exec_driver_sql(insert3, self.convert_to_dicts(column_names, [test_rows[3]])) + self.connection.exec_driver_sql(insert4, [{"col2": test_rows[4][1]}]) # order by is not supported now - result = self.connection.execute("select * from {};".format(table)) + result = self.connection.exec_driver_sql("select * from {};".format(table)) # fetch many times result_list = result.fetchmany() assert len(result_list) == 1 @@ -148,52 +144,45 @@ def test_basic(self): # insert invalid type with pytest.raises(TypeError): - self.connection.execute(insert1, (2, 2)) + self.connection.exec_driver_sql(insert1, (2, 2)) # insert many new_rows = [(1005, "2020-12-29", "shandong", 'jinan', 6), (1006, "2020-12-30", "fujian", 'fuzhou', 7)] # insert3 is all ? 
- self.connection.execute(insert3, - self.convert_to_dicts(column_names, new_rows)) + self.connection.exec_driver_sql(insert3, self.convert_to_dicts(column_names, new_rows)) test_rows += new_rows # test fetch all - rs = self.connection.execute("select * from {};".format(table)) + rs = self.connection.exec_driver_sql("select * from {};".format(table)) result = sorted(rs.fetchall(), key=lambda x: x[0]) assert result == test_rows # test convert to list - rs = self.connection.execute("select * from {};".format(table)) + rs = self.connection.exec_driver_sql("select * from {};".format(table)) rs = sorted(list(rs), key=lambda x: x[0]) assert rs == test_rows # test condition select - rs = self.connection.execute( - "select * from {} where col3 = 'hefei';".format(table)) + rs = self.connection.exec_driver_sql("select * from {} where col3 = 'hefei';".format(table)) # hefei row idx == 1 assert list(rs) == [test_rows[1]] # test request mode, select sql with dict parameters(one dict or list of dict) request_row = (9999, "2020-12-27", "zhejiang", "hangzhou", 100) - rs = self.connection.execute( - "select * from {};".format(table), - self.convert_to_dict(column_names, request_row)) + rs = self.connection.exec_driver_sql("select * from {};".format(table), + self.convert_to_dict(column_names, request_row)) assert list(rs) == [request_row] - rs = self.connection.execute( - "select * from {};".format(table), - self.convert_to_dicts(column_names, [request_row])) + rs = self.connection.exec_driver_sql("select * from {};".format(table), + self.convert_to_dicts(column_names, [request_row])) assert list(rs) == [request_row] # test parameterized query in batch mode, select sql with tuple or tuple list - rs = self.connection.execute( - "select * from {} where col3 = ?;".format(table), 'hefei') - assert list(rs) == [test_rows[1]] - rs = self.connection.execute( - "select * from {} where col3 = ?;".format(table), ['hefei']) - assert list(rs) == [test_rows[1]] - rs = self.connection.execute( - "select * from {} where col3 = ?;".format(table), [('hefei')]) + rs = self.connection.exec_driver_sql("select * from {} where col3 = ?;".format(table), tuple(['hefei'])) assert list(rs) == [test_rows[1]] + # rs = self.connection.exec_driver_sql("select * from {} where col3 = ?;".format(table), ['hefei']) + # assert list(rs) == [test_rows[1]] + # rs = self.connection.exec_driver_sql("select * from {} where col3 = ?;".format(table), [('hefei')]) + # assert list(rs) == [test_rows[1]] def test_procedure(self): # TODO(hw): creating procedure is not recommended, test deploy @@ -204,13 +193,13 @@ def test_procedure(self): # try to delete sp before recreate, cause table may have associated deployment 'sp' try: - self.connection.execute("drop procedure sp;") + self.connection.exec_driver_sql("drop procedure sp;") except DatabaseError as e: assert "not found" in str(e) self.recreate_table(table, schema_str) - self.connection.execute( + self.connection.exec_driver_sql( "create procedure sp (col1 bigint, col2 date, col3 string, col4 string, col5 int) " "begin select * from {}; end;".format(table)) @@ -221,7 +210,7 @@ def test_procedure(self): self.convert_to_dict(column_names, test_rows[0])) assert list(rs.fetchall()) == test_rows - self.connection.execute("drop procedure sp;") + self.connection.exec_driver_sql("drop procedure sp;") # test batch request mode mouse2 = self.engine.raw_connection().cursor() @@ -248,43 +237,40 @@ def test_more_parameterized_query(self): 'city' + str(i), i, (1590738990 + i) * 1000) for i in range(1, 10)] - 
self.connection.execute( + self.connection.exec_driver_sql( "insert into {} values (?, ?, ?, ?, ?, ?);".format(table), self.convert_to_dicts(schema, test_rows)) - rs = self.connection.execute( - "select * from {} where col3 = ?;".format(table), 'province1') + rs = self.connection.exec_driver_sql( + "select * from {} where col3 = ?;".format(table), tuple(['province1'])) rs = sorted(list(rs), key=lambda x: x[0]) assert rs == test_rows[0::4] # test parameterized query in batch mode case 2 - rs = self.connection.execute( - "select * from {} where col3 = ?;".format(table), 'province2') + rs = self.connection.exec_driver_sql( + "select * from {} where col3 = ?;".format(table), tuple(['province2'])) rs = sorted(list(rs), key=lambda x: x[0]) assert rs == test_rows[1::4] # test parameterized query in batch mode case 3 - rs = self.connection.execute( - "select * from {} where col3 = ?;".format(table), 'province3') + rs = self.connection.exec_driver_sql( + "select * from {} where col3 = ?;".format(table), tuple(['province3'])) rs = sorted(list(rs), key=lambda x: x[0]) assert rs == test_rows[2::4] # test parameterized query in batch mode case 3 and col1 < 1004, only one row[2] - rs = self.connection.execute( - "select * from {} where col3 = ? and col1 < ?;".format(table), - ('province3', 1004)) + rs = self.connection.exec_driver_sql("select * from {} where col3 = ? and col1 < ?;".format(table), + ('province3', 1004)) assert list(rs) == [test_rows[2]] # test parameterized query in batch mode case 4 - rs = self.connection.execute( - "select * from {} where col3 = ? and col1 < ? and col2 < ?;".format( - table), ('province3', 2000, date.fromisoformat('2022-05-04'))) + rs = self.connection.exec_driver_sql("select * from {} where col3 = ? and col1 < ? and col2 < ?;".format(table), + ('province3', 2000, date.fromisoformat('2022-05-04'))) assert list(rs) == [test_rows[2]] # test parameterized query in batch mode case 5 - rs = self.connection.execute( - "select * from {} where col3 = ? and col6 < ?;".format(table), - ('province3', datetime.fromtimestamp(1590738997.000))) + rs = self.connection.exec_driver_sql("select * from {} where col3 = ? 
and col6 < ?;".format(table), + ('province3', datetime.fromtimestamp(1590738997.000))) assert list(rs) == [test_rows[2]] diff --git a/python/openmldb_sdk/tests/sqlalchemy_api_test.py b/python/openmldb_sdk/tests/sqlalchemy_api_test.py index b5c4dbd7b50..c970c2aba3b 100644 --- a/python/openmldb_sdk/tests/sqlalchemy_api_test.py +++ b/python/openmldb_sdk/tests/sqlalchemy_api_test.py @@ -38,8 +38,8 @@ def setup_class(self): 'openmldb:///?zk={}&zkPath={}'.format( OpenMLDB_ZK_CLUSTER, OpenMLDB_ZK_PATH)) self.connection = self.engine.connect() - self.connection.execute('create database if not exists db_test') - self.connection.execute('use db_test') + self.connection.exec_driver_sql('create database if not exists db_test') + self.connection.exec_driver_sql('use db_test') self.metadata = MetaData() self.test_table = Table('test_table', self.metadata, Column('x', String), Column('y', Integer)) @@ -53,13 +53,13 @@ def test_insert(self): y=100)) def test_select(self): - for row in self.connection.execute(select([self.test_table])): + for row in self.connection.execute(select(self.test_table)): assert 'first' in list(row) assert 100 in list(row) @pytest.mark.skip(reason="test may fail to init") def test_request_timeout(self): - self.connection.execute( + self.connection.exec_driver_sql( "insert into test_table (y, x) values(400, 'a'),(401,'b'),(402, 'c');" ) @@ -69,7 +69,7 @@ def test_request_timeout(self): connection = engine.connect() with pytest.raises(DatabaseError) as e: - connection.execute( + connection.exec_driver_sql( "select * from test_table where x='b'").fetchall() assert 'select fail' in str(e.value) @@ -79,17 +79,17 @@ def test_zk_log(self): 'openmldb:///db_test?zk={}&zkPath={}&zkLogLevel=0'.format( OpenMLDB_ZK_CLUSTER, OpenMLDB_ZK_PATH)) connection = engine.connect() - connection.execute("select 1;") + connection.exec_driver_sql("select 1;") # redirect to /tmp/test_openmldb_zk.log, may core dump when client close # engine = db.create_engine( # 'openmldb:///db_test?zk={}&zkPath={}&zkLogFile=/tmp/test_openmldb_zk.log'.format( # OpenMLDB_ZK_CLUSTER, OpenMLDB_ZK_PATH)) # connection = engine.connect() - # connection.execute("select 1;") + # connection.exec_driver_sql("select 1;") def teardown_class(self): - self.connection.execute("drop table test_table;") + self.connection.exec_driver_sql("drop table test_table;") self.connection.close() diff --git a/python/openmldb_tool/diagnostic_tool/connector.py b/python/openmldb_tool/diagnostic_tool/connector.py index 5c69ad9a1d4..1466da15a71 100644 --- a/python/openmldb_tool/diagnostic_tool/connector.py +++ b/python/openmldb_tool/diagnostic_tool/connector.py @@ -54,10 +54,10 @@ def get_conn(self): def execute(self, sql): """ddl won't return resultset, can not fetchall""" - return self.conn.execute(sql) + return self.conn.exec_driver_sql(sql) def execfetch(self, sql, show=False): - cr = self.conn.execute(sql) + cr = self.conn.exec_driver_sql(sql) res = cr.fetchall() if show: t = PrettyTable(cr.keys())