Skip to content

Commit f899eec

Browse files
committed
initial cockroachdb support for node
1 parent 0a3f64e commit f899eec

12 files changed

+567
-129
lines changed

.github/workflows/main.yml

+3-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ jobs:
2525
options: --health-cmd="pg_isready" --health-interval=10s --health-timeout=5s --health-retries=3
2626
strategy:
2727
matrix:
28-
2928
haxe-version:
3029
- 4.0.5
3130
- 4.1.5
@@ -34,6 +33,9 @@ jobs:
3433
target:
3534
- node
3635
- php
36+
env:
37+
# excluded CockroachDb here because it is hard to create a GitHub Actions service with command/entrypoint override
38+
TEST_DB_TYPES: MySql,PostgreSql,Sqlite
3739
steps:
3840
- uses: actions/checkout@v2
3941
- uses: actions/setup-node@v1

Earthfile

+2-2
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,15 @@ test-base:
130130

131131
test-node:
132132
FROM +test-base
133-
ARG TEST_DB_TYPES=MySql,PostgreSql,Sqlite
133+
ARG TEST_DB_TYPES=MySql,PostgreSql,CockroachDb,Sqlite
134134
ENV TEST_DB_TYPES="$TEST_DB_TYPES"
135135
WITH DOCKER --compose tests/docker-compose.yml
136136
RUN npm run test node
137137
END
138138

139139
test-php:
140140
FROM +test-base
141-
ARG TEST_DB_TYPES=MySql,PostgreSql,Sqlite
141+
ARG TEST_DB_TYPES=MySql,PostgreSql,CockroachDb,Sqlite
142142
ENV TEST_DB_TYPES="$TEST_DB_TYPES"
143143
WITH DOCKER --compose tests/docker-compose.yml
144144
RUN npm run test php

src/tink/sql/Driver.hx

+1
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ interface Driver {
99
enum DriverType {
1010
MySql;
1111
PostgreSql;
12+
CockroachDb;
1213
Sqlite;
1314
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package tink.sql.drivers;
2+
3+
typedef CockroachDbSettings = {
4+
@:optional var host(default, null):String;
5+
@:optional var port(default, null):Int;
6+
var user(default, null):String;
7+
var password(default, null):String;
8+
}
+197
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package tink.sql.drivers.node;
2+
3+
import haxe.Int64;
4+
import js.node.stream.Readable.Readable;
5+
import haxe.DynamicAccess;
6+
import haxe.io.Bytes;
7+
import tink.sql.Query;
8+
import tink.sql.Info;
9+
import tink.sql.Types;
10+
import tink.sql.format.Sanitizer;
11+
import tink.streams.Stream;
12+
import tink.sql.format.CockroachDbFormatter;
13+
import tink.sql.drivers.node.PostgreSql;
14+
import tink.sql.drivers.node.externs.PostgreSql;
15+
16+
import #if haxe3 js.lib.Error #else js.Error #end as JsError;
17+
18+
using tink.CoreApi;
19+
20+
class CockroachDb implements Driver {
21+
public final type:Driver.DriverType = CockroachDb;
22+
23+
final settings:PostgreSqlNodeSettings;
24+
25+
public function new(settings) {
26+
this.settings = settings;
27+
}
28+
29+
public function open<Db>(name:String, info:DatabaseInfo):Connection.ConnectionPool<Db> {
30+
final pool = new Pool({
31+
user: settings.user,
32+
password: settings.password,
33+
host: settings.host,
34+
port: switch (settings.port) {
35+
case null: 26257;
36+
case v: v;
37+
},
38+
ssl: settings.ssl,
39+
max: switch settings.max {
40+
case null: 1;
41+
case v: v;
42+
},
43+
database: name,
44+
});
45+
46+
return new CockroachDbConnectionPool(info, pool);
47+
}
48+
}
49+
50+
class CockroachDbConnectionPool<Db> implements Connection.ConnectionPool<Db> {
51+
final pool:Pool;
52+
final info:DatabaseInfo;
53+
final formatter:CockroachDbFormatter;
54+
final parser:PostgreSqlResultParser<Db>;
55+
final streamBatch:Int = 50;
56+
57+
public function new(info, pool) {
58+
this.info = info;
59+
this.pool = pool;
60+
this.formatter = new CockroachDbFormatter();
61+
this.parser = new PostgreSqlResultParser();
62+
}
63+
64+
public function getFormatter()
65+
return formatter;
66+
67+
public function execute<Result>(query:Query<Db, Result>):Result {
68+
final cnx = getNativeConnection();
69+
return new CockroachDbConnection(info, cnx, true).execute(query);
70+
}
71+
72+
public function executeSql(sql:String):tink.core.Promise<tink.core.Noise> {
73+
final cnx = getNativeConnection();
74+
return new CockroachDbConnection(info, cnx, true).executeSql(sql);
75+
}
76+
77+
public function isolate():Pair<Connection<Db>, CallbackLink> {
78+
final cnx = getNativeConnection();
79+
return new Pair(
80+
(new CockroachDbConnection(info, cnx, false):Connection<Db>),
81+
(() -> cnx.handle(o -> switch o {
82+
case Success(native): native.release();
83+
case Failure(_): // nothing to do
84+
}):CallbackLink)
85+
);
86+
}
87+
88+
function getNativeConnection() {
89+
return new Promise((resolve, reject) -> {
90+
var cancelled = false;
91+
pool.connect().then(
92+
client -> {
93+
if(cancelled)
94+
client.release();
95+
else
96+
resolve(client);
97+
},
98+
err -> reject(Error.ofJsError(err))
99+
);
100+
() -> cancelled = true; // there is no mechanism to undo connect, so we set a flag and release the client as soon as it is obtained
101+
});
102+
}
103+
}
104+
105+
class CockroachDbConnection<Db> implements Connection<Db> implements Sanitizer {
106+
final client:Promise<Client>;
107+
final info:DatabaseInfo;
108+
final formatter:CockroachDbFormatter;
109+
final parser:PostgreSqlResultParser<Db>;
110+
final streamBatch:Int = 50;
111+
final autoRelease:Bool;
112+
113+
public function new(info, client, autoRelease) {
114+
this.info = info;
115+
this.client = client;
116+
this.formatter = new CockroachDbFormatter();
117+
this.parser = new PostgreSqlResultParser();
118+
this.autoRelease = autoRelease;
119+
}
120+
121+
public function value(v:Any):String {
122+
if (Int64.isInt64(v))
123+
return Int64.toStr(v);
124+
if (Std.is(v, Date))
125+
return '(${(v : Date).getTime() / 1000})::timestamp';
126+
if (Std.is(v, String))
127+
return Client.escapeLiteral(v);
128+
if (Std.is(v, Bytes))
129+
return "'\\x" + (cast v:Bytes).toHex() + "'";
130+
131+
return v;
132+
}
133+
134+
public function ident(s:String):String
135+
return Client.escapeIdentifier(s);
136+
137+
public function getFormatter()
138+
return formatter;
139+
140+
function toError<A>(error:JsError):Outcome<A, Error>
141+
return Failure(Error.withData(error.message, error));
142+
143+
public function execute<Result>(query:Query<Db,Result>):Result {
144+
inline function fetch() return run(queryOptions(query));
145+
return switch query {
146+
case Select(_) | Union(_):
147+
final parse:DynamicAccess<Any>->{} = parser.queryParser(query, formatter.isNested(query));
148+
stream(queryOptions(query)).map(parse);
149+
case Insert(_):
150+
fetch().next(res -> res.rows.length > 0 ? Promise.resolve(new Id(res.rows[0][0])) : (Promise.NOISE:Promise<Dynamic>));
151+
case Update(_):
152+
fetch().next(res -> {rowsAffected: res.rowCount});
153+
case Delete(_):
154+
fetch().next(res -> {rowsAffected: res.rowCount});
155+
case Transaction(_) | CreateTable(_, _) | DropTable(_) | AlterTable(_, _) | TruncateTable(_):
156+
fetch().next(r -> Noise);
157+
case _:
158+
throw query.getName() + " has not been implemented";
159+
}
160+
}
161+
162+
function queryOptions(query:Query<Db, Dynamic>): QueryOptions {
163+
final sql = formatter.format(query).toString(this);
164+
#if sql_debug
165+
trace(sql);
166+
#end
167+
return switch query {
168+
case Insert(_):
169+
{text: sql, rowMode: "array"};
170+
default:
171+
{text: sql};
172+
}
173+
}
174+
175+
176+
function stream(options: QueryOptions):Stream<Any, Error> {
177+
// TODO: use the 'row' event for streaming
178+
return client.next(
179+
client -> client.query(options)
180+
.toPromise()
181+
// don't use `Stream.ofIterator`, which may cause a `RangeError: Maximum call stack size exceeded` for large results
182+
.next(r -> Stream.ofNodeStream(r.command, Readable.from(cast r.rows), {onEnd: autoRelease ? () -> client.release() : null}))
183+
);
184+
}
185+
186+
function run(options: QueryOptions):Promise<Result>
187+
return client.next(
188+
client -> client.query(options)
189+
.toPromise()
190+
.asFuture()
191+
.withSideEffect(_ -> if(autoRelease) client.release())
192+
);
193+
194+
public function executeSql(sql:String):tink.core.Promise<tink.core.Noise> {
195+
return run({text: sql});
196+
}
197+
}

src/tink/sql/drivers/node/PostgreSql.hx

+1-122
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
package tink.sql.drivers.node;
22

33
import haxe.Int64;
4-
import haxe.Constraints.Function;
54
import js.node.stream.Readable.Readable;
6-
import js.node.events.EventEmitter;
7-
import js.node.Buffer;
8-
import js.node.tls.SecureContext;
95
import haxe.DynamicAccess;
10-
import haxe.extern.EitherType;
116
import haxe.io.Bytes;
127
import tink.sql.Query;
138
import tink.sql.Info;
@@ -16,10 +11,9 @@ import tink.sql.Expr;
1611
import tink.sql.format.Sanitizer;
1712
import tink.streams.Stream;
1813
import tink.sql.format.PostgreSqlFormatter;
19-
import tink.sql.expr.ExprTyper;
2014
import tink.sql.parse.ResultParser;
21-
import js.lib.Promise as JsPromise;
2215
using tink.CoreApi.JsPromiseTools;
16+
import tink.sql.drivers.node.externs.PostgreSql;
2317

2418
import #if haxe3 js.lib.Error #else js.Error #end as JsError;
2519

@@ -234,118 +228,3 @@ class PostgreSqlConnection<Db> implements Connection<Db> implements Sanitizer {
234228
return run({text: sql});
235229
}
236230
}
237-
238-
private typedef TypeParsers = {
239-
function getTypeParser(dataTypeID:Int, format:String):String->Dynamic;
240-
}
241-
242-
typedef PostgresSslConfig = haxe.extern.EitherType<Bool, {
243-
?rejectUnauthorized:Bool,
244-
?sslca:String,
245-
?sslkey:String,
246-
?sslcert:String,
247-
?sslrootcert:String,
248-
}>;
249-
250-
private typedef ClientConfig = {
251-
?user:String,
252-
?host:String,
253-
?database:String,
254-
?password:String,
255-
?port:Int,
256-
?connectionString:String,
257-
?ssl:PostgresSslConfig,
258-
?types:TypeParsers,
259-
?statement_timeout:Int,
260-
?query_timeout:Int,
261-
?connectionTimeoutMillis:Int,
262-
?idle_in_transaction_session_timeout:Int,
263-
}
264-
265-
private typedef PoolConfig = {
266-
>ClientConfig,
267-
?connectionTimeoutMillis:Int,
268-
?idleTimeoutMillis:Int,
269-
?max:Int,
270-
}
271-
272-
private typedef QueryOptions = {
273-
text:String,
274-
?values:Array<Dynamic>,
275-
?name:String,
276-
?rowMode:String,
277-
?types:TypeParsers,
278-
}
279-
280-
private typedef Submittable = {
281-
function submit(connection:Dynamic):Void;
282-
}
283-
284-
@:jsRequire("pg")
285-
private extern class Pg {
286-
static public var types(default, null):{
287-
public function setTypeParser(oid:Int, parser:String->Dynamic):Void;
288-
public function getTypeParser(oid:Int, format:String):String->Dynamic;
289-
}
290-
}
291-
292-
// https://node-postgres.com/api/pool
293-
@:jsRequire("pg", "Pool")
294-
private extern class Pool extends EventEmitter<Pool> {
295-
public function new(?config:PoolConfig):Void;
296-
public function connect():JsPromise<Client>;
297-
@:overload(function(config:QueryOptions):JsPromise<Result>{})
298-
@:overload(function<S:Submittable>(s:S):S{})
299-
public function query(sql:String, ?values:Dynamic):JsPromise<Result>;
300-
public function end():JsPromise<Void>;
301-
public var totalCount:Int;
302-
public var idleCount:Int;
303-
public var waitingCount:Int;
304-
}
305-
306-
// https://node-postgres.com/api/client
307-
@:jsRequire("pg", "Client")
308-
private extern class Client extends EventEmitter<Client> {
309-
public function new(?config:ClientConfig):Void;
310-
public function connect():JsPromise<Void>;
311-
@:overload(function(config:QueryOptions):JsPromise<Result>{})
312-
@:overload(function<S:Submittable>(s:S):S{})
313-
public function query(sql:String, ?values:Dynamic):JsPromise<Result>;
314-
public function end():JsPromise<Void>;
315-
public function release(?err:Dynamic):JsPromise<Dynamic>;
316-
public function escapeIdentifier(str:String):String;
317-
public function escapeLiteral(str:String):String;
318-
319-
inline static public function escapeIdentifier(str:String):String return untyped Client.prototype.escapeIdentifier(str);
320-
inline static public function escapeLiteral(str:String):String return untyped Client.prototype.escapeLiteral(str);
321-
}
322-
323-
// https://node-postgres.com/api/result
324-
@:jsRequire("pg", "Result")
325-
private extern class Result {
326-
public var rows:Array<Dynamic>;
327-
public var fields:Array<{
328-
name:String,
329-
}>;
330-
public var command:String;
331-
public var rowCount:Int;
332-
}
333-
334-
#if false // not used
335-
// https://node-postgres.com/api/cursor
336-
@:jsRequire("pg-cursor")
337-
private extern class Cursor extends EventEmitter<Cursor> {
338-
public function new(text:String, values:Dynamic, ?config:{
339-
?rowMode:String,
340-
?types:TypeParsers,
341-
}):Void;
342-
public function read(rowCount:Int, callback:JsError->Array<Dynamic>->Result->Void):Void;
343-
public function close(?cb:?JsError->Void):Void;
344-
public function submit(connection:Dynamic):Void;
345-
}
346-
#end
347-
348-
private typedef GeoJSONOptions = {
349-
?shortCrs:Bool,
350-
?longCrs:Bool,
351-
}

0 commit comments

Comments
 (0)