Skip to content

Commit

Permalink
Merge pull request #2 from saint1991/poc/result
Browse files Browse the repository at this point in the history
Enable to receive query result
  • Loading branch information
saint1991 authored Oct 19, 2024
2 parents e72eabb + ccb1013 commit 3f655e7
Show file tree
Hide file tree
Showing 30 changed files with 1,506 additions and 617 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
target
gen
*.duckdb
*.duckdb
.env
datasets
5 changes: 3 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"name":"Debug Python client",
"type":"debugpy",
"request":"launch",
"program":"${workspaceFolder}/client/client.py",
"program":"${workspaceFolder}/client/main.py",
"console":"integratedTerminal",
"env": {
"PYTHONPATH": "${workspaceFolder}/client/gen"
Expand All @@ -38,7 +38,8 @@
},
"args": [],
"env": {
"RUST_LOG": "info"
"RUST_LOG": "info",
"RUST_BACKTRACE": "1"
},
"cwd": "${workspaceFolder}"
}
Expand Down
8 changes: 8 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"protoc": {
"compile_on_save": false,
"options": [
"--proto_path=${workspaceFolder}/proto"
]
}
}
59 changes: 43 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 9 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ edition = "2021"

[dependencies]
anyhow = { version = "1.0.89", features = ["backtrace", "std"] }
async-stream = { version = "0.3.5" }
duckdb = { version = "1.0.0", features = ["bundled"] }
async-stream = { version = "0.3.6" }
chrono = { version = "0.4.38" }
duckdb = { version = "1.0.0", features = ["bundled", "chrono"] }
env_logger = { version = "0.11.5" }
futures-core = { version = "0.3.30" }
futures-core = { version = "0.3.31" }
log = { version = "0.4.22" }
prost = { version = "0.13.2" }
prost = { version = "0.13.3" }
prost-types = { version = "0.13.3" }
thiserror = "1.0.64"
tokio = { version = "1.40.0", features = ["rt-multi-thread", "macros", "sync" ] }
tokio-stream = { version = "0.1.16" }
tonic = { version = "0.12.2" }
tonic = { version = "0.12.3" }


[build-dependencies]
tonic-build = { version = "0.12.2" }
tonic-build = { version = "0.12.3" }
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_client(false)
.compile(&["proto/service.proto"], &["proto"])?;
.compile_protos(&["proto/service.proto"], &["proto"])?;
Ok(())
}
4 changes: 2 additions & 2 deletions client/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
.env
gen
__pycache__
__pycache__
.venv
13 changes: 6 additions & 7 deletions client/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
# syntax=docker/dockerfile:1

FROM python:3.12-slim-bookworm AS grpc
FROM python:3.13-slim-bookworm AS grpc

RUN apt-get update -y \
&& apt-get install -y curl

ENV POETRY_HOME=/etc/poetry
RUN curl -sSL https://install.python-poetry.org | python3 -
ENV PATH=${POETRY_HOME}/bin:${PATH}
RUN curl -LsSf https://astral.sh/uv/install.sh | sh
ENV PATH=/root/.cargo/bin:${PATH}

RUN --mount=type=bind,source=client/pyproject.toml,target=/pyproject.toml \
--mount=type=bind,source=client/poetry.lock,target=/poetry.lock \
poetry install --only=main
--mount=type=bind,source=client/uv.lock,target=/uv.lock \
uv pip install --system -r /pyproject.toml

RUN --mount=type=bind,source=client/pyproject.toml,target=/pyproject.toml \
--mount=type=bind,source=proto,target=/proto \
mkdir -p /gen \
&& poetry run python -m grpc_tools.protoc -I/proto --python_out=/gen --grpc_python_out=/gen /proto/*.proto
&& python -m grpc_tools.protoc -I/proto --python_out=/gen --grpc_python_out=/gen /proto/*.proto


FROM scratch AS grpc-out
Expand Down
2 changes: 2 additions & 0 deletions client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
## Python client

Empty file added client/gduck/__init__.py
Empty file.
55 changes: 33 additions & 22 deletions client/client.py → client/gduck/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@

import threading
from dataclasses import dataclass
from pathlib import Path
from queue import SimpleQueue
from types import TracebackType
from typing import Literal, Self
from typing import Self

import grpc
from error_pb2 import Error
from grpc._channel import _MultiThreadedRendezvous
from message_pb2 import Request
from query_pb2 import Query
from service_pb2 import Request, Response
from service_pb2_grpc import DbServiceStub

ConnectionMode = Literal["auto", "read_write", "read_only"]
from .request import ConnectionMode, Value, connect, ctas, execute, local_file, parquet, request, rows, value
from .response import parse_location, parse_rows, parse_value


@dataclass(frozen=True)
Expand Down Expand Up @@ -49,7 +53,10 @@ def __init__(self, responses: _MultiThreadedRendezvous, out: SimpleQueue, group:
def run(self) -> None:
try:
for response in self._responses:
self._out.put(response.result)
if response.HasField("success"):
self._out.put(response.success)
elif response.HasField("error"):
self._out.put(response.error)
except _MultiThreadedRendezvous as e:
if e.code() != grpc.StatusCode.CANCELLED:
raise e
Expand All @@ -74,10 +81,29 @@ def _request_generator(self):
while (request := self._requests.get()) != self._END_STREAM:
yield request

def query(self, query: str) -> None:
self._requests.put(self._query_request(query))
def _query(self, query: Query) -> Response.QueryResult | Error:
self._requests.put(request(query))
return self._results.get()

def execute(self, query: str, *params: tuple[Value]) -> None:
self._query(execute(query, *params))

def query_value(self, query: str, *params: tuple[Value]) -> Value:
result = self._query(value(query, *params))
return parse_value(result.value)

def query_rows(self, query: str, *params: tuple[Value]) -> Value:
result = self._query(rows(query, *params))
_, r = parse_rows(result.rows)
return r

def ctas(self, table_name: str, query: str, *params: tuple[Value]) -> None:
self._query(ctas(table_name, query, *params))

def local_parquet(self, file: Path, query: str, *params: tuple[Value]) -> Path:
result = self._query(parquet(local_file(file), query, *params))
return parse_location(result.parquet_file)

def __enter__(self) -> Self:
self._channel = grpc.insecure_channel(target=str(self._addr))

Expand All @@ -93,20 +119,5 @@ def __exit__(self, exc_type: type, exc_value: Exception, traceback: TracebackTyp
self._channel.close()
return False

@property
def mode(self) -> Request.Connect.Mode:
if self._mode == "auto":
return Request.Connect.Mode.MODE_AUTO
elif self._mode == "read_write":
return Request.Connect.Mode.MODE_READ_WRITE
elif self._mode == "read_only":
return Request.Connect.Mode.MODE_READ_ONLY
else:
raise ValueError(f"Unknown mode: {self._mode}")

def _connect_request(self) -> Request:
return Request(connect=Request.Connect(file_name=self._database_file, mode=self.mode))

@staticmethod
def _query_request(query: str) -> Request:
return Request(query=Request.Query(query=query))
return request(kind=connect(file_name=self._database_file, mode=self._mode))
Loading

0 comments on commit 3f655e7

Please sign in to comment.