forked from acromusashi/acromusashi-stream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathElasticSearchBolt.java
176 lines (151 loc) · 5.6 KB
/
ElasticSearchBolt.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/**
* Copyright (c) Acroquest Technology Co, Ltd. All Rights Reserved.
* Please read the associated COPYRIGHTS file for more details.
*
* THE SOFTWARE IS PROVIDED BY Acroquest Technolog Co., Ltd.,
* WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDER BE LIABLE FOR ANY
* CLAIM, DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES.
*/
package acromusashi.stream.component.elasticsearch.bolt;
import java.text.MessageFormat;
import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import acromusashi.stream.bolt.BaseConfigurationBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
/**
* ElasticSearchに対してクエリを投入するBolt
*
* @author kimura
*/
public class ElasticSearchBolt extends BaseConfigurationBolt
{
/** serialVersionUID */
private static final long serialVersionUID = 4987555107871741041L;
/** logger */
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchBolt.class);
/** デフォルトポート値 */
private static final int DEFAULT_PORT = 9300;
/** デフォルトの絞り込みクエリ値 */
private static final String DEFAULT_PERCOLATE = "*";
/** クラスタ名称 */
protected String clusterName;
/** ElasticSearchの投入先。「host1:port1;host2:port2;host3:port3...」という形式で定義 */
protected String servers;
/** 絞り込みクエリ値 */
protected String percolate = DEFAULT_PERCOLATE;
/** TupleをIndex Requestに変換するコンバータ */
protected EsTupleConverter converter;
/** ElasticSearchClient */
protected transient Client client;
/**
* Converterを指定してインスタンスを生成する。
*
* @param converter TupleをIndex Requestに変換するコンバータ
*/
public ElasticSearchBolt(EsTupleConverter converter)
{
this.converter = converter;
}
/**
* {@inheritDoc}
*/
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
{
super.prepare(stormConf, context, collector);
// ElasticSearchClientを初期化
// Serversは「host1:port1;host2:port2;host3:port3...」形式のため分割して生成
Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name",
this.clusterName).build();
TransportClient transportClient = new TransportClient(settings);
for (String server : this.servers.split(";"))
{
String[] components = server.trim().split(":");
String host = components[0];
int port = DEFAULT_PORT;
if (components.length > 1)
{
port = Integer.parseInt(components[1]);
}
transportClient = transportClient.addTransportAddress(new InetSocketTransportAddress(
host, port));
}
this.client = transportClient;
}
/**
* {@inheritDoc}
*/
@Override
public void execute(Tuple input)
{
String documentId = null;
String indexName = null;
String typeName = null;
String document = null;
try
{
documentId = this.converter.convertToId(input);
indexName = this.converter.convertToIndex(input);
typeName = this.converter.convertToType(input);
document = this.converter.convertToDocument(input);
IndexResponse response = this.client.prepareIndex(indexName, typeName, documentId).setSource(
document).setPercolate(this.percolate).execute().actionGet();
if (logger.isDebugEnabled() == true)
{
String logFormat = "Document Indexed. Id={0}, Type={1}, Index={2}, Version={3}";
logger.debug(MessageFormat.format(logFormat, response.getId(), typeName, indexName,
response.getVersion()));
}
}
catch (Exception ex)
{
String logFormat = "Document Index failed. Dispose Tuple. Id={0}, Type={1}, Index={2}";
logger.warn(MessageFormat.format(logFormat, documentId, typeName, indexName), ex);
}
getCollector().ack(input);
}
/**
* {@inheritDoc}
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
// 下流に送信を行わないため、未設定
}
/**
* @param clusterName the clusterName to set
*/
public void setClusterName(String clusterName)
{
this.clusterName = clusterName;
}
/**
* @param servers the servers to set
*/
public void setServers(String servers)
{
this.servers = servers;
}
/**
* @param percolate the percolate to set
*/
public void setPercolate(String percolate)
{
this.percolate = percolate;
}
}