Skip to content

Commit d1ec484

Browse files
Update data classes to java records
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent f41ccfa commit d1ec484

File tree

17 files changed

+125
-154
lines changed

17 files changed

+125
-154
lines changed

doc-tools/missing-doclet/src/main/java/org/opensearch/missingdoclet/MissingDoclet.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ private void check(Element element) {
235235
case INTERFACE:
236236
case ENUM:
237237
case ANNOTATION_TYPE:
238+
case RECORD:
238239
if (level(element) >= CLASS) {
239240
checkComment(element);
240241
for (var subElement : element.getEnclosedElements()) {
@@ -343,7 +344,7 @@ private boolean isGenerated(Element element) {
343344
if (!isGenerated && element.getEnclosingElement() != null) {
344345
// check if enclosing element is generated
345346
return isGenerated(element.getEnclosingElement());
346-
}
347+
}
347348

348349
return isGenerated;
349350
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public void testPauseAndResumeIngestion() throws Exception {
177177
waitForState(() -> {
178178
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
179179
return Arrays.stream(ingestionState.getShardStates())
180-
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
180+
.allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
181181
});
182182

183183
// verify ingestion state is persisted
@@ -195,7 +195,7 @@ public void testPauseAndResumeIngestion() throws Exception {
195195
waitForState(() -> {
196196
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
197197
return Arrays.stream(ingestionState.getShardStates())
198-
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
198+
.allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
199199
});
200200

201201
// resume ingestion
@@ -207,7 +207,7 @@ public void testPauseAndResumeIngestion() throws Exception {
207207
return Arrays.stream(ingestionState.getShardStates())
208208
.allMatch(
209209
state -> state.isPollerPaused() == false
210-
&& (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing"))
210+
&& (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
211211
);
212212
});
213213
waitForSearchableDocs(4, Arrays.asList(nodeB, nodeC));
@@ -225,9 +225,9 @@ public void testGetIngestionState() throws ExecutionException, InterruptedExcept
225225
assertEquals(1, ingestionState.getSuccessfulShards());
226226
assertEquals(1, ingestionState.getTotalShards());
227227
assertEquals(1, ingestionState.getShardStates().length);
228-
assertEquals(0, ingestionState.getShardStates()[0].getShardId());
229-
assertEquals("POLLING", ingestionState.getShardStates()[0].getPollerState());
230-
assertEquals("DROP", ingestionState.getShardStates()[0].getErrorPolicy());
228+
assertEquals(0, ingestionState.getShardStates()[0].shardId());
229+
assertEquals("POLLING", ingestionState.getShardStates()[0].pollerState());
230+
assertEquals("DROP", ingestionState.getShardStates()[0].errorPolicy());
231231
assertFalse(ingestionState.getShardStates()[0].isPollerPaused());
232232

233233
GetIngestionStateResponse ingestionStateForInvalidShard = getIngestionState(new String[] { indexName }, new int[] { 1 });

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/IngestionStateShardFailure.java

+6-49
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@
99
package org.opensearch.action.admin.indices.streamingingestion;
1010

1111
import org.opensearch.common.annotation.ExperimentalApi;
12-
import org.opensearch.core.common.Strings;
1312
import org.opensearch.core.common.io.stream.StreamInput;
1413
import org.opensearch.core.common.io.stream.StreamOutput;
1514
import org.opensearch.core.common.io.stream.Writeable;
16-
import org.opensearch.core.xcontent.MediaTypeRegistry;
1715
import org.opensearch.core.xcontent.ToXContentFragment;
1816
import org.opensearch.core.xcontent.XContentBuilder;
1917

@@ -22,46 +20,26 @@
2220
import java.util.HashMap;
2321
import java.util.List;
2422
import java.util.Map;
25-
import java.util.Objects;
2623

2724
/**
2825
* Indicates ingestion failures at index and shard level.
2926
*
3027
* @opensearch.experimental
3128
*/
3229
@ExperimentalApi
33-
public class IngestionStateShardFailure implements Writeable, ToXContentFragment {
30+
public record IngestionStateShardFailure(String index, int shard, String errorMessage) implements Writeable, ToXContentFragment {
31+
3432
private static final String SHARD = "shard";
3533
private static final String ERROR = "error";
3634

37-
private final String index;
38-
private final int shard;
39-
private String errorMessage;
40-
41-
public IngestionStateShardFailure(String index, int shard, String errorMessage) {
42-
this.index = index;
43-
this.shard = shard;
44-
this.errorMessage = errorMessage;
45-
}
46-
4735
public IngestionStateShardFailure(StreamInput in) throws IOException {
48-
this.index = in.readString();
49-
this.shard = in.readInt();
50-
this.errorMessage = in.readString();
51-
}
52-
53-
public String getIndex() {
54-
return index;
55-
}
56-
57-
public int getShard() {
58-
return shard;
36+
this(in.readString(), in.readVInt(), in.readString());
5937
}
6038

6139
@Override
6240
public void writeTo(StreamOutput out) throws IOException {
6341
out.writeString(index);
64-
out.writeInt(shard);
42+
out.writeVInt(shard);
6543
out.writeString(errorMessage);
6644
}
6745

@@ -73,36 +51,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
7351
return builder.endObject();
7452
}
7553

76-
@Override
77-
public boolean equals(Object o) {
78-
if (this == o) return true;
79-
if (o == null || getClass() != o.getClass()) return false;
80-
IngestionStateShardFailure that = (IngestionStateShardFailure) o;
81-
return Objects.equals(index, that.index) && shard == that.shard && Objects.equals(errorMessage, that.errorMessage);
82-
}
83-
84-
@Override
85-
public int hashCode() {
86-
int result = Objects.hashCode(index);
87-
result = 31 * result + Objects.hashCode(shard);
88-
result = 31 * result + Objects.hashCode(errorMessage);
89-
return result;
90-
}
91-
92-
@Override
93-
public String toString() {
94-
return Strings.toString(MediaTypeRegistry.JSON, this);
95-
}
96-
9754
/**
9855
* Groups provided shard ingestion state failures by index name.
9956
*/
10057
public static Map<String, List<IngestionStateShardFailure>> groupShardFailuresByIndex(IngestionStateShardFailure[] shardFailures) {
10158
Map<String, List<IngestionStateShardFailure>> shardFailuresByIndex = new HashMap<>();
10259

10360
for (IngestionStateShardFailure shardFailure : shardFailures) {
104-
shardFailuresByIndex.computeIfAbsent(shardFailure.getIndex(), (index) -> new ArrayList<>());
105-
shardFailuresByIndex.get(shardFailure.getIndex()).add(shardFailure);
61+
shardFailuresByIndex.computeIfAbsent(shardFailure.index(), (index) -> new ArrayList<>());
62+
shardFailuresByIndex.get(shardFailure.index()).add(shardFailure);
10663
}
10764

10865
return shardFailuresByIndex;

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/GetIngestionStateRequest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public GetIngestionStateRequest(String[] index) {
4242
public GetIngestionStateRequest(StreamInput in) throws IOException {
4343
super(in);
4444
this.index = in.readStringArray();
45-
this.shards = in.readIntArray();
45+
this.shards = in.readVIntArray();
4646
}
4747

4848
@Override
@@ -58,7 +58,7 @@ public ActionRequestValidationException validate() {
5858
public void writeTo(StreamOutput out) throws IOException {
5959
super.writeTo(out);
6060
out.writeStringArray(index);
61-
out.writeIntArray(shards);
61+
out.writeVIntArray(shards);
6262
}
6363

6464
public String[] getIndex() {

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionState.java

+10-41
Original file line numberDiff line numberDiff line change
@@ -28,33 +28,22 @@
2828
* @opensearch.experimental
2929
*/
3030
@ExperimentalApi
31-
public class ShardIngestionState implements Writeable, ToXContentFragment {
31+
public record ShardIngestionState(String index, int shardId, String pollerState, String errorPolicy, boolean isPollerPaused)
32+
implements
33+
Writeable,
34+
ToXContentFragment {
35+
3236
private static final String SHARD = "shard";
3337
private static final String POLLER_STATE = "poller_state";
3438
private static final String ERROR_POLICY = "error_policy";
3539
private static final String POLLER_PAUSED = "poller_paused";
3640

37-
private String index;
38-
private int shardId;
39-
@Nullable
40-
private String pollerState;
41-
@Nullable
42-
private String errorPolicy;
43-
44-
// Indicates if the poller is paused. This can be different from pollerState if the poller is yet to update its
45-
// state after being paused.
46-
private boolean isPollerPaused;
47-
4841
public ShardIngestionState() {
49-
shardId = -1;
42+
this("", -1, "", "", false);
5043
}
5144

5245
public ShardIngestionState(StreamInput in) throws IOException {
53-
index = in.readString();
54-
shardId = in.readInt();
55-
pollerState = in.readOptionalString();
56-
errorPolicy = in.readOptionalString();
57-
isPollerPaused = in.readBoolean();
46+
this(in.readString(), in.readVInt(), in.readOptionalString(), in.readOptionalString(), in.readBoolean());
5847
}
5948

6049
public ShardIngestionState(
@@ -71,30 +60,10 @@ public ShardIngestionState(
7160
this.isPollerPaused = isPollerPaused;
7261
}
7362

74-
public int getShardId() {
75-
return shardId;
76-
}
77-
78-
public String getPollerState() {
79-
return pollerState;
80-
}
81-
82-
public String getIndex() {
83-
return index;
84-
}
85-
86-
public boolean isPollerPaused() {
87-
return isPollerPaused;
88-
}
89-
90-
public String getErrorPolicy() {
91-
return errorPolicy;
92-
}
93-
9463
@Override
9564
public void writeTo(StreamOutput out) throws IOException {
9665
out.writeString(index);
97-
out.writeInt(shardId);
66+
out.writeVInt(shardId);
9867
out.writeOptionalString(pollerState);
9968
out.writeOptionalString(errorPolicy);
10069
out.writeBoolean(isPollerPaused);
@@ -118,8 +87,8 @@ public static Map<String, List<ShardIngestionState>> groupShardStateByIndex(Shar
11887
Map<String, List<ShardIngestionState>> shardIngestionStatesByIndex = new HashMap<>();
11988

12089
for (ShardIngestionState state : shardIngestionStates) {
121-
shardIngestionStatesByIndex.computeIfAbsent(state.getIndex(), (index) -> new ArrayList<>());
122-
shardIngestionStatesByIndex.get(state.getIndex()).add(state);
90+
shardIngestionStatesByIndex.computeIfAbsent(state.index(), (index) -> new ArrayList<>());
91+
shardIngestionStatesByIndex.get(state.index()).add(state);
12392
}
12493

12594
return shardIngestionStatesByIndex;

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportUpdateIngestionStateAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.cluster.block.ClusterBlockException;
1616
import org.opensearch.cluster.block.ClusterBlockLevel;
1717
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
18+
import org.opensearch.cluster.metadata.IngestionStatus;
1819
import org.opensearch.cluster.routing.ShardRouting;
1920
import org.opensearch.cluster.routing.ShardsIterator;
2021
import org.opensearch.cluster.service.ClusterService;
@@ -132,7 +133,7 @@ protected ShardIngestionState shardOperation(UpdateIngestionStateRequest request
132133
try {
133134
if (request.getIngestionPaused() != null) {
134135
// update pause/resume state
135-
indexShard.updateShardIngestionState(request.getIngestionPaused());
136+
indexShard.updateShardIngestionState(new IngestionStatus(request.getIngestionPaused()));
136137
}
137138

138139
return indexShard.getIngestionState();

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/UpdateIngestionStateRequest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public UpdateIngestionStateRequest(String[] index, int[] shards) {
4444
public UpdateIngestionStateRequest(StreamInput in) throws IOException {
4545
super(in);
4646
this.index = in.readStringArray();
47-
this.shards = in.readIntArray();
47+
this.shards = in.readVIntArray();
4848
this.ingestionPaused = in.readOptionalBoolean();
4949
}
5050

@@ -61,7 +61,7 @@ public ActionRequestValidationException validate() {
6161
public void writeTo(StreamOutput out) throws IOException {
6262
super.writeTo(out);
6363
out.writeStringArray(index);
64-
out.writeIntArray(shards);
64+
out.writeVIntArray(shards);
6565
out.writeOptionalBoolean(ingestionPaused);
6666
}
6767

0 commit comments

Comments
 (0)