Skip to content

Commit

Permalink
chore: add TestConnector
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jan 8, 2024
1 parent da9bec2 commit adb1463
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.greptime;

import io.greptime.models.WriteOk;
import io.greptime.options.GreptimeOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
Expand All @@ -34,16 +33,7 @@ public class StreamWritePOJOsQuickStart {
private static final Logger LOG = LoggerFactory.getLogger(StreamWritePOJOsQuickStart.class);

public static void main(String[] args) throws ExecutionException, InterruptedException {
// GreptimeDB has a default database named "greptime.public", we can use it as the test database
String database = "greptime.public";
// By default, GreptimeDB listens on port 4001 using the gRPC protocol.
// We can provide multiple endpoints that point to the same GreptimeDB cluster.
// The client will make calls to these endpoints based on a load balancing strategy.
String[] endpoints = {"127.0.0.1:4001"};
GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
.build();

GreptimeDB greptimeDB = GreptimeDB.create(opts);
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();

List<MyMetric1> myMetric1s = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
import io.greptime.options.GreptimeOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
Expand All @@ -37,16 +36,7 @@ public class StreamWriteQuickStart {
private static final Logger LOG = LoggerFactory.getLogger(StreamWriteQuickStart.class);

public static void main(String[] args) throws ExecutionException, InterruptedException {
// GreptimeDB has a default database named "greptime.public", we can use it as the test database
String database = "greptime.public";
// By default, GreptimeDB listens on port 4001 using the gRPC protocol.
// We can provide multiple endpoints that point to the same GreptimeDB cluster.
// The client will make calls to these endpoints based on a load balancing strategy.
String[] endpoints = {"127.0.0.1:4001"};
GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
.build();

GreptimeDB greptimeDB = GreptimeDB.create(opts);
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();

TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") //
.addColumn("tag1", SemanticType.Tag, DataType.String) //
Expand Down
93 changes: 93 additions & 0 deletions ingester-example/src/main/java/io/greptime/TestConnector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.greptime;

import io.greptime.common.util.SerializingExecutor;
import io.greptime.limit.LimitedPolicy;
import io.greptime.models.AuthInfo;
import io.greptime.options.GreptimeOptions;
import io.greptime.rpc.RpcOptions;

/**
* @author jiachun.fjc
*/
public class TestConnector {

public static GreptimeDB connectToDefaultDB() {
// GreptimeDB has a default database named "public" in the default catalog "greptime",
// we can use it as the test database
String database = "public";
// By default, GreptimeDB listens on port 4001 using the gRPC protocol.
// We can provide multiple endpoints that point to the same GreptimeDB cluster.
// The client will make calls to these endpoints based on a load balancing strategy.
String[] endpoints = {"127.0.0.1:4001"};
GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
// Optional, the default value is fine.
//
// Asynchronous thread pool, which is used to handle various asynchronous
// tasks in the SDK (You are using a purely asynchronous SDK). If you do not
// set it, there will be a default implementation, which you can reconfigure
// if the default implementation is not satisfied.
// The default implementation is: `SerializingExecutor`
.asyncPool(new SerializingExecutor("asyncPool"))
// Optional, the default value is fine.
//
// Sets the RPC options, in general, the default configuration is fine.
.rpcOptions(RpcOptions.newDefault())
// Optional, the default value is fine.
//
// In some case of failure, a retry of write can be attempted.
// The default is 1
.writeMaxRetries(1)
// Optional, the default value is fine.
//
// Write flow limit: maximum number of data rows in-flight.
// The default is 65536
.maxInFlightWriteRows(65536)
// Optional, the default value is fine.
//
// Write flow limit: the policy to use when the write flow limit is exceeded.
// All options:
// - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full.
// - `LimitedPolicy.AbortPolicy`: abort if the limiter is full.
// - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full.
// - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if the limiter is full.
// The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy`
.writeLimitedPolicy(LimitedPolicy.defaultWriteLimitedPolicy())
// Optional, the default value is fine.
//
// The default rate limit for stream writer. It only takes effect when we do not specify the
// `maxPointsPerSecond` when creating a `StreamWriter`.
// The default is 10 * 65536
.defaultStreamMaxWritePointsPerSecond(10 * 65536)
// Optional, the default value is fine.
//
// Refresh frequency of route tables. The background refreshes all route tables
// periodically. By default, the route tables will not be refreshed.
.routeTableRefreshPeriodSeconds(-1)
// Sets authentication information. If the DB is not required to authenticate, we can ignore this.
.authInfo(new AuthInfo("username", "password"))
// Optional, the default value is fine.
//
// Sets the request router, The internal default implementation works well.
// You don't need to set it unless you have special requirements.
.router(null)
// A good start ^_^
.build();

return GreptimeDB.create(opts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.WriteOk;
import io.greptime.options.GreptimeOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
Expand All @@ -37,16 +36,7 @@ public class WritePOJOsQuickStart {
private static final Logger LOG = LoggerFactory.getLogger(WritePOJOsQuickStart.class);

public static void main(String[] args) throws ExecutionException, InterruptedException {
// GreptimeDB has a default database named "greptime.public", we can use it as the test database
String database = "greptime.public";
// By default, GreptimeDB listens on port 4001 using the gRPC protocol.
// We can provide multiple endpoints that point to the same GreptimeDB cluster.
// The client will make calls to these endpoints based on a load balancing strategy.
String[] endpoints = {"127.0.0.1:4001"};
GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
.build();

GreptimeDB greptimeDB = GreptimeDB.create(opts);
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();

List<MyMetric1> myMetric1s = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Expand Down
12 changes: 1 addition & 11 deletions ingester-example/src/main/java/io/greptime/WriteQuickStart.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
import io.greptime.options.GreptimeOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
Expand All @@ -42,16 +41,7 @@ public class WriteQuickStart {
private static final Logger LOG = LoggerFactory.getLogger(WriteQuickStart.class);

public static void main(String[] args) throws ExecutionException, InterruptedException {
// GreptimeDB has a default database named "greptime.public", we can use it as the test database
String database = "greptime.public";
// By default, GreptimeDB listens on port 4001 using the gRPC protocol.
// We can provide multiple endpoints that point to the same GreptimeDB cluster.
// The client will make calls to these endpoints based on a load balancing strategy.
String[] endpoints = {"127.0.0.1:4001"};
GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
.build();

GreptimeDB greptimeDB = GreptimeDB.create(opts);
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();

TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") //
.addColumn("tag1", SemanticType.Tag, DataType.String) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public boolean init(GreptimeOptions opts) {
this.opts = GreptimeOptions.checkSelf(opts).copy();

if (Strings.isBlank(this.opts.getDatabase())) {
LOG.warn("The `database` is not specified, use default database: greptime.public");
LOG.warn("The `database` is not specified, use default (catalog-database): greptime-public");
}

this.routerClient = makeRouteClient(opts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.greptime.common.Copiable;
import io.greptime.common.Endpoint;
import io.greptime.common.util.Ensures;
import io.greptime.common.util.Strings;
import io.greptime.limit.LimitedPolicy;
import io.greptime.models.AuthInfo;
import io.greptime.rpc.RpcOptions;
Expand Down Expand Up @@ -213,7 +212,13 @@ public Builder maxInFlightWriteRows(int maxInFlightWriteRows) {
}

/**
* Set write limited policy.
* Write flow limit: the policy to use when the write flow limit is exceeded.
* The options:
* - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full.
* - `LimitedPolicy.AbortPolicy`: abort if the limiter is full.
* - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full.
* - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if the limiter is full.
* The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy`
*
* @param writeLimitedPolicy write limited policy
* @return this builder
Expand All @@ -224,7 +229,10 @@ public Builder writeLimitedPolicy(LimitedPolicy writeLimitedPolicy) {
}

/**
* The default rate limit for stream writer.
* The default rate limit for `StreamWriter`. It only takes effect when we do not specify the
* `maxPointsPerSecond` when creating a `StreamWriter`.
* The default is 10 * 65536
*
* @param defaultStreamMaxWritePointsPerSecond default max write points per second
* @return this builder
*/
Expand All @@ -235,7 +243,7 @@ public Builder defaultStreamMaxWritePointsPerSecond(int defaultStreamMaxWritePoi

/**
* Refresh frequency of route tables. The background refreshes all route tables
* periodically. By default, all route tables are refreshed every 30 seconds.
* periodically. By default, By default, the route tables will not be refreshed.
*
* @param routeTableRefreshPeriodSeconds refresh period for route tables cache
* @return this builder
Expand All @@ -246,7 +254,8 @@ public Builder routeTableRefreshPeriodSeconds(long routeTableRefreshPeriodSecond
}

/**
* Sets authentication information.
* Sets authentication information. If the DB is not required to authenticate,
* we can ignore this.
*
* @param authInfo the authentication information
* @return this builder
Expand All @@ -257,7 +266,9 @@ public Builder authInfo(AuthInfo authInfo) {
}

/**
* Sets the request router.
* Sets the request router, The internal default implementation works well.
* You don't need to set it unless you have special requirements.
*
* @param router the request router
* @return this builder
*/
Expand Down

0 comments on commit adb1463

Please sign in to comment.