Skip to content

Commit

Permalink
code improve
Browse files Browse the repository at this point in the history
  • Loading branch information
wgzhao committed Sep 11, 2023
1 parent a475602 commit 0173702
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 19 deletions.
1 change: 1 addition & 0 deletions docs/assets/jobs/doriswriter.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"username": "test",
"password": "123456",
"batchSize": 1024,
"column": ["siteid", "citycode", "username", "pv"],
"connection": [
{
"table": "table1",
Expand Down
34 changes: 19 additions & 15 deletions docs/writer/doriswriter.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Doris Writer

DorisWriter 插件用于向 [Doris](http://doris.incubator.apache.org/master/zh-CN/) 数据库以流式方式写入数据。 其实现上是通过访问 Doris http 连接(8030)
DorisWriter 插件用于向 [Doris](http://doris.incubator.apache.org/master/zh-CN/) 数据库以流式方式写入数据。 其实现上是通过访问
Doris http 连接(8030)
,然后通过 [stream load](http://doris.incubator.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)
加载数据到数据中,相比 `insert into` 方式效率要高不少,也是官方推荐的生产环境下的数据加载方式。

Expand Down Expand Up @@ -46,28 +47,31 @@ bin/addax.sh job/stream2doris.json

## 参数说明

| 配置项 | 是否必须 | 类型 | 默认值 | 描述 |
| :------------- | :------: | ------ | ------ | ---------------------------------------------------------------------------------------------------------------- |
| endpoint || string || Doris 的HTTP连接方式,只需要写到主机和端口即可,具体路径插件会自动拼装 | |
| username || string || HTTP 签名验证帐号 |
| password || string || HTTP 签名验证密码 |
| table || string || 所选取的需要同步的表名 |
| column || list || 所配置的表中需要同步的列名集合,详细描述见 [rdbmswriter](../rdbmswriter) |
| batchSize || int | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起 |
| lineDelimiter || string | `\n` | 每行的的分隔符,支持多个字节, 例如 `\x02\x03` |
| format || string | `csv` | 导入数据的格式, 可以使是 json 或者 csv |
| loadProps || map | `csv` | streamLoad 的请求参数,详情参照[StreamLoad介绍页面][1] |
| connectTimeout || int | -1 | StreamLoad单次请求的超时时间, 单位毫秒(ms) |
| 配置项 | 是否必须 | 类型 | 默认值 | 描述 |
| :------------- | :------: | ------ | -------- | ------------------------------------------------------------ |
| endpoint || string || Doris 的HTTP连接方式,只需要写到主机和端口即可,具体路径插件会自动拼装 | |
| username || string || HTTP 签名验证帐号 |
| password || string || HTTP 签名验证密码 |
| table || string || 所选取的需要同步的表名 |
| column || list || 所配置的表中需要同步的列名集合,详细描述见 [rdbmswriter](../rdbmswriter) |
| batchSize || int | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起 |
| lineDelimiter || string | `\n` | 每行的的分隔符,支持多个字节, 例如 `\x02\x03` |
| fieldDelimiter || string | `|` | 字段的分隔符 例如 `,` |
| format || string | `csv` | 导入数据的格式, 可以使是 json 或者 csv |
| loadProps || map | `csv` | streamLoad 的请求参数,详情参照[StreamLoad介绍页面][1] |
| connectTimeout || int | -1 | StreamLoad单次请求的超时时间, 单位毫秒(ms) |

[1]: https://doris.apache.org/master/zh-CN/administrator-guide/load-data/load-json-format.html#stream-load

## endpoint

`endpoint` 只是的任意一个 BE 的主机名及 `webserver_port` 端口,官方文档描述也可以填写 FE 主机名和 `http_port` 端口,但实际测试一直处于连接拒绝状态。
`endpoint` 只是的任意一个 BE 的主机名及 `webserver_port` 端口,官方文档描述也可以填写 FE 主机名和 `http_port`
端口,但实际测试一直处于连接拒绝状态。

### column

该插件中的 `column` 不是必须项,如果没有配置该项,或者配置为 `["*"]` , 则按照 reader 插件获取的字段值进行顺序拼装。 否则可以按照如下方式指定需要插入的字段
该插件中的 `column` 不是必须项,如果没有配置该项,或者配置为 `["*"]` , 则按照 reader 插件获取的字段值进行顺序拼装。
否则可以按照如下方式指定需要插入的字段

```json
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public HttpUriRequest getRedirect(HttpRequest request, HttpResponse response, Ht
return new HttpGet(uri);
} else {
int status = response.getStatusLine().getStatusCode();
return (HttpUriRequest) (status == 307 ? RequestBuilder.copy(request).setUri(uri).build() : new HttpGet(uri));
return status == 307 ? RequestBuilder.copy(request).setUri(uri).build() : new HttpGet(uri);
}
}
});
Expand All @@ -225,14 +225,14 @@ public HttpUriRequest getRedirect(HttpRequest request, HttpResponse response, Ht
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
}
}
String format = configuration.getString(DorisKey.FORMAT, "csv");
String format = configuration.getString(DorisKey.FORMAT, DEFAULT_FORMAT);
// set other required headers
httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.setHeader(HttpHeaders.AUTHORIZATION,
this.getBasicAuthHeader(configuration.getString(DorisKey.USERNAME), configuration.getString(DorisKey.PASSWORD)));
httpPut.setHeader("label", flushBatch.getLabel());
httpPut.setHeader("format", format);
httpPut.setHeader("line_delimiter", configuration.getString(DorisKey.LINE_DELIMITER, "\n"));
httpPut.setHeader("line_delimiter", configuration.getString(DorisKey.LINE_DELIMITER, DEFAULT_LINE_DELIMITER));

if ("csv".equalsIgnoreCase(format)) {
httpPut.setHeader("column_separator", configuration.getString(DorisKey.FIELD_DELIMITER, DEFAULT_SEPARATOR));
Expand Down Expand Up @@ -275,7 +275,7 @@ private String getStreamLoadLabel() {
/**
* loop to get target host
*
* @return
* @return the available endpoint
*/
private String getAvailableEndpoint() {
List<Object> connList = configuration.getList(Key.CONNECTION);
Expand Down

0 comments on commit 0173702

Please sign in to comment.