Skip to content

Commit

Permalink
refactor(sql): make compilers usable with a base install (ibis-projec…
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored Aug 5, 2024
1 parent 597817f commit 84a786d
Show file tree
Hide file tree
Showing 54 changed files with 733 additions and 629 deletions.
3 changes: 1 addition & 2 deletions ibis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ def load_backend(name: str) -> BaseBackend:
# - compile
# - has_operation
# - _from_url
# - _to_sqlglot
#
# We also copy over the docstring from `do_connect` to the proxy `connect`
# method, since that's where all the backend-specific kwargs are currently
Expand All @@ -121,7 +120,7 @@ def connect(*args, **kwargs):
proxy.has_operation = backend.has_operation
proxy.name = name
proxy._from_url = backend._from_url
proxy._to_sqlglot = backend._to_sqlglot

# Add any additional methods that should be exposed at the top level
for attr in getattr(backend, "_top_level_methods", ()):
setattr(proxy, attr, getattr(backend, attr))
Expand Down
13 changes: 0 additions & 13 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,14 +1032,9 @@ def _register_in_memory_table(self, op: ops.InMemoryTable):

def _run_pre_execute_hooks(self, expr: ir.Expr) -> None:
"""Backend-specific hooks to run before an expression is executed."""
self._define_udf_translation_rules(expr)
self._register_udfs(expr)
self._register_in_memory_tables(expr)

def _define_udf_translation_rules(self, expr: ir.Expr):
if self.supports_python_udfs:
raise NotImplementedError(self.name)

def compile(
self,
expr: ir.Expr,
Expand All @@ -1048,14 +1043,6 @@ def compile(
"""Compile an expression."""
return self.compiler.to_sql(expr, params=params)

def _to_sqlglot(self, expr: ir.Expr, **kwargs) -> sg.exp.Expression:
"""Convert an Ibis expression to a sqlglot expression.
Called by `ibis.to_sql`; gives the backend an opportunity to generate
nicer SQL for human consumption.
"""
raise NotImplementedError(f"Backend '{self.name}' backend doesn't support SQL")

def execute(self, expr: ir.Expr) -> Any:
"""Execute an expression."""

Expand Down
157 changes: 19 additions & 138 deletions ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pydata_google_auth import cache

import ibis
import ibis.backends.sql.compilers as sc
import ibis.common.exceptions as com
import ibis.expr.operations as ops
import ibis.expr.schema as sch
Expand All @@ -32,9 +33,7 @@
schema_from_bigquery_table,
)
from ibis.backends.bigquery.datatypes import BigQuerySchema
from ibis.backends.bigquery.udf.core import PythonToJavaScriptTranslator
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers import BigQueryCompiler
from ibis.backends.sql.datatypes import BigQueryType

if TYPE_CHECKING:
Expand Down Expand Up @@ -150,7 +149,7 @@ def _force_quote_table(table: sge.Table) -> sge.Table:

class Backend(SQLBackend, CanCreateDatabase, CanCreateSchema):
name = "bigquery"
compiler = BigQueryCompiler()
compiler = sc.bigquery.compiler
supports_in_memory_tables = True
supports_python_udfs = False

Expand Down Expand Up @@ -652,68 +651,6 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
)
return BigQuerySchema.to_ibis(job.schema)

def _to_sqlglot(
self,
expr: ir.Expr,
limit: str | None = None,
params: Mapping[ir.Expr, Any] | None = None,
**kwargs,
) -> Any:
"""Compile an Ibis expression.
Parameters
----------
expr
Ibis expression
limit
For expressions yielding result sets; retrieve at most this number
of values/rows. Overrides any limit already set on the expression.
params
Named unbound parameters
kwargs
Keyword arguments passed to the compiler
Returns
-------
Any
The output of compilation. The type of this value depends on the
backend.
"""
self._define_udf_translation_rules(expr)
sql = super()._to_sqlglot(expr, limit=limit, params=params, **kwargs)

table_expr = expr.as_table()
geocols = [
name for name, typ in table_expr.schema().items() if typ.is_geospatial()
]

query = sql.transform(
_qualify_memtable,
dataset=getattr(self._session_dataset, "dataset_id", None),
project=getattr(self._session_dataset, "project", None),
).transform(_remove_null_ordering_from_unsupported_window)

if not geocols:
return query

# if there are any geospatial columns, we have to convert them to WKB,
# so interactive mode knows how to display them
#
# by default bigquery returns data to python as WKT, and there's really
# no point in supporting both if we don't need to.
compiler = self.compiler
quoted = compiler.quoted
f = compiler.f
return sg.select(
sge.Star(
replace=[
f.st_asbinary(sg.column(col, quoted=quoted)).as_(col, quoted=quoted)
for col in geocols
]
)
).from_(query.subquery())

def raw_sql(self, query: str, params=None, page_size: int | None = None):
query_parameters = [
bigquery_param(
Expand Down Expand Up @@ -747,19 +684,25 @@ def current_database(self) -> str | None:
return self.dataset

def compile(
self, expr: ir.Expr, limit: str | None = None, params=None, **kwargs: Any
self,
expr: ir.Expr,
limit: str | None = None,
params=None,
pretty: bool = True,
**kwargs: Any,
):
"""Compile an Ibis expression to a SQL string."""
query = self._to_sqlglot(expr, limit=limit, params=params, **kwargs)
udf_sources = []
for udf_node in expr.op().find(ops.ScalarUDF):
compile_func = getattr(
self, f"_compile_{udf_node.__input_type__.name.lower()}_udf"
)
if sql := compile_func(udf_node):
udf_sources.append(sql.sql(self.name, pretty=True))

sql = ";\n".join([*udf_sources, query.sql(dialect=self.name, pretty=True)])
session_dataset = self._session_dataset
query = self.compiler.to_sqlglot(
expr,
limit=limit,
params=params,
session_dataset_id=getattr(session_dataset, "dataset_id", None),
session_project=getattr(session_dataset, "project", None),
**kwargs,
)
queries = query if isinstance(query, list) else [query]
sql = ";\n".join(query.sql(self.dialect, pretty=pretty) for query in queries)
self._log(sql)
return sql

Expand Down Expand Up @@ -1202,68 +1145,6 @@ def _clean_up_cached_table(self, name):
force=True,
)

def _get_udf_source(self, udf_node: ops.ScalarUDF):
name = type(udf_node).__name__
type_mapper = self.compiler.udf_type_mapper

body = PythonToJavaScriptTranslator(udf_node.__func__).compile()
config = udf_node.__config__
libraries = config.get("libraries", [])

signature = [
sge.ColumnDef(
this=sg.to_identifier(name, quoted=self.compiler.quoted),
kind=type_mapper.from_ibis(param.annotation.pattern.dtype),
)
for name, param in udf_node.__signature__.parameters.items()
]

lines = ['"""']

if config.get("strict", True):
lines.append('"use strict";')

lines += [
body,
"",
f"return {udf_node.__func_name__}({', '.join(udf_node.argnames)});",
'"""',
]

func = sge.Create(
kind="FUNCTION",
this=sge.UserDefinedFunction(
this=sg.to_identifier(name), expressions=signature, wrapped=True
),
# not exactly what I had in mind, but it works
#
# quoting is too simplistic to handle multiline strings
expression=sge.Var(this="\n".join(lines)),
exists=False,
properties=sge.Properties(
expressions=[
sge.TemporaryProperty(),
sge.ReturnsProperty(this=type_mapper.from_ibis(udf_node.dtype)),
sge.StabilityProperty(
this="IMMUTABLE" if config.get("determinism") else "VOLATILE"
),
sge.LanguageProperty(this=sg.to_identifier("js")),
]
+ [
sge.Property(
this=sg.to_identifier("library"),
value=self.compiler.f.array(*libraries),
)
]
* bool(libraries)
),
)

return func

def _compile_python_udf(self, udf_node: ops.ScalarUDF) -> None:
return self._get_udf_source(udf_node)

def _register_udfs(self, expr: ir.Expr) -> None:
"""No op because UDFs made with CREATE TEMPORARY FUNCTION must be followed by a query."""

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

15 changes: 1 addition & 14 deletions ibis/backends/bigquery/tests/system/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,9 @@ def test_parted_column(con, kind):
assert t.columns == [expected_column, "string_col", "int_col"]


def test_cross_project_query(public, snapshot):
def test_cross_project_query(public):
table = public.table("posts_questions")
expr = table[table.tags.contains("ibis")][["title", "tags"]]
result = expr.compile()
snapshot.assert_match(result, "out.sql")
n = 5
df = expr.limit(n).execute()
assert len(df) == n
Expand All @@ -226,17 +224,6 @@ def test_exists_table_different_project(con):
assert "foobar" not in con.list_tables(database=dataset)


def test_multiple_project_queries(con, snapshot):
so = con.table(
"posts_questions",
database=("bigquery-public-data", "stackoverflow"),
)
trips = con.table("trips", database="nyc-tlc.yellow")
join = so.join(trips, so.tags == trips.rate_code)[[so.title]]
result = join.compile()
snapshot.assert_match(result, "out.sql")


def test_multiple_project_queries_execute(con):
posts_questions = con.table(
"posts_questions", database="bigquery-public-data.stackoverflow"
Expand Down
5 changes: 4 additions & 1 deletion ibis/backends/bigquery/tests/unit/udf/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@

import pytest

from ibis.backends.bigquery.udf.core import PythonToJavaScriptTranslator, SymbolTable
from ibis.backends.sql.compilers.bigquery.udf.core import (
PythonToJavaScriptTranslator,
SymbolTable,
)


def test_symbol_table():
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/bigquery/tests/unit/udf/test_find.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import ast

from ibis.backends.bigquery.udf.find import find_names
from ibis.backends.sql.compilers.bigquery.udf.find import find_names
from ibis.util import is_iterable


Expand Down
8 changes: 4 additions & 4 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from clickhouse_connect.driver.external import ExternalData

import ibis
import ibis.backends.sql.compilers as sc
import ibis.common.exceptions as com
import ibis.config
import ibis.expr.operations as ops
Expand All @@ -26,7 +27,6 @@
from ibis.backends import BaseBackend, CanCreateDatabase
from ibis.backends.clickhouse.converter import ClickHousePandasData
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers import ClickHouseCompiler
from ibis.backends.sql.compilers.base import C

if TYPE_CHECKING:
Expand All @@ -44,7 +44,7 @@ def _to_memtable(v):

class Backend(SQLBackend, CanCreateDatabase):
name = "clickhouse"
compiler = ClickHouseCompiler()
compiler = sc.clickhouse.compiler

# ClickHouse itself does, but the client driver does not
supports_temporary_tables = False
Expand Down Expand Up @@ -732,7 +732,7 @@ def create_table(
expression = None

if obj is not None:
expression = self._to_sqlglot(obj)
expression = self.compiler.to_sqlglot(obj)
external_tables.update(self._collect_in_memory_tables(obj))

code = sge.Create(
Expand All @@ -759,7 +759,7 @@ def create_view(
database: str | None = None,
overwrite: bool = False,
) -> ir.Table:
expression = self._to_sqlglot(obj)
expression = self.compiler.to_sqlglot(obj)
src = sge.Create(
this=sg.table(name, db=database),
kind="VIEW",
Expand Down
Loading

0 comments on commit 84a786d

Please sign in to comment.