From f172e4552ee6dcae03057dd710939e977318891e Mon Sep 17 00:00:00 2001 From: Zac Davies <80654433+zacdav-db@users.noreply.github.com> Date: Mon, 6 May 2024 14:13:39 +1000 Subject: [PATCH] SQL Connector Bindings + SQL Execution API (#44) Changes * Adding tests for sql connector and sql execution api * Adjusting db_host(), no longer required to specify scheme (https/http) as brickster now will override to https. * Incrementing to 0.2.2 * Skip sql connector tests when env isn't available * Adjusting docs for R CMD Check --------- Co-authored-by: Zac Davies --- DESCRIPTION | 9 +- NAMESPACE | 12 + R/data-structures.R | 2 +- R/databricks-helpers.R | 31 ++ R/package-auth.R | 18 +- R/request-helpers.R | 15 +- R/sql-connector.R | 345 +++++++++++++++++++ R/sql-query-execution.R | 243 +++++++++++++ R/zzz.R | 20 ++ README.md | 5 +- _pkgdown.yml | 7 + man/DatabricksSqlClient.Rd | 418 +++++++++++++++++++++++ man/db_sql_client.Rd | 68 ++++ man/db_sql_exec_cancel.Rd | 39 +++ man/db_sql_exec_query.Rd | 116 +++++++ man/db_sql_exec_result.Rd | 51 +++ man/db_sql_exec_status.Rd | 46 +++ man/determine_brickster_venv.Rd | 15 + man/install_db_sql_connector.Rd | 39 +++ man/lib_maven.Rd | 2 +- man/py_db_sql_connector.Rd | 24 ++ tests/testthat/helper-skip.R | 11 + tests/testthat/test-auth.R | 43 ++- tests/testthat/test-databricks-helpers.R | 13 + tests/testthat/test-request-helpers.R | 8 +- tests/testthat/test-sql-connector.R | 91 +++++ tests/testthat/test-sql-execution.R | 103 ++++++ 27 files changed, 1779 insertions(+), 15 deletions(-) create mode 100644 R/databricks-helpers.R create mode 100644 R/sql-connector.R create mode 100644 R/sql-query-execution.R create mode 100644 man/DatabricksSqlClient.Rd create mode 100644 man/db_sql_client.Rd create mode 100644 man/db_sql_exec_cancel.Rd create mode 100644 man/db_sql_exec_query.Rd create mode 100644 man/db_sql_exec_result.Rd create mode 100644 man/db_sql_exec_status.Rd create mode 100644 man/determine_brickster_venv.Rd create mode 100644 man/install_db_sql_connector.Rd create mode 100644 man/py_db_sql_connector.Rd create mode 100644 tests/testthat/test-databricks-helpers.R create mode 100644 tests/testthat/test-sql-connector.R create mode 100644 tests/testthat/test-sql-execution.R diff --git a/DESCRIPTION b/DESCRIPTION index 1fe16c9..97d0db2 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: brickster Title: R Toolkit for Databricks -Version: 0.2.1 +Version: 0.2.2 Authors@R: c( person(given = "Zac", @@ -18,6 +18,7 @@ License: Apache License (>= 2) Encoding: UTF-8 LazyData: true Imports: + arrow, base64enc, cli, curl, @@ -29,7 +30,11 @@ Imports: jsonlite, magrittr, purrr, - rlang + reticulate, + R6 (>= 2.4.0), + rlang, + tibble, + utils Suggests: testthat (>= 3.0.0), htmltools, diff --git a/NAMESPACE b/NAMESPACE index 3278eae..01ffdc4 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,5 +1,6 @@ # Generated by roxygen2: do not edit by hand +export(DatabricksSqlClient) export(access_control_req_group) export(access_control_req_user) export(access_control_request) @@ -90,6 +91,11 @@ export(db_secrets_scope_acl_put) export(db_secrets_scope_create) export(db_secrets_scope_delete) export(db_secrets_scope_list_all) +export(db_sql_client) +export(db_sql_exec_cancel) +export(db_sql_exec_query) +export(db_sql_exec_result) +export(db_sql_exec_status) export(db_sql_global_warehouse_get) export(db_sql_query_history) export(db_sql_warehouse_create) @@ -116,6 +122,7 @@ export(db_workspace_list) export(db_workspace_mkdirs) export(db_wsid) export(dbfs_storage_info) 
+export(determine_brickster_venv)
 export(docker_image)
 export(email_notifications)
 export(file_storage_info)
@@ -126,6 +133,7 @@ export(get_latest_dbr)
 export(git_source)
 export(in_databricks_nb)
 export(init_script_info)
+export(install_db_sql_connector)
 export(is.access_control_req_group)
 export(is.access_control_req_user)
 export(is.access_control_request)
@@ -174,6 +182,7 @@ export(notebook_task)
 export(notebook_use_posit_repo)
 export(open_workspace)
 export(pipeline_task)
+export(py_db_sql_connector)
 export(python_wheel_task)
 export(remove_lib_path)
 export(s3_storage_info)
@@ -183,8 +192,11 @@ export(spark_jar_task)
 export(spark_python_task)
 export(spark_submit_task)
 export(wait_for_lib_installs)
+import(R6)
+import(arrow)
 import(cli)
 import(httr2)
+import(tibble)
 importFrom(glue,glue)
 importFrom(magrittr,`%>%`)
 importFrom(rlang,.data)
diff --git a/R/data-structures.R b/R/data-structures.R
index 50b7b63..642a986 100644
--- a/R/data-structures.R
+++ b/R/data-structures.R
@@ -718,7 +718,7 @@ is.lib_pypi <- function(x) {
 #'   `org.jsoup:jsoup:1.7.2`.
 #' @param repo Maven repo to install the Maven package from. If omitted, both
 #'   Maven Central Repository and Spark Packages are searched.
-#' @param exclusions List of dependences to exclude. For example:
+#' @param exclusions List of dependencies to exclude. For example:
 #'   `list("slf4j:slf4j", "*:hadoop-client")`.
 #'   [Maven dependency exclusions](https://maven.apache.org/guides/introduction/introduction-to-optional-and-excludes-dependencies.html).
 #'
diff --git a/R/databricks-helpers.R b/R/databricks-helpers.R
new file mode 100644
index 0000000..ac15d64
--- /dev/null
+++ b/R/databricks-helpers.R
@@ -0,0 +1,31 @@
+on_databricks <- function() {
+  dbr <- Sys.getenv("DATABRICKS_RUNTIME_VERSION")
+  dbr != ""
+}
+
+in_databricks_nb <- function() {
+  ("/databricks/spark/R/lib" %in% .libPaths()) &&
+    exists("DATABRICKS_GUID", envir = .GlobalEnv)
+}
+
+use_posit_repo <- function() {
+  if (in_databricks_nb()) {
+    codename <- system("lsb_release -c --short", intern = TRUE)
+    mirror <- paste0("https://packagemanager.posit.co/cran/__linux__/", codename, "/latest")
+    options(repos = c(POSIT = mirror))
+  }
+}
+
+#' Determine brickster virtualenv
+#'
+#' @details Returns `NULL` when running within Databricks,
+#' otherwise `"r-brickster"`
+#'
+#' @export
+determine_brickster_venv <- function() {
+  if (on_databricks()) {
+    NULL
+  } else {
+    "r-brickster"
+  }
+}
diff --git a/R/package-auth.R b/R/package-auth.R
index af884d1..b503d2e 100644
--- a/R/package-auth.R
+++ b/R/package-auth.R
@@ -39,9 +39,25 @@ db_host <- function(id = NULL, prefix = NULL, profile = getOption("db_profile",
   if (is.null(id) && is.null(prefix)) {
     host <- read_env_var(key = "host", profile = profile)
+    parsed_url <- httr2::url_parse(host)
+
+    # inject scheme if not present then re-build with https
+    if (is.null(parsed_url$scheme)) {
+      parsed_url$scheme <- "https"
+    }
+
+    # if hostname is missing change path to host
+    if (is.null(parsed_url$hostname)) {
+      parsed_url$hostname <- parsed_url$path
+      parsed_url$path <- NULL
+    }
+
+    host <- httr2::url_build(parsed_url)
+    host <- httr2::url_parse(host)$hostname
+
   } else {
     # otherwise construct host string
-    host <- paste0("https://", prefix, id, ".cloud.databricks.com")
+    host <- paste0(prefix, id, ".cloud.databricks.com")
   }
 
   host
diff --git a/R/request-helpers.R b/R/request-helpers.R
index 8343857..ac75be7 100644
--- a/R/request-helpers.R
+++ b/R/request-helpers.R
@@ -17,10 +17,19 @@
 #' @importFrom magrittr `%>%`
 db_request <- function(endpoint, method,
                       version = NULL, body = NULL, host, token, ...) {
-  req <- httr2::request(base_url = paste0(host, "api", "/", version, "/")) %>%
+  url <- list(
+    scheme = "https",
+    hostname = host,
+    path = paste0("/api/", version)
+  )
+
+  url <- httr2::url_build(url)
+  user_agent_str <- paste0("brickster/", utils::packageVersion("brickster"))
+
+  req <- httr2::request(base_url = url) %>%
     httr2::req_auth_bearer_token(token) %>%
-    httr2::req_headers("User-Agent" = "brickster/1.0") %>%
-    httr2::req_user_agent(string = "brickster/1.0") %>%
+    httr2::req_headers("User-Agent" = user_agent_str) %>%
+    httr2::req_user_agent(string = user_agent_str) %>%
     httr2::req_url_path_append(endpoint) %>%
    httr2::req_method(method) %>%
    httr2::req_retry(max_tries = 3, backoff = ~ 2)
diff --git a/R/sql-connector.R b/R/sql-connector.R
new file mode 100644
index 0000000..f5d91a7
--- /dev/null
+++ b/R/sql-connector.R
@@ -0,0 +1,345 @@
+#' Install Databricks SQL Connector (Python)
+#'
+#' @inheritParams reticulate::py_install
+#' @details Installs [`databricks-sql-connector`](https://github.com/databricks/databricks-sql-python).
+#' Environment is resolved by [determine_brickster_venv()] which defaults to
+#' the `r-brickster` virtualenv.
+#'
+#' When running within Databricks it will use the existing Python environment.
+#'
+#' @export
+#'
+#' @examples
+#' \dontrun{install_db_sql_connector()}
+install_db_sql_connector <- function(envname = determine_brickster_venv(),
+                                     method = "auto", ...) {
+  reticulate::py_install(
+    "databricks-sql-connector",
+    envname = envname,
+    method = method,
+    ...
+  )
+}
+
+#' Create Databricks SQL Connector Client
+#'
+#' @details Create a client using the Databricks SQL Connector.
+#'
+#' @param id String, ID of either the SQL warehouse or all-purpose cluster.
+#' Important to set `compute_type` to the associated type of `id`.
+#' @param catalog Initial catalog to use for the connection. Defaults to `NULL`
+#' in which case the default catalog will be used.
+#' @param schema Initial schema to use for the connection. Defaults to `NULL`
+#' in which case the default schema will be used.
+#' @param compute_type One of `"warehouse"` (default) or `"cluster"`, corresponding to
+#' associated compute type of the resource specified in `id`.
+#' @param use_cloud_fetch Boolean (default is `FALSE`). `TRUE` to send fetch
+#' requests directly to the cloud object store to download chunks of data.
+#' `FALSE` to send fetch requests directly to Databricks.
+#'
+#' If `use_cloud_fetch` is set to `TRUE` but network access is blocked, then
+#' the fetch requests will fail.
+#' @param session_configuration An optional named list of Spark session
+#' configuration parameters. Setting a configuration is equivalent to using the
+#' `SET key=val` SQL command.
+#' Run the SQL command `SET -v` to get a full list of available configurations.
+#' @param workspace_id String, workspace ID used to build the HTTP path for the
+#' connection. This defaults to using [db_wsid()] to get `DATABRICKS_WSID`
+#' environment variable. Not required if `compute_type` is `"cluster"`.
+#' @param ... passed on to [DatabricksSqlClient()].
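A minimal usage sketch, assuming the connector has been installed, a running SQL warehouse (its ID held in the illustrative `DATABRICKS_WH_ID` environment variable, which is not defined by the package), and that `db_host()`/`db_token()` resolve valid credentials:

library(brickster)

# one-time setup of the Python dependency into the "r-brickster" virtualenv
install_db_sql_connector()

# client against a SQL warehouse; catalog/schema fall back to the defaults
client <- db_sql_client(
  id = Sys.getenv("DATABRICKS_WH_ID"),  # illustrative env var holding a warehouse ID
  compute_type = "warehouse",
  use_cloud_fetch = TRUE
)

# metadata helpers and queries return tibbles by default
client$catalogs()
client$execute("select 1 as x")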
+#' @inheritParams db_sql_exec_query +#' @inheritParams auth_params +#' +#' @import arrow +#' @returns [DatabricksSqlClient()] +#' @examples +#' \dontrun{ +#' client <- db_sql_client(id = "", use_cloud_fetch = TRUE) +#' } +#' @export +db_sql_client <- function(id, + catalog = NULL, schema = NULL, + compute_type = c("warehouse", "cluster"), + use_cloud_fetch = FALSE, + session_configuration = list(), + host = db_host(), token = db_token(), + workspace_id = db_wsid(), + ...) { + + compute_type <- match.arg(compute_type) + http_path <- generate_http_path( + id = id, + is_warehouse = compute_type == "warehouse", + workspace_id = workspace_id + ) + + DatabricksSqlClient$new( + host = host, + token = token, + http_path = http_path, + catalog = catalog, + schema = schema, + use_cloud_fetch = use_cloud_fetch, + session_configuration = session_configuration, + ... + ) + +} + +#' @title Databricks SQL Connector +#' +#' @description +#' Wraps the [`databricks-sql-connector`](https://github.com/databricks/databricks-sql-python) +#' using [reticulate](https://rstudio.github.io/reticulate/). +#' +#' [API reference on Databricks docs](https://docs.databricks.com/en/dev-tools/python-sql-connector.html#api-reference) +#' @import R6 +#' @import tibble +#' @export +DatabricksSqlClient <- R6::R6Class( + classname = "db_sql_client", + public = list( + + #' @description + #' Creates a new instance of this [R6][R6::R6Class] class. + #' + #' Note that this object is typically constructed via [db_sql_client()]. + #' + #' @param host (`character(1)`)\cr + #' See [db_sql_client()]. + #' @param token (`character(1)`)\cr + #' See [db_sql_client()]. + #' @param http_path (`character(1)`)\cr + #' See [db_sql_client()]. + #' @param catalog (`character(1)`)\cr + #' See [db_sql_client()]. + #' @param schema (`character(1)`)\cr + #' See [db_sql_client()]. + #' @param use_cloud_fetch (`logical(1)`)\cr + #' See [db_sql_client()]. + #' @param session_configuration (`list(...)`)\cr + #' See [db_sql_client()]. + #' @param ... Parameters passed to [connection method](https://docs.databricks.com/en/dev-tools/python-sql-connector.html#methods) + #' @return [DatabricksSqlClient]. + initialize = function(host, token, http_path, + catalog, schema, + use_cloud_fetch, session_configuration, + ...) { + + private$connection <- py_db_sql_connector$connect( + server_hostname = host, + access_token = token, + http_path = http_path, + use_cloud_fetch = use_cloud_fetch, + session_configuration = session_configuration, + ... + ) + }, + + #' @description + #' Execute a metadata query about the columns. + #' + #' @param catalog_name (`character(1)`)\cr + #' A catalog name to retrieve information about. + #' The `%` character is interpreted as a wildcard. + #' @param schema_name (`character(1)`)\cr + #' A schema name to retrieve information about. + #' The `%` character is interpreted as a wildcard. + #' @param table_name (`character(1)`)\cr + #' A table name to retrieve information about. + #' The `%` character is interpreted as a wildcard. + #' @param column_name (`character(1)`)\cr + #' A column name to retrieve information about. + #' The `%` character is interpreted as a wildcard. + #' @param as_tibble (`logical(1)`)\cr + #' If `TRUE` (default) will return [tibble::tibble], otherwise returns + #' [arrow::Table]. + #' @examples + #' \dontrun{ + #' client$columns(catalog_name = "defa%") + #' client$columns(catalog_name = "default", table_name = "gold_%") + #' } + #' @return [tibble::tibble] or [arrow::Table]. 
+ columns = function(catalog_name = NULL, schema_name = NULL, + table_name = NULL, column_name = NULL, + as_tibble = TRUE) { + cursor <- private$connection$cursor() + on.exit(cursor$close()) + cursor$columns( + catalog_name = catalog_name, + schema_name = schema_name, + table_name = table_name, + column_name = column_name + ) + handle_results(cursor$fetchall_arrow(), as_tibble) + }, + + #' @description + #' Execute a metadata query about the catalogs. + #' + #' @param as_tibble (`logical(1)`)\cr + #' If `TRUE` (default) will return [tibble::tibble], otherwise returns + #' [arrow::Table]. + #' @examples + #' \dontrun{ + #' client$catalogs() + #' } + #' @return [tibble::tibble] or [arrow::Table]. + catalogs = function(as_tibble = TRUE) { + cursor <- private$connection$cursor() + on.exit(cursor$close()) + cursor$catalogs() + handle_results(cursor$fetchall_arrow(), as_tibble) + }, + + #' @description + #' Execute a metadata query about the schemas. + #' + #' @param catalog_name (`character(1)`)\cr + #' A catalog name to retrieve information about. + #' The `%` character is interpreted as a wildcard. + #' @param schema_name (`character(1)`)\cr + #' A schema name to retrieve information about. + #' The `%` character is interpreted as a wildcard. + #' @param as_tibble (`logical(1)`)\cr + #' If `TRUE` (default) will return [tibble::tibble], otherwise returns + #' [arrow::Table]. + #' @examples + #' \dontrun{ + #' client$schemas(catalog_name = "main") + #' } + #' @return [tibble::tibble] or [arrow::Table]. + schemas = function(catalog_name = NULL, schema_name = NULL, + as_tibble = TRUE) { + cursor <- private$connection$cursor() + on.exit(cursor$close()) + cursor$schemas( + catalog_name = catalog_name, + schema_name = schema_name + ) + handle_results(cursor$fetchall_arrow(), as_tibble) + }, + + #' @description + #' Execute a metadata query about tables and views + #' + #' @param catalog_name (`character(1)`)\cr + #' A catalog name to retrieve information about. + #' The `%` character is interpreted as a wildcard. + #' @param schema_name (`character(1)`)\cr + #' A schema name to retrieve information about. + #' The `%` character is interpreted as a wildcard. + #' @param table_name (`character(1)`)\cr + #' A table name to retrieve information about. + #' The `%` character is interpreted as a wildcard. + #' @param table_types (`character()`)\cr + #' A list of table types to match, for example `"TABLE"` or `"VIEW"`. + #' @param as_tibble (`logical(1)`)\cr + #' If `TRUE` (default) will return [tibble::tibble], otherwise returns + #' [arrow::Table]. + #' @return [tibble::tibble] or [arrow::Table]. + tables = function(catalog_name = NULL, schema_name = NULL, + table_name = NULL, table_types = NULL, + as_tibble = TRUE) { + cursor <- private$connection$cursor() + on.exit(cursor$close()) + cursor$tables( + catalog_name = catalog_name, + schema_name = schema_name, + table_name = table_name, + table_types = table_types + ) + handle_results(cursor$fetchall_arrow(), as_tibble) + }, + + #' @description + #' Prepares and then runs a database query or command. + #' + #' @param operation (`character(1)`)\cr + #' The query or command to prepare and then run. + #' @param parameters (`list()`)\cr + #' Optional. A sequence of parameters to use with the operation parameter. + #' @param as_tibble (`logical(1)`)\cr + #' If `TRUE` (default) will return [tibble::tibble], otherwise returns + #' [arrow::Table]. 
+ #' @examples + #' \dontrun{ + #' client$execute("select 1") + #' client$execute("select * from x.y.z limit 100") + #' client$execute( + #' operation = "select * from x.y.z where a < %(threshold)s limit 1000", + #' parameters = list(threshold = 100) + #' ) + #'} + #' @return [tibble::tibble] or [arrow::Table]. + execute = function(operation, parameters = NULL, as_tibble = TRUE) { + cursor <- private$connection$cursor() + on.exit(cursor$close()) + cursor$execute( + operation = operation, + parameters = parameters + ) + handle_results(cursor$fetchall_arrow(), as_tibble) + }, + + #' @description + #' Prepares and then runs a database query or command using all parameter + #' sequences in the seq_of_parameters argument. Only the final result set + #' is retained. + #' + #' @param operation (`character(1)`)\cr + #' The query or command to prepare and then run. + #' @param seq_of_parameters (`list(list())`)\cr + #' A sequence of many sets of parameter values to use with the operation + #' parameter. + #' @param as_tibble (`logical(1)`)\cr + #' If `TRUE` (default) will return [tibble::tibble], otherwise returns + #' [arrow::Table]. + #' @examples + #' \dontrun{ + #' client$execute_many( + #' operation = "select * from x.y.z where a < %(threshold)s limit 1000", + #' seq_of_parameters = list( + #' list(threshold = 100), + #' list(threshold = 200), + #' list(threshold = 300) + #' ) + #' ) + #'} + #' @return [tibble::tibble] or [arrow::Table]. + execute_many = function(operation, seq_of_parameters = NULL, + as_tibble = TRUE) { + cursor <- private$connection$cursor() + on.exit(cursor$close()) + cursor$executemany( + operation = operation, + seq_of_parameters = seq_of_parameters + ) + handle_results(cursor$fetchall_arrow(), as_tibble) + } + + ), + private = list( + connection = NULL, + + finalize = function() { + private$connection$close() + } + ) +) + +generate_http_path <- function(id, is_warehouse = TRUE, + workspace_id = db_wsid()) { + if (is_warehouse) { + paste0("/sql/1.0/warehouses/", id) + } else { + paste0("/sql/protocolv1/o/", workspace_id, "/", id) + } +} + +handle_results <- function(x, as_tibble) { + if (as_tibble) { + x <- dplyr::collect(x) + } + x +} diff --git a/R/sql-query-execution.R b/R/sql-query-execution.R new file mode 100644 index 0000000..678edf8 --- /dev/null +++ b/R/sql-query-execution.R @@ -0,0 +1,243 @@ +# https://docs.databricks.com/api/workspace/statementexecution +# https://docs.databricks.com/en/sql/admin/sql-execution-tutorial.html#language-curl + +#' Execute SQL Query +#' +#' @details Refer to the +#' [web documentation](https://docs.databricks.com/api/workspace/statementexecution/executestatement) +#' for detailed material on interaction of the various parameters and general recommendations +#' +#' @param statement String, the SQL statement to execute. The statement can +#' optionally be parameterized, see `parameters`. +#' @param warehouse_id String, ID of warehouse upon which to execute a statement. +#' @param catalog String, sets default catalog for statement execution, similar +#' to `USE CATALOG` in SQL. +#' @param schema String, sets default schema for statement execution, similar +#' to `USE SCHEMA` in SQL. +#' @param parameters List of Named Lists, parameters to pass into a SQL +#' statement containing parameter markers. +#' +#' A parameter consists of a name, a value, and *optionally* a type. +#' To represent a `NULL` value, the value field may be omitted or set to `NULL` +#' explicitly. 
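To make the `parameters` shape concrete, a hedged sketch follows; the named parameter marker syntax and the `name`/`value`/`type` fields are taken from the linked Statement Execution docs, while the table and warehouse ID are placeholders:

db_sql_exec_query(
  statement    = "SELECT * FROM samples.nyctaxi.trips WHERE trip_distance > :min_dist LIMIT 10",
  warehouse_id = "<warehouse-id>",
  parameters   = list(
    list(name = "min_dist", value = "5", type = "DOUBLE")
  )
)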
+#' +#' See [docs](https://docs.databricks.com/api/workspace/statementexecution/executestatement) +#' for more details. +#' @param row_limit Integer, applies the given row limit to the statement's +#' result set, but unlike the `LIMIT` clause in SQL, it also sets the +#' `truncated` field in the response to indicate whether the result was trimmed +#' due to the limit or not. +#' @param byte_limit Integer, applies the given byte limit to the statement's +#' result size. Byte counts are based on internal data representations and +#' might not match the final size in the requested format. If the result was +#' truncated due to the byte limit, then `truncated` in the response is set to +#' true. When using `EXTERNAL_LINKS` disposition, a default byte_limit of +#' 100 GiB is applied if `byte_limit` is not explicitly set. +#' @param disposition One of `"INLINE"` (default) or `"EXTERNAL_LINKS"`. See +#' [docs](https://docs.databricks.com/api/workspace/statementexecution/executestatement) +#' for details. +#' @param format One of `"JSON_ARRAY"` (default), `"ARROW_STREAM"`, or `"CSV"`. +#' See [docs](https://docs.databricks.com/api/workspace/statementexecution/executestatement) +#' for details. +#' @param wait_timeout String, default is `"10s"`. The time in seconds the call +#' will wait for the statement's result set as `Ns`, where `N` can be set to +#' `0` or to a value between `5` and `50`. +#' When set to `0s`, the statement will execute in asynchronous mode and the +#' call will not wait for the execution to finish. In this case, the call +#' returns directly with `PENDING` state and a statement ID which can be used +#' for polling with [db_sql_exec_status()]. +#' +#' When set between `5` and `50` seconds, the call will behave synchronously up +#' to this timeout and wait for the statement execution to finish. If the +#' execution finishes within this time, the call returns immediately with a +#' manifest and result data (or a `FAILED` state in case of an execution error). +#' +#' If the statement takes longer to execute, `on_wait_timeout` determines what +#' should happen after the timeout is reached. +#' +#' @param on_wait_timeout One of `"CONTINUE"` (default) or `"CANCEL"`. +#' When `wait_timeout` > `0s`, the call will block up to the specified time. +#' If the statement execution doesn't finish within this time, +#' `on_wait_timeout` determines whether the execution should continue or be +#' canceled. +#' +#' When set to `CONTINUE`, the statement execution continues asynchronously and +#' the call returns a statement ID which can be used for polling with +#' [db_sql_exec_status()]. +#' +#' When set to `CANCEL`, the statement execution is canceled and the call +#' returns with a `CANCELED` state. 
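A sketch of the asynchronous pattern described above, assuming a valid warehouse ID; the `statement_id` and `status$state` fields follow the documented response shape:

resp <- db_sql_exec_query(
  statement    = "SELECT count(*) AS n FROM some_catalog.some_schema.some_table",
  warehouse_id = "<warehouse-id>",
  wait_timeout = "0s"  # return immediately with a statement ID
)

# poll until a terminal state is reached
status <- db_sql_exec_status(resp$statement_id)
while (status$status$state %in% c("PENDING", "RUNNING")) {
  Sys.sleep(2)
  status <- db_sql_exec_status(resp$statement_id)
}

# or abandon the statement instead of waiting:
# db_sql_exec_cancel(resp$statement_id)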
+#' @inheritParams auth_params +#' @inheritParams db_sql_warehouse_create +#' +#' @family SQL Execution APIs +#' +#' @export +db_sql_exec_query <- function(statement, warehouse_id, + catalog = NULL, schema = NULL, parameters = NULL, + row_limit = NULL, byte_limit = NULL, + disposition = c("INLINE", "EXTERNAL_LINKS"), + format = c("JSON_ARRAY", "ARROW_STREAM", "CSV"), + wait_timeout = "10s", + on_wait_timeout = c("CONTINUE", "CANCEL"), + host = db_host(), token = db_token(), + perform_request = TRUE) { + + disposition <- match.arg(disposition) + format <- match.arg(format) + on_wait_timeout <- match.arg(on_wait_timeout) + + body <- list( + statement = statement, + warehouse_id = warehouse_id, + catalog = catalog, + schema = schema, + parameters = parameters, + row_limit = row_limit, + byte_limit = byte_limit, + disposition = disposition, + format = format, + wait_timeout = wait_timeout, + on_wait_timeout = on_wait_timeout + ) + + req <- db_request( + endpoint = "sql/statements", + method = "POST", + version = "2.0", + body = body, + host = host, + token = token + ) + + if (perform_request) { + db_perform_request(req) + } else { + req + } + +} + +#' Cancel SQL Query +#' +#' @details +#' Requests that an executing statement be canceled. Callers must poll for +#' status to see the terminal state. +#' +#' [Read more on Databricks API docs](https://docs.databricks.com/api/workspace/statementexecution/cancelexecution) +#' +#' @param statement_id String, query execution `statement_id` +#' @inheritParams auth_params +#' @inheritParams db_sql_warehouse_create +#' +#' @family SQL Execution APIs +#' +#' @export +db_sql_exec_cancel <- function(statement_id, + host = db_host(), token = db_token(), + perform_request = TRUE) { + + req <- db_request( + endpoint = paste0("sql/statements/", statement_id, "/cancel"), + method = "POST", + version = "2.0", + host = host, + token = token + ) + + if (perform_request) { + db_perform_request(req) + } else { + req + } + +} + + +#' Get SQL Query Status +#' +#' @details +#' This request can be used to poll for the statement's status. +#' When the `status.state` field is `SUCCEEDED` it will also return the result +#' manifest and the first chunk of the result data. +#' +#' When the statement is in the terminal states `CANCELED`, `CLOSED` or +#' `FAILED`, it returns HTTP `200` with the state set. +#' +#' After at least 12 hours in terminal state, the statement is removed from the +#' warehouse and further calls will receive an HTTP `404` response. +#' +#' [Read more on Databricks API docs](https://docs.databricks.com/api/workspace/statementexecution/getstatement) +#' +#' @inheritParams auth_params +#' @inheritParams db_sql_exec_cancel +#' @inheritParams db_sql_warehouse_create +#' +#' @family SQL Execution APIs +#' +#' @export +db_sql_exec_status <- function(statement_id, + host = db_host(), token = db_token(), + perform_request = TRUE) { + + req <- db_request( + endpoint = paste0("sql/statements/", statement_id), + method = "GET", + version = "2.0", + host = host, + token = token + ) + + if (perform_request) { + db_perform_request(req) + } else { + req + } + +} + + +#' Get SQL Query Results +#' +#' @details +#' After the statement execution has `SUCCEEDED`, this request can be used to +#' fetch any chunk by index. 
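As an illustrative sketch of fetching chunks once a statement has `SUCCEEDED` (assuming `statement_id` holds the ID returned by [db_sql_exec_query()]; `manifest$total_chunk_count` follows the documented manifest):

status <- db_sql_exec_status(statement_id)

if (status$status$state == "SUCCEEDED") {
  n_chunks <- status$manifest$total_chunk_count
  # chunk_index starts at 0
  chunks <- lapply(seq_len(n_chunks) - 1, function(idx) {
    db_sql_exec_result(statement_id, chunk_index = idx)
  })
}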
+#' +#' Whereas the first chunk with chunk_index = `0` is typically fetched with +#' [db_sql_exec_result()] or [db_sql_exec_status()], this request can be used +#' to fetch subsequent chunks +#' +#' The response structure is identical to the nested result element described +#' in the [db_sql_exec_result()] request, and similarly includes the +#' `next_chunk_index` and `next_chunk_internal_link` fields for simple +#' iteration through the result set. +#' +#' [Read more on Databricks API docs](https://docs.databricks.com/api/workspace/statementexecution/getstatementresultchunkn) +#' +#' @param chunk_index Integer, chunk index to fetch result. Starts from `0`. +#' @inheritParams db_sql_exec_cancel +#' @inheritParams auth_params +#' @inheritParams db_sql_warehouse_create +#' +#' @family SQL Execution APIs +#' +#' @export +db_sql_exec_result <- function(statement_id, chunk_index, + host = db_host(), token = db_token(), + perform_request = TRUE) { + + req <- db_request( + endpoint = paste0("sql/statements/", statement_id, "/result/chunks/", chunk_index), + method = "GET", + version = "2.0", + host = host, + token = token + ) + + if (perform_request) { + db_perform_request(req) + } else { + req + } + +} diff --git a/R/zzz.R b/R/zzz.R index 419fffe..bffbcce 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -1,3 +1,17 @@ +#' Databricks SQL Connector (Python) +#' +#' @description Access the Databricks SQL connector from Python via +#' `{reticulate}`. +#' +#' @details This requires that the connector has been installed via +#' [install_db_sql_connector()]. +#' +#' For more documentation of the methods, refer to the +#' [python documentation](https://github.com/databricks/databricks-sql-python). +#' +#' @export +py_db_sql_connector <- NULL + .onLoad <- function(libname, pkgname) { if (requireNamespace("knitr", quietly = TRUE)) { knitr::knit_engines$set( @@ -13,4 +27,10 @@ db_sh = db_engine_sh ) } + + py_db_sql_connector <<- reticulate::import("databricks.sql", delay_load = TRUE) + + venv <- determine_brickster_venv() + reticulate::use_virtualenv(virtualenv = venv, required = FALSE) + } diff --git a/README.md b/README.md index a479dc8..917e61f 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ - Integrate with RStudio Connections Pane (`open_workspace()`) +- Exposes the [`databricks-sql-connector`](https://github.com/databricks/databricks-sql-python) via `{reticulate}` + - Utility functions to streamline workloads ## Installation @@ -27,7 +29,7 @@ Docs website has [an article](https://zacdav-db.github.io/brickster/articles/set ## API Coverage | API | Available | Version | -|----------------------------------------------------|----------|----------| +|----------------------------------------------|-------------|-------------| | [DBFS](https://docs.databricks.com/dev-tools/api/latest/dbfs.html) | Yes | 2.0 | | [Secrets](https://docs.databricks.com/dev-tools/api/latest/secrets.html) | Yes | 2.0 | | [Repos](https://docs.databricks.com/dev-tools/api/latest/repos.html) | Yes | 2.0 | @@ -39,5 +41,6 @@ Docs website has [an article](https://zacdav-db.github.io/brickster/articles/set | [Query History](https://docs.databricks.com/sql/api/query-history.html) | Yes | 2.0 | | [Jobs](https://docs.databricks.com/dev-tools/api/latest/jobs.html) | Yes | 2.1 | | [Volumes (Files)](https://docs.databricks.com/api/workspace/files) | Yes | 2.0 | +| [SQL Statement Execution](https://docs.databricks.com/api/workspace/statementexecution) | Yes | 2.0 | | [REST 1.2 Commands](https://docs.databricks.com/dev-tools/api/1.2/index.html) | 
Partially | 1.2 | | [Unity Catalog](https://api-docs.databricks.com/rest/latest/unity-catalog-api-specification-2-1.html) | Partially | 2.1 | diff --git a/_pkgdown.yml b/_pkgdown.yml index 47559c7..1648343 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -87,6 +87,13 @@ reference: contents: starts_with("db_repo", internal = TRUE) - title: Secrets contents: starts_with("db_secrets", internal = TRUE) +- title: SQL Statement Execution + contents: starts_with("db_sql_exec", internal = TRUE) +- title: SQL Connector + contents: + - install_db_sql_connector + - db_sql_client + - DatabricksSqlClient - title: Warehouses contents: - starts_with("db_sql", internal = TRUE) diff --git a/man/DatabricksSqlClient.Rd b/man/DatabricksSqlClient.Rd new file mode 100644 index 0000000..3dacb60 --- /dev/null +++ b/man/DatabricksSqlClient.Rd @@ -0,0 +1,418 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/sql-connector.R +\name{DatabricksSqlClient} +\alias{DatabricksSqlClient} +\title{Databricks SQL Connector} +\description{ +Wraps the \href{https://github.com/databricks/databricks-sql-python}{\code{databricks-sql-connector}} +using \href{https://rstudio.github.io/reticulate/}{reticulate}. + +\href{https://docs.databricks.com/en/dev-tools/python-sql-connector.html#api-reference}{API reference on Databricks docs} +} +\examples{ + +## ------------------------------------------------ +## Method `DatabricksSqlClient$columns` +## ------------------------------------------------ + +\dontrun{ + client$columns(catalog_name = "defa\%") + client$columns(catalog_name = "default", table_name = "gold_\%") +} + +## ------------------------------------------------ +## Method `DatabricksSqlClient$catalogs` +## ------------------------------------------------ + +\dontrun{ + client$catalogs() +} + +## ------------------------------------------------ +## Method `DatabricksSqlClient$schemas` +## ------------------------------------------------ + +\dontrun{ + client$schemas(catalog_name = "main") +} + +## ------------------------------------------------ +## Method `DatabricksSqlClient$execute` +## ------------------------------------------------ + +\dontrun{ + client$execute("select 1") + client$execute("select * from x.y.z limit 100") + client$execute( + operation = "select * from x.y.z where a < \%(threshold)s limit 1000", + parameters = list(threshold = 100) + ) +} + +## ------------------------------------------------ +## Method `DatabricksSqlClient$execute_many` +## ------------------------------------------------ + +\dontrun{ + client$execute_many( + operation = "select * from x.y.z where a < \%(threshold)s limit 1000", + seq_of_parameters = list( + list(threshold = 100), + list(threshold = 200), + list(threshold = 300) + ) + ) +} +} +\section{Methods}{ +\subsection{Public methods}{ +\itemize{ +\item \href{#method-db_sql_client-new}{\code{DatabricksSqlClient$new()}} +\item \href{#method-db_sql_client-columns}{\code{DatabricksSqlClient$columns()}} +\item \href{#method-db_sql_client-catalogs}{\code{DatabricksSqlClient$catalogs()}} +\item \href{#method-db_sql_client-schemas}{\code{DatabricksSqlClient$schemas()}} +\item \href{#method-db_sql_client-tables}{\code{DatabricksSqlClient$tables()}} +\item \href{#method-db_sql_client-execute}{\code{DatabricksSqlClient$execute()}} +\item \href{#method-db_sql_client-execute_many}{\code{DatabricksSqlClient$execute_many()}} +\item \href{#method-db_sql_client-clone}{\code{DatabricksSqlClient$clone()}} +} +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-db_sql_client-new}{}}} +\subsection{Method \code{new()}}{ +Creates a new instance of this \link[R6:R6Class]{R6} class. + +Note that this object is typically constructed via \code{\link[=db_sql_client]{db_sql_client()}}. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{DatabricksSqlClient$new( + host, + token, + http_path, + catalog, + schema, + use_cloud_fetch, + session_configuration, + ... +)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{host}}{(\code{character(1)})\cr +See \code{\link[=db_sql_client]{db_sql_client()}}.} + +\item{\code{token}}{(\code{character(1)})\cr +See \code{\link[=db_sql_client]{db_sql_client()}}.} + +\item{\code{http_path}}{(\code{character(1)})\cr +See \code{\link[=db_sql_client]{db_sql_client()}}.} + +\item{\code{catalog}}{(\code{character(1)})\cr +See \code{\link[=db_sql_client]{db_sql_client()}}.} + +\item{\code{schema}}{(\code{character(1)})\cr +See \code{\link[=db_sql_client]{db_sql_client()}}.} + +\item{\code{use_cloud_fetch}}{(\code{logical(1)})\cr +See \code{\link[=db_sql_client]{db_sql_client()}}.} + +\item{\code{session_configuration}}{(\code{list(...)})\cr +See \code{\link[=db_sql_client]{db_sql_client()}}.} + +\item{\code{...}}{Parameters passed to \href{https://docs.databricks.com/en/dev-tools/python-sql-connector.html#methods}{connection method}} +} +\if{html}{\out{
}} +} +\subsection{Returns}{ +\link{DatabricksSqlClient}. +} +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-db_sql_client-columns}{}}} +\subsection{Method \code{columns()}}{ +Execute a metadata query about the columns. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{DatabricksSqlClient$columns( + catalog_name = NULL, + schema_name = NULL, + table_name = NULL, + column_name = NULL, + as_tibble = TRUE +)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{catalog_name}}{(\code{character(1)})\cr +A catalog name to retrieve information about. +The \verb{\%} character is interpreted as a wildcard.} + +\item{\code{schema_name}}{(\code{character(1)})\cr +A schema name to retrieve information about. +The \verb{\%} character is interpreted as a wildcard.} + +\item{\code{table_name}}{(\code{character(1)})\cr +A table name to retrieve information about. +The \verb{\%} character is interpreted as a wildcard.} + +\item{\code{column_name}}{(\code{character(1)})\cr +A column name to retrieve information about. +The \verb{\%} character is interpreted as a wildcard.} + +\item{\code{as_tibble}}{(\code{logical(1)})\cr +If \code{TRUE} (default) will return \link[tibble:tibble]{tibble::tibble}, otherwise returns +\link[arrow:Table-class]{arrow::Table}.} +} +\if{html}{\out{
}} +} +\subsection{Returns}{ +\link[tibble:tibble]{tibble::tibble} or \link[arrow:Table-class]{arrow::Table}. +} +\subsection{Examples}{ +\if{html}{\out{
}} +\preformatted{\dontrun{ + client$columns(catalog_name = "defa\%") + client$columns(catalog_name = "default", table_name = "gold_\%") +} +} +\if{html}{\out{
}} + +} + +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-db_sql_client-catalogs}{}}} +\subsection{Method \code{catalogs()}}{ +Execute a metadata query about the catalogs. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{DatabricksSqlClient$catalogs(as_tibble = TRUE)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{as_tibble}}{(\code{logical(1)})\cr +If \code{TRUE} (default) will return \link[tibble:tibble]{tibble::tibble}, otherwise returns +\link[arrow:Table-class]{arrow::Table}.} +} +\if{html}{\out{
}} +} +\subsection{Returns}{ +\link[tibble:tibble]{tibble::tibble} or \link[arrow:Table-class]{arrow::Table}. +} +\subsection{Examples}{ +\if{html}{\out{
}} +\preformatted{\dontrun{ + client$catalogs() +} +} +\if{html}{\out{
}} + +} + +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-db_sql_client-schemas}{}}} +\subsection{Method \code{schemas()}}{ +Execute a metadata query about the schemas. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{DatabricksSqlClient$schemas( + catalog_name = NULL, + schema_name = NULL, + as_tibble = TRUE +)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{catalog_name}}{(\code{character(1)})\cr +A catalog name to retrieve information about. +The \verb{\%} character is interpreted as a wildcard.} + +\item{\code{schema_name}}{(\code{character(1)})\cr +A schema name to retrieve information about. +The \verb{\%} character is interpreted as a wildcard.} + +\item{\code{as_tibble}}{(\code{logical(1)})\cr +If \code{TRUE} (default) will return \link[tibble:tibble]{tibble::tibble}, otherwise returns +\link[arrow:Table-class]{arrow::Table}.} +} +\if{html}{\out{
}} +} +\subsection{Returns}{ +\link[tibble:tibble]{tibble::tibble} or \link[arrow:Table-class]{arrow::Table}. +} +\subsection{Examples}{ +\if{html}{\out{
}} +\preformatted{\dontrun{ + client$schemas(catalog_name = "main") +} +} +\if{html}{\out{
}} + +} + +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-db_sql_client-tables}{}}} +\subsection{Method \code{tables()}}{ +Execute a metadata query about tables and views +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{DatabricksSqlClient$tables( + catalog_name = NULL, + schema_name = NULL, + table_name = NULL, + table_types = NULL, + as_tibble = TRUE +)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{catalog_name}}{(\code{character(1)})\cr +A catalog name to retrieve information about. +The \verb{\%} character is interpreted as a wildcard.} + +\item{\code{schema_name}}{(\code{character(1)})\cr +A schema name to retrieve information about. +The \verb{\%} character is interpreted as a wildcard.} + +\item{\code{table_name}}{(\code{character(1)})\cr +A table name to retrieve information about. +The \verb{\%} character is interpreted as a wildcard.} + +\item{\code{table_types}}{(\code{character()})\cr +A list of table types to match, for example \code{"TABLE"} or \code{"VIEW"}.} + +\item{\code{as_tibble}}{(\code{logical(1)})\cr +If \code{TRUE} (default) will return \link[tibble:tibble]{tibble::tibble}, otherwise returns +\link[arrow:Table-class]{arrow::Table}.} +} +\if{html}{\out{
}} +} +\subsection{Returns}{ +\link[tibble:tibble]{tibble::tibble} or \link[arrow:Table-class]{arrow::Table}. +} +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-db_sql_client-execute}{}}} +\subsection{Method \code{execute()}}{ +Prepares and then runs a database query or command. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{DatabricksSqlClient$execute(operation, parameters = NULL, as_tibble = TRUE)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{operation}}{(\code{character(1)})\cr +The query or command to prepare and then run.} + +\item{\code{parameters}}{(\code{list()})\cr +Optional. A sequence of parameters to use with the operation parameter.} + +\item{\code{as_tibble}}{(\code{logical(1)})\cr +If \code{TRUE} (default) will return \link[tibble:tibble]{tibble::tibble}, otherwise returns +\link[arrow:Table-class]{arrow::Table}.} +} +\if{html}{\out{
}} +} +\subsection{Returns}{ +\link[tibble:tibble]{tibble::tibble} or \link[arrow:Table-class]{arrow::Table}. +} +\subsection{Examples}{ +\if{html}{\out{
}} +\preformatted{\dontrun{ + client$execute("select 1") + client$execute("select * from x.y.z limit 100") + client$execute( + operation = "select * from x.y.z where a < \%(threshold)s limit 1000", + parameters = list(threshold = 100) + ) +} +} +\if{html}{\out{
}} + +} + +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-db_sql_client-execute_many}{}}} +\subsection{Method \code{execute_many()}}{ +Prepares and then runs a database query or command using all parameter +sequences in the seq_of_parameters argument. Only the final result set +is retained. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{DatabricksSqlClient$execute_many( + operation, + seq_of_parameters = NULL, + as_tibble = TRUE +)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{operation}}{(\code{character(1)})\cr +The query or command to prepare and then run.} + +\item{\code{seq_of_parameters}}{(\code{list(list())})\cr +A sequence of many sets of parameter values to use with the operation +parameter.} + +\item{\code{as_tibble}}{(\code{logical(1)})\cr +If \code{TRUE} (default) will return \link[tibble:tibble]{tibble::tibble}, otherwise returns +\link[arrow:Table-class]{arrow::Table}.} +} +\if{html}{\out{
}} +} +\subsection{Returns}{ +\link[tibble:tibble]{tibble::tibble} or \link[arrow:Table-class]{arrow::Table}. +} +\subsection{Examples}{ +\if{html}{\out{
}} +\preformatted{\dontrun{ + client$execute_many( + operation = "select * from x.y.z where a < \%(threshold)s limit 1000", + seq_of_parameters = list( + list(threshold = 100), + list(threshold = 200), + list(threshold = 300) + ) + ) +} +} +\if{html}{\out{
}} + +} + +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-db_sql_client-clone}{}}} +\subsection{Method \code{clone()}}{ +The objects of this class are cloneable with this method. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{DatabricksSqlClient$clone(deep = FALSE)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{deep}}{Whether to make a deep clone.} +} +\if{html}{\out{
}}
}
}
}
diff --git a/man/db_sql_client.Rd b/man/db_sql_client.Rd
new file mode 100644
index 0000000..7adfcf2
--- /dev/null
+++ b/man/db_sql_client.Rd
@@ -0,0 +1,68 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/sql-connector.R
+\name{db_sql_client}
+\alias{db_sql_client}
+\title{Create Databricks SQL Connector Client}
+\usage{
+db_sql_client(
+  id,
+  catalog = NULL,
+  schema = NULL,
+  compute_type = c("warehouse", "cluster"),
+  use_cloud_fetch = FALSE,
+  session_configuration = list(),
+  host = db_host(),
+  token = db_token(),
+  workspace_id = db_wsid(),
+  ...
+)
+}
+\arguments{
+\item{id}{String, ID of either the SQL warehouse or all-purpose cluster.
+Important to set \code{compute_type} to the associated type of \code{id}.}
+
+\item{catalog}{Initial catalog to use for the connection. Defaults to \code{NULL}
+in which case the default catalog will be used.}
+
+\item{schema}{Initial schema to use for the connection. Defaults to \code{NULL}
+in which case the default schema will be used.}
+
+\item{compute_type}{One of \code{"warehouse"} (default) or \code{"cluster"}, corresponding to
+associated compute type of the resource specified in \code{id}.}
+
+\item{use_cloud_fetch}{Boolean (default is \code{FALSE}). \code{TRUE} to send fetch
+requests directly to the cloud object store to download chunks of data.
+\code{FALSE} to send fetch requests directly to Databricks.
+
+If \code{use_cloud_fetch} is set to \code{TRUE} but network access is blocked, then
+the fetch requests will fail.}
+
+\item{session_configuration}{An optional named list of Spark session
+configuration parameters. Setting a configuration is equivalent to using the
+\verb{SET key=val} SQL command.
+Run the SQL command \code{SET -v} to get a full list of available configurations.}
+
+\item{host}{Databricks workspace URL, defaults to calling \code{\link[=db_host]{db_host()}}.}
+
+\item{token}{Databricks workspace token, defaults to calling \code{\link[=db_token]{db_token()}}.}
+
+\item{workspace_id}{String, workspace ID used to build the HTTP path for the
+connection. This defaults to using \code{\link[=db_wsid]{db_wsid()}} to get \code{DATABRICKS_WSID}
+environment variable. Not required if \code{compute_type} is \code{"cluster"}.}
+
+\item{...}{passed on to \code{\link[=DatabricksSqlClient]{DatabricksSqlClient()}}.}
+}
+\value{
+\code{\link[=DatabricksSqlClient]{DatabricksSqlClient()}}
+}
+\description{
+Create Databricks SQL Connector Client
+}
+\details{
+Create a client using the Databricks SQL Connector.
+} +\examples{ +\dontrun{ + client <- db_sql_client(id = "", use_cloud_fetch = TRUE) +} +} diff --git a/man/db_sql_exec_cancel.Rd b/man/db_sql_exec_cancel.Rd new file mode 100644 index 0000000..323123b --- /dev/null +++ b/man/db_sql_exec_cancel.Rd @@ -0,0 +1,39 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/sql-query-execution.R +\name{db_sql_exec_cancel} +\alias{db_sql_exec_cancel} +\title{Cancel SQL Query} +\usage{ +db_sql_exec_cancel( + statement_id, + host = db_host(), + token = db_token(), + perform_request = TRUE +) +} +\arguments{ +\item{statement_id}{String, query execution \code{statement_id}} + +\item{host}{Databricks workspace URL, defaults to calling \code{\link[=db_host]{db_host()}}.} + +\item{token}{Databricks workspace token, defaults to calling \code{\link[=db_token]{db_token()}}.} + +\item{perform_request}{If \code{TRUE} (default) the request is performed, if +\code{FALSE} the httr2 request is returned \emph{without} being performed.} +} +\description{ +Cancel SQL Query +} +\details{ +Requests that an executing statement be canceled. Callers must poll for +status to see the terminal state. + +\href{https://docs.databricks.com/api/workspace/statementexecution/cancelexecution}{Read more on Databricks API docs} +} +\seealso{ +Other SQL Execution APIs: +\code{\link{db_sql_exec_query}()}, +\code{\link{db_sql_exec_result}()}, +\code{\link{db_sql_exec_status}()} +} +\concept{SQL Execution APIs} diff --git a/man/db_sql_exec_query.Rd b/man/db_sql_exec_query.Rd new file mode 100644 index 0000000..db22301 --- /dev/null +++ b/man/db_sql_exec_query.Rd @@ -0,0 +1,116 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/sql-query-execution.R +\name{db_sql_exec_query} +\alias{db_sql_exec_query} +\title{Execute SQL Query} +\usage{ +db_sql_exec_query( + statement, + warehouse_id, + catalog = NULL, + schema = NULL, + parameters = NULL, + row_limit = NULL, + byte_limit = NULL, + disposition = c("INLINE", "EXTERNAL_LINKS"), + format = c("JSON_ARRAY", "ARROW_STREAM", "CSV"), + wait_timeout = "10s", + on_wait_timeout = c("CONTINUE", "CANCEL"), + host = db_host(), + token = db_token(), + perform_request = TRUE +) +} +\arguments{ +\item{statement}{String, the SQL statement to execute. The statement can +optionally be parameterized, see \code{parameters}.} + +\item{warehouse_id}{String, ID of warehouse upon which to execute a statement.} + +\item{catalog}{String, sets default catalog for statement execution, similar +to \verb{USE CATALOG} in SQL.} + +\item{schema}{String, sets default schema for statement execution, similar +to \verb{USE SCHEMA} in SQL.} + +\item{parameters}{List of Named Lists, parameters to pass into a SQL +statement containing parameter markers. + +A parameter consists of a name, a value, and \emph{optionally} a type. +To represent a \code{NULL} value, the value field may be omitted or set to \code{NULL} +explicitly. + +See \href{https://docs.databricks.com/api/workspace/statementexecution/executestatement}{docs} +for more details.} + +\item{row_limit}{Integer, applies the given row limit to the statement's +result set, but unlike the \code{LIMIT} clause in SQL, it also sets the +\code{truncated} field in the response to indicate whether the result was trimmed +due to the limit or not.} + +\item{byte_limit}{Integer, applies the given byte limit to the statement's +result size. Byte counts are based on internal data representations and +might not match the final size in the requested format. 
If the result was +truncated due to the byte limit, then \code{truncated} in the response is set to +true. When using \code{EXTERNAL_LINKS} disposition, a default byte_limit of +100 GiB is applied if \code{byte_limit} is not explicitly set.} + +\item{disposition}{One of \code{"INLINE"} (default) or \code{"EXTERNAL_LINKS"}. See +\href{https://docs.databricks.com/api/workspace/statementexecution/executestatement}{docs} +for details.} + +\item{format}{One of \code{"JSON_ARRAY"} (default), \code{"ARROW_STREAM"}, or \code{"CSV"}. +See \href{https://docs.databricks.com/api/workspace/statementexecution/executestatement}{docs} +for details.} + +\item{wait_timeout}{String, default is \code{"10s"}. The time in seconds the call +will wait for the statement's result set as \code{Ns}, where \code{N} can be set to +\code{0} or to a value between \code{5} and \code{50}. +When set to \verb{0s}, the statement will execute in asynchronous mode and the +call will not wait for the execution to finish. In this case, the call +returns directly with \code{PENDING} state and a statement ID which can be used +for polling with \code{\link[=db_sql_exec_status]{db_sql_exec_status()}}. + +When set between \code{5} and \code{50} seconds, the call will behave synchronously up +to this timeout and wait for the statement execution to finish. If the +execution finishes within this time, the call returns immediately with a +manifest and result data (or a \code{FAILED} state in case of an execution error). + +If the statement takes longer to execute, \code{on_wait_timeout} determines what +should happen after the timeout is reached.} + +\item{on_wait_timeout}{One of \code{"CONTINUE"} (default) or \code{"CANCEL"}. +When \code{wait_timeout} > \verb{0s}, the call will block up to the specified time. +If the statement execution doesn't finish within this time, +\code{on_wait_timeout} determines whether the execution should continue or be +canceled. + +When set to \code{CONTINUE}, the statement execution continues asynchronously and +the call returns a statement ID which can be used for polling with +\code{\link[=db_sql_exec_status]{db_sql_exec_status()}}. 
+ +When set to \code{CANCEL}, the statement execution is canceled and the call +returns with a \code{CANCELED} state.} + +\item{host}{Databricks workspace URL, defaults to calling \code{\link[=db_host]{db_host()}}.} + +\item{token}{Databricks workspace token, defaults to calling \code{\link[=db_token]{db_token()}}.} + +\item{perform_request}{If \code{TRUE} (default) the request is performed, if +\code{FALSE} the httr2 request is returned \emph{without} being performed.} +} +\description{ +Execute SQL Query +} +\details{ +Refer to the +\href{https://docs.databricks.com/api/workspace/statementexecution/executestatement}{web documentation} +for detailed material on interaction of the various parameters and general recommendations +} +\seealso{ +Other SQL Execution APIs: +\code{\link{db_sql_exec_cancel}()}, +\code{\link{db_sql_exec_result}()}, +\code{\link{db_sql_exec_status}()} +} +\concept{SQL Execution APIs} diff --git a/man/db_sql_exec_result.Rd b/man/db_sql_exec_result.Rd new file mode 100644 index 0000000..4ce8f41 --- /dev/null +++ b/man/db_sql_exec_result.Rd @@ -0,0 +1,51 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/sql-query-execution.R +\name{db_sql_exec_result} +\alias{db_sql_exec_result} +\title{Get SQL Query Results} +\usage{ +db_sql_exec_result( + statement_id, + chunk_index, + host = db_host(), + token = db_token(), + perform_request = TRUE +) +} +\arguments{ +\item{statement_id}{String, query execution \code{statement_id}} + +\item{chunk_index}{Integer, chunk index to fetch result. Starts from \code{0}.} + +\item{host}{Databricks workspace URL, defaults to calling \code{\link[=db_host]{db_host()}}.} + +\item{token}{Databricks workspace token, defaults to calling \code{\link[=db_token]{db_token()}}.} + +\item{perform_request}{If \code{TRUE} (default) the request is performed, if +\code{FALSE} the httr2 request is returned \emph{without} being performed.} +} +\description{ +Get SQL Query Results +} +\details{ +After the statement execution has \code{SUCCEEDED}, this request can be used to +fetch any chunk by index. + +Whereas the first chunk with chunk_index = \code{0} is typically fetched with +\code{\link[=db_sql_exec_result]{db_sql_exec_result()}} or \code{\link[=db_sql_exec_status]{db_sql_exec_status()}}, this request can be used +to fetch subsequent chunks + +The response structure is identical to the nested result element described +in the \code{\link[=db_sql_exec_result]{db_sql_exec_result()}} request, and similarly includes the +\code{next_chunk_index} and \code{next_chunk_internal_link} fields for simple +iteration through the result set. 
+ +\href{https://docs.databricks.com/api/workspace/statementexecution/getstatementresultchunkn}{Read more on Databricks API docs} +} +\seealso{ +Other SQL Execution APIs: +\code{\link{db_sql_exec_cancel}()}, +\code{\link{db_sql_exec_query}()}, +\code{\link{db_sql_exec_status}()} +} +\concept{SQL Execution APIs} diff --git a/man/db_sql_exec_status.Rd b/man/db_sql_exec_status.Rd new file mode 100644 index 0000000..12cb1f2 --- /dev/null +++ b/man/db_sql_exec_status.Rd @@ -0,0 +1,46 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/sql-query-execution.R +\name{db_sql_exec_status} +\alias{db_sql_exec_status} +\title{Get SQL Query Status} +\usage{ +db_sql_exec_status( + statement_id, + host = db_host(), + token = db_token(), + perform_request = TRUE +) +} +\arguments{ +\item{statement_id}{String, query execution \code{statement_id}} + +\item{host}{Databricks workspace URL, defaults to calling \code{\link[=db_host]{db_host()}}.} + +\item{token}{Databricks workspace token, defaults to calling \code{\link[=db_token]{db_token()}}.} + +\item{perform_request}{If \code{TRUE} (default) the request is performed, if +\code{FALSE} the httr2 request is returned \emph{without} being performed.} +} +\description{ +Get SQL Query Status +} +\details{ +This request can be used to poll for the statement's status. +When the \code{status.state} field is \code{SUCCEEDED} it will also return the result +manifest and the first chunk of the result data. + +When the statement is in the terminal states \code{CANCELED}, \code{CLOSED} or +\code{FAILED}, it returns HTTP \code{200} with the state set. + +After at least 12 hours in terminal state, the statement is removed from the +warehouse and further calls will receive an HTTP \code{404} response. + +\href{https://docs.databricks.com/api/workspace/statementexecution/getstatement}{Read more on Databricks API docs} +} +\seealso{ +Other SQL Execution APIs: +\code{\link{db_sql_exec_cancel}()}, +\code{\link{db_sql_exec_query}()}, +\code{\link{db_sql_exec_result}()} +} +\concept{SQL Execution APIs} diff --git a/man/determine_brickster_venv.Rd b/man/determine_brickster_venv.Rd new file mode 100644 index 0000000..e648452 --- /dev/null +++ b/man/determine_brickster_venv.Rd @@ -0,0 +1,15 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/databricks-helpers.R +\name{determine_brickster_venv} +\alias{determine_brickster_venv} +\title{Determine brickster virtualenv} +\usage{ +determine_brickster_venv() +} +\description{ +Determine brickster virtualenv +} +\details{ +Returns \code{NULL} when running within Databricks, +otherwise \code{"r-brickster"} +} diff --git a/man/install_db_sql_connector.Rd b/man/install_db_sql_connector.Rd new file mode 100644 index 0000000..0589e30 --- /dev/null +++ b/man/install_db_sql_connector.Rd @@ -0,0 +1,39 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/sql-connector.R +\name{install_db_sql_connector} +\alias{install_db_sql_connector} +\title{Install Databricks SQL Connector (Python)} +\usage{ +install_db_sql_connector( + envname = determine_brickster_venv(), + method = "auto", + ... +) +} +\arguments{ +\item{envname}{The name, or full path, of the environment in which Python +packages are to be installed. When \code{NULL} (the default), the active +environment as set by the \code{RETICULATE_PYTHON_ENV} variable will be used; +if that is unset, then the \code{r-reticulate} environment will be used.} + +\item{method}{Installation method. 
+method that will work in the local environment. Change the default to force
+a specific installation method. Note that the "virtualenv" method is not
+available on Windows.}
+
+\item{...}{Additional arguments passed to \code{\link[reticulate:conda_install]{conda_install()}}
+or \code{\link[reticulate:virtualenv_install]{virtualenv_install()}}.}
+}
+\description{
+Install Databricks SQL Connector (Python)
+}
+\details{
+Installs \href{https://github.com/databricks/databricks-sql-python}{\code{databricks-sql-connector}}.
+The environment is resolved by \code{\link[=determine_brickster_venv]{determine_brickster_venv()}}, which defaults to
+the \code{r-brickster} virtualenv.
+
+When running within Databricks, it will use the existing Python environment.
+}
+\examples{
+\dontrun{install_db_sql_connector()}
+}
diff --git a/man/lib_maven.Rd b/man/lib_maven.Rd
index 3565f0c..6013a25 100644
--- a/man/lib_maven.Rd
+++ b/man/lib_maven.Rd
@@ -13,7 +13,7 @@ lib_maven(coordinates, repo = NULL, exclusions = NULL)
 \item{repo}{Maven repo to install the Maven package from. If omitted, both
 Maven Central Repository and Spark Packages are searched.}
 
-\item{exclusions}{List of dependences to exclude. For example:
+\item{exclusions}{List of dependencies to exclude. For example:
 \code{list("slf4j:slf4j", "*:hadoop-client")}.
 \href{https://maven.apache.org/guides/introduction/introduction-to-optional-and-excludes-dependencies.html}{Maven dependency exclusions}.}
 }
diff --git a/man/py_db_sql_connector.Rd b/man/py_db_sql_connector.Rd
new file mode 100644
index 0000000..87d6dc6
--- /dev/null
+++ b/man/py_db_sql_connector.Rd
@@ -0,0 +1,24 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/zzz.R
+\docType{data}
+\name{py_db_sql_connector}
+\alias{py_db_sql_connector}
+\title{Databricks SQL Connector (Python)}
+\format{
+An object of class \code{python.builtin.module} (inherits from \code{python.builtin.object}) of length 0.
+}
+\usage{
+py_db_sql_connector
+}
+\description{
+Access the Databricks SQL connector for Python via
+\code{{reticulate}}.
+}
+\details{
+This requires that the connector has been installed via
+\code{\link[=install_db_sql_connector]{install_db_sql_connector()}}.
+
+For more documentation of the methods, refer to the
+\href{https://github.com/databricks/databricks-sql-python}{Python documentation}.
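Taken together, the connector documentation above corresponds to roughly the following usage from R. This is a sketch only, not part of the generated documentation: the warehouse id is a placeholder, and `db_current_workspace_id()` is assumed to resolve the current workspace, as it does in the package tests. Results are returned as tibbles, per the tests later in this patch.

library(brickster)

# one-time setup of the Python dependency; installs into the r-brickster
# virtualenv unless running inside Databricks
install_db_sql_connector()

# "<warehouse-id>" is a placeholder for an existing SQL warehouse id
client <- db_sql_client(
  id = "<warehouse-id>",
  workspace_id = db_current_workspace_id()
)

# issue a query and browse metadata via the client methods used in the tests
client$execute(operation = "select 1")
client$tables(catalog_name = "samples", schema_name = "nyctaxi")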
+}
+\keyword{datasets}
diff --git a/tests/testthat/helper-skip.R b/tests/testthat/helper-skip.R
index 743ef36..1e444a1 100644
--- a/tests/testthat/helper-skip.R
+++ b/tests/testthat/helper-skip.R
@@ -39,3 +39,14 @@ skip_unless_credentials_set <- function() {
     skip("Test only runs when credentials are available")
   }
 }
+
+skip_without_venv <- function(env) {
+
+  env_available <- reticulate::virtualenv_exists(env)
+
+  if (!env_available) {
+    skip("Test only runs when venv available")
+  }
+}
+
+
diff --git a/tests/testthat/test-auth.R b/tests/testthat/test-auth.R
index ed6c640..72d1b1d 100644
--- a/tests/testthat/test-auth.R
+++ b/tests/testthat/test-auth.R
@@ -30,11 +30,11 @@ test_that("auth functions - baseline behaviour", {
 
   expect_identical(
     db_host(id = "mock", prefix = "dev-"),
-    "https://dev-mock.cloud.databricks.com"
+    "dev-mock.cloud.databricks.com"
   )
   expect_identical(
     db_host(id = "mock"),
-    "https://mock.cloud.databricks.com"
+    "mock.cloud.databricks.com"
   )
 
 })
@@ -144,6 +144,45 @@ test_that("auth functions - reading .databrickscfg", {
 })
 
+
+test_that("auth functions - host handling", {
+
+  expect_identical(
+    db_host(id = "mock", prefix = "dev-"),
+    "dev-mock.cloud.databricks.com"
+  )
+
+  expect_identical(
+    db_host(id = "mock"),
+    "mock.cloud.databricks.com"
+  )
+
+  expect_identical(
+    db_host(id = "mock", prefix = "dev-"),
+    "dev-mock.cloud.databricks.com"
+  )
+
+  # input and output pairs to check
+  hostname_mapping <- list(
+    "https://mock.cloud.databricks.com" = "mock.cloud.databricks.com",
+    "https://mock.cloud.databricks.com/" = "mock.cloud.databricks.com",
+    "http://mock.cloud.databricks.com" = "mock.cloud.databricks.com",
+    "mock.cloud.databricks.com" = "mock.cloud.databricks.com",
+    "mock.cloud.databricks.com/" = "mock.cloud.databricks.com",
+    "mock.cloud.databricks.com//" = "mock.cloud.databricks.com",
+    "://mock.cloud.databricks.com" = NULL,
+    "//mock.cloud.databricks.com" = "mock.cloud.databricks.com",
+    "tps://mock.cloud.databricks.com" = "mock.cloud.databricks.com"
+  )
+
+  purrr::iwalk(hostname_mapping, function(output, input) {
+    Sys.setenv("DATABRICKS_HOST" = input)
+    expect_no_error(db_host())
+    expect_identical(db_host(), output)
+  })
+
+})
+
 
 Sys.setenv("DATABRICKS_HOST" = existing_host)
 Sys.setenv("DATABRICKS_TOKEN" = existing_token)
 Sys.setenv("DATABRICKS_WSID" = existing_wsid)
diff --git a/tests/testthat/test-databricks-helpers.R b/tests/testthat/test-databricks-helpers.R
new file mode 100644
index 0000000..639cafd
--- /dev/null
+++ b/tests/testthat/test-databricks-helpers.R
@@ -0,0 +1,13 @@
+test_that("databricks helpers - runtime detection", {
+
+  Sys.setenv("DATABRICKS_RUNTIME_VERSION" = "")
+  expect_no_error(on_databricks())
+  expect_identical(on_databricks(), FALSE)
+  expect_identical(determine_brickster_venv(), "r-brickster")
+
+  Sys.setenv("DATABRICKS_RUNTIME_VERSION" = "14.0")
+  expect_no_error(on_databricks())
+  expect_identical(on_databricks(), TRUE)
+  expect_null(determine_brickster_venv())
+
+})
diff --git a/tests/testthat/test-request-helpers.R b/tests/testthat/test-request-helpers.R
index 891e53e..b659717 100644
--- a/tests/testthat/test-request-helpers.R
+++ b/tests/testthat/test-request-helpers.R
@@ -1,6 +1,6 @@
 test_that("request helpers - building requests", {
 
-  host <- "https://some_url"
+  host <- "some_url"
   token <- "some_token"
   endpoint <- "clusters/create"
   endpoint_version <- "2.0"
@@ -13,7 +13,7 @@
       method = method,
       version = endpoint_version,
       body = body,
-      host = paste0(host, "/"),
+      host = host,
       token = token
     )
   })
@@ -21,11 +21,11 @@
 
   expect_s3_class(req, "httr2_request")
   expect_identical(
     req$url,
-    paste(host, "api", endpoint_version, endpoint, sep = "/")
+    paste("https://", host, "/api/", endpoint_version, "/", endpoint, sep = "")
   )
   expect_identical(req$method, method)
   expect_identical(req$body$data, body)
-  expect_equal(req$options$useragent, "brickster/1.0")
+  expect_no_error(req$options$useragent)
   expect_equal(req$policies$retry_max_tries, 3)
   expect_equal(req$headers$Authorization, paste("Bearer", token))
diff --git a/tests/testthat/test-sql-connector.R b/tests/testthat/test-sql-connector.R
new file mode 100644
index 0000000..4d679a4
--- /dev/null
+++ b/tests/testthat/test-sql-connector.R
@@ -0,0 +1,91 @@
+skip_unless_credentials_set()
+
+test_that("SQL Connector Helpers", {
+
+  expect_no_error({
+    warehouse_path <- generate_http_path(
+      id = "123",
+      is_warehouse = TRUE,
+      workspace_id = "456"
+    )
+  })
+
+  expect_no_error({
+    cluster_path <- generate_http_path(
+      id = "123",
+      is_warehouse = FALSE,
+      workspace_id = "456"
+    )
+  })
+
+
+})
+
+skip_on_cran()
+skip_unless_authenticated()
+skip_unless_aws_workspace()
+skip_without_venv(env = "r-brickster")
+
+test_that("SQL Connector", {
+
+  # create a small serverless sql warehouse to issue queries against
+  expect_no_error({
+    random_id <- sample.int(100000, 1)
+    test_warehouse <- db_sql_warehouse_create(
+      name = paste0("brickster_test_warehouse_", random_id),
+      cluster_size = "2X-Small",
+      warehouse_type = "PRO",
+      enable_serverless_compute = TRUE
+    )
+  })
+
+  expect_no_error({
+    client <- db_sql_client(
+      id = test_warehouse$id,
+      workspace_id = db_current_workspace_id()
+    )
+  })
+
+  expect_no_error({
+    res1 <- client$execute(operation = "select 1")
+  })
+  expect_s3_class(res1, "tbl_df")
+  expect_identical(res1, tibble::tibble(`1` = 1L))
+
+  expect_no_error({
+    catalogs <- client$catalogs()
+  })
+  expect_s3_class(catalogs, "tbl_df")
+
+  expect_no_error({
+    schemas <- client$schemas(catalog_name = "main")
+  })
+  expect_s3_class(schemas, "tbl_df")
+  expect_identical(unique(schemas$TABLE_CATALOG)[1], "main")
+
+  expect_no_error({
+    tables <- client$tables(
+      catalog_name = "main",
+      schema_name = "information_schema"
+    )
+  })
+  expect_s3_class(tables, "tbl_df")
+
+  expect_no_error({
+    columns <- client$columns(
+      catalog_name = "samples",
+      schema_name = "nyctaxi",
+      table_name = "trips"
+    )
+  })
+  expect_s3_class(columns, "tbl_df")
+
+  # cleanup/delete the warehouse used for testing
+  expect_no_error({
+    db_sql_warehouse_delete(
+      id = test_warehouse$id
+    )
+  })
+
+
+})
diff --git a/tests/testthat/test-sql-execution.R b/tests/testthat/test-sql-execution.R
new file mode 100644
index 0000000..90d715b
--- /dev/null
+++ b/tests/testthat/test-sql-execution.R
@@ -0,0 +1,103 @@
+skip_unless_credentials_set()
+
+test_that("SQL Execution API - don't perform", {
+
+  resp_query <- db_sql_exec_query(
+    statement = "select 1",
+    warehouse_id = "some_warehouse_id",
+    perform_request = FALSE
+  )
+  expect_s3_class(resp_query, "httr2_request")
+
+  resp_cancel <- db_sql_exec_cancel(
+    statement_id = "some_statement_id",
+    perform_request = FALSE
+  )
+  expect_s3_class(resp_cancel, "httr2_request")
+
+  resp_result <- db_sql_exec_result(
+    statement_id = "some_statement_id",
+    chunk_index = 0,
+    perform_request = FALSE
+  )
+  expect_s3_class(resp_result, "httr2_request")
+
+  resp_status <- db_sql_exec_status(
+    statement_id = "some_statement_id",
+    perform_request = FALSE
+  )
+  expect_s3_class(resp_status, "httr2_request")
+
+})
+
+skip_on_cran()
+skip_unless_authenticated()
+skip_unless_aws_workspace()
+
+test_that("SQL Execution API", {
+
+  # create a small serverless sql warehouse to issue queries against
+  expect_no_error({
+    random_id <- sample.int(100000, 1)
+    test_warehouse <- db_sql_warehouse_create(
+      name = paste0("brickster_test_warehouse_", random_id),
+      cluster_size = "2X-Small",
+      warehouse_type = "PRO",
+      enable_serverless_compute = TRUE
+    )
+  })
+
+
+  expect_no_error({
+    resp_query <- db_sql_exec_query(
+      statement = "select 1",
+      warehouse_id = test_warehouse$id,
+      wait_timeout = "0s"
+    )
+  })
+  expect_type(resp_query, "list")
+  expect_true(!is.null(resp_query$statement_id))
+  expect_true(!is.null(resp_query$status$state))
+
+  expect_no_error({
+    resp_status <- db_sql_exec_status(
+      statement_id = resp_query$statement_id
+    )
+  })
+  expect_type(resp_status, "list")
+  expect_true(!is.null(resp_status$statement_id))
+  expect_true(!is.null(resp_status$status$state))
+
+  expect_no_error({
+    # wait for results to be available
+    while (resp_status$status$state %in% c("PENDING", "RUNNING")) {
+      Sys.sleep(1)
+      resp_status <- db_sql_exec_status(
+        statement_id = resp_query$statement_id
+      )
+    }
+    # get results
+    resp_result <- db_sql_exec_result(
+      statement_id = resp_query$statement_id,
+      chunk_index = 0
+    )
+  })
+  expect_type(resp_status, "list")
+  expect_identical(resp_result$data_array[[1]][[1]], "1")
+
+  expect_no_error({
+    resp_cancel <- db_sql_exec_cancel(
+      statement_id = resp_query$statement_id
+    )
+  })
+  expect_type(resp_cancel, "list")
+
+  # cleanup/delete the warehouse used for testing
+  expect_no_error({
+    db_sql_warehouse_delete(
+      id = test_warehouse$id
+    )
+  })
+
+
+})
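For reference, the execution flow exercised by the test above amounts to roughly the following. This is a hedged sketch rather than a definitive recipe: `warehouse_id` is a placeholder for an existing SQL warehouse, and the polling loop mirrors the pattern used in the test.

library(brickster)

# submit the statement and return immediately (no server-side wait)
resp <- db_sql_exec_query(
  statement = "select 1",
  warehouse_id = warehouse_id,
  wait_timeout = "0s"
)

# poll until the statement leaves the PENDING/RUNNING states
status <- db_sql_exec_status(statement_id = resp$statement_id)
while (status$status$state %in% c("PENDING", "RUNNING")) {
  Sys.sleep(1)
  status <- db_sql_exec_status(statement_id = resp$statement_id)
}

# fetch the first chunk of results once the statement has SUCCEEDED
if (status$status$state == "SUCCEEDED") {
  result <- db_sql_exec_result(
    statement_id = resp$statement_id,
    chunk_index = 0
  )
}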