diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java
index 1471200ba7dcf..ec4cef1bd1fe3 100644
--- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java
@@ -32,18 +32,17 @@
 
 package org.opensearch.gateway;
 
-import org.opensearch.Version;
 import org.apache.lucene.index.CorruptIndexException;
+import org.opensearch.Version;
 import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
 import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
 import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
 import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
+import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
 import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
 import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
 import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
 import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
 import org.opensearch.action.admin.indices.stats.IndexStats;
 import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
@@ -1245,165 +1244,6 @@ private void createNIndices(int n, String prefix) {
         }
     }
 
-    public void testSingleShardFetchUsingBatchAction() {
-        String indexName = "test";
-        int numOfShards = 1;
-        prepareIndex(indexName, numOfShards);
-        Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName }, numOfShards);
-
-        ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();
-
-        TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
-        response = ActionTestUtils.executeBlocking(
-            internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
-            new TransportNodesListGatewayStartedShardsBatch.Request(searchShardsResponse.getNodes(), shardIdShardAttributesMap)
-        );
-        final Index index = resolveIndex(indexName);
-        final ShardId shardId = new ShardId(index, 0);
-        GatewayStartedShard gatewayStartedShard = response.getNodesMap()
-            .get(searchShardsResponse.getNodes()[0].getId())
-            .getNodeGatewayStartedShardsBatch()
-            .get(shardId);
-        assertNodeGatewayStartedShardsHappyCase(gatewayStartedShard);
-    }
-
-    public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
-        // start node
-        internalCluster().startNode();
-        String indexName1 = "test1";
-        String indexName2 = "test2";
-        int numShards = internalCluster().numDataNodes();
-        // assign one primary shard each to the data nodes
-        prepareIndex(indexName1, numShards);
-        prepareIndex(indexName2, numShards);
-        Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName1, indexName2 }, numShards);
-        ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName1, indexName2).get();
-        assertEquals(internalCluster().numDataNodes(), searchShardsResponse.getNodes().length);
-        TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
-        response = ActionTestUtils.executeBlocking(
-            internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
-            new TransportNodesListGatewayStartedShardsBatch.Request(searchShardsResponse.getNodes(), shardIdShardAttributesMap)
-        );
-        for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
-            ShardId shardId = clusterSearchShardsGroup.getShardId();
-            assertEquals(1, clusterSearchShardsGroup.getShards().length);
-            String nodeId = clusterSearchShardsGroup.getShards()[0].currentNodeId();
-            GatewayStartedShard gatewayStartedShard = response.getNodesMap().get(nodeId).getNodeGatewayStartedShardsBatch().get(shardId);
-            assertNodeGatewayStartedShardsHappyCase(gatewayStartedShard);
-        }
-    }
-
-    public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception {
-        String indexName = "test";
-        int numOfShards = 1;
-        prepareIndex(indexName, numOfShards);
-        Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName }, numOfShards);
-        ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();
-        final Index index = resolveIndex(indexName);
-        final ShardId shardId = new ShardId(index, 0);
-        corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId);
-        TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
-        internalCluster().restartNode(searchShardsResponse.getNodes()[0].getName());
-        response = ActionTestUtils.executeBlocking(
-            internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
-            new TransportNodesListGatewayStartedShardsBatch.Request(getDiscoveryNodes(), shardIdShardAttributesMap)
-        );
-        DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
-        GatewayStartedShard gatewayStartedShard = response.getNodesMap()
-            .get(discoveryNodes[0].getId())
-            .getNodeGatewayStartedShardsBatch()
-            .get(shardId);
-        assertNotNull(gatewayStartedShard.storeException());
-        assertNotNull(gatewayStartedShard.allocationId());
-        assertTrue(gatewayStartedShard.primary());
-    }
-
-    public void testSingleShardStoreFetchUsingBatchAction() throws ExecutionException, InterruptedException {
-        String indexName = "test";
-        DiscoveryNode[] nodes = getDiscoveryNodes();
-        TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
-            new String[] { indexName },
-            nodes
-        );
-        Index index = resolveIndex(indexName);
-        ShardId shardId = new ShardId(index, 0);
-        TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
-            .get(nodes[0].getId())
-            .getNodeStoreFilesMetadataBatch()
-            .get(shardId);
-        assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata, shardId);
-    }
-
-    public void testShardStoreFetchMultiNodeMultiIndexesUsingBatchAction() throws Exception {
-        internalCluster().startNodes(2);
-        String indexName1 = "test1";
-        String indexName2 = "test2";
-        DiscoveryNode[] nodes = getDiscoveryNodes();
-        TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
-            new String[] { indexName1, indexName2 },
-            nodes
-        );
-        ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName1, indexName2).get();
-        for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
-            ShardId shardId = clusterSearchShardsGroup.getShardId();
-            ShardRouting[] shardRoutings = clusterSearchShardsGroup.getShards();
-            assertEquals(2, shardRoutings.length);
-            for (ShardRouting shardRouting : shardRoutings) {
-                TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
-                    .get(shardRouting.currentNodeId())
-                    .getNodeStoreFilesMetadataBatch()
-                    .get(shardId);
-                assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata, shardId);
-            }
-        }
-    }
-
-    public void testShardStoreFetchNodeNotConnectedUsingBatchAction() {
-        DiscoveryNode nonExistingNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
-        String indexName = "test";
-        TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
-            new String[] { indexName },
-            new DiscoveryNode[] { nonExistingNode }
-        );
-        assertTrue(response.hasFailures());
-        assertEquals(1, response.failures().size());
-        assertEquals(nonExistingNode.getId(), response.failures().get(0).nodeId());
-    }
-
-    public void testShardStoreFetchCorruptedIndexUsingBatchAction() throws Exception {
-        internalCluster().startNodes(2);
-        String index1Name = "test1";
-        String index2Name = "test2";
-        prepareIndices(new String[] { index1Name, index2Name }, 1, 1);
-        Map<ShardId, ShardAttributes> shardAttributesMap = prepareRequestMap(new String[] { index1Name, index2Name }, 1);
-        Index index1 = resolveIndex(index1Name);
-        ShardId shardId1 = new ShardId(index1, 0);
-        ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(index1Name).get();
-        assertEquals(2, searchShardsResponse.getNodes().length);
-
-        // corrupt test1 index shards
-        corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId1);
-        corruptShard(searchShardsResponse.getNodes()[1].getName(), shardId1);
-        ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(false).get();
-        DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
-        TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response;
-        response = ActionTestUtils.executeBlocking(
-            internalCluster().getInstance(TransportNodesListShardStoreMetadataBatch.class),
-            new TransportNodesListShardStoreMetadataBatch.Request(shardAttributesMap, discoveryNodes)
-        );
-        Map<ShardId, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata> nodeStoreFilesMetadata = response.getNodesMap()
-            .get(discoveryNodes[0].getId())
-            .getNodeStoreFilesMetadataBatch();
-        // We don't store exception in case of corrupt index, rather just return an empty response
-        assertNull(nodeStoreFilesMetadata.get(shardId1).getStoreFileFetchException());
-        assertEquals(shardId1, nodeStoreFilesMetadata.get(shardId1).storeFilesMetadata().shardId());
-        assertTrue(nodeStoreFilesMetadata.get(shardId1).storeFilesMetadata().isEmpty());
-
-        Index index2 = resolveIndex(index2Name);
-        ShardId shardId2 = new ShardId(index2, 0);
-        assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata.get(shardId2), shardId2);
-    }
-
     public void testDeleteRedIndexInBatchMode() throws Exception {
         internalCluster().startClusterManagerOnlyNodes(
             1,
@@ -1454,57 +1294,4 @@ public void testDeleteRedIndexInBatchMode() throws Exception {
         assertFalse(indexExistResponse.isExists());
     }
 
-    private void prepareIndices(String[] indices, int numberOfPrimaryShards, int numberOfReplicaShards) {
-        for (String index : indices) {
-            createIndex(
-                index,
-                Settings.builder()
-                    .put(SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards)
-                    .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicaShards)
-                    .build()
-            );
-            index(index, "type", "1", Collections.emptyMap());
-            flush(index);
-        }
-    }
-
-    private TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch prepareAndSendRequest(
-        String[] indices,
-        DiscoveryNode[] nodes
-    ) {
-        Map<ShardId, ShardAttributes> shardAttributesMap = null;
-        prepareIndices(indices, 1, 1);
-        shardAttributesMap = prepareRequestMap(indices, 1);
-        TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response;
-        return ActionTestUtils.executeBlocking(
-            internalCluster().getInstance(TransportNodesListShardStoreMetadataBatch.class),
-            new TransportNodesListShardStoreMetadataBatch.Request(shardAttributesMap, nodes)
-        );
-    }
-
-    private void assertNodeStoreFilesMetadataSuccessCase(
-        TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata,
-        ShardId shardId
-    ) {
-        assertNull(nodeStoreFilesMetadata.getStoreFileFetchException());
-        TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFileMetadata = nodeStoreFilesMetadata.storeFilesMetadata();
-        assertFalse(storeFileMetadata.isEmpty());
-        assertEquals(shardId, storeFileMetadata.shardId());
-        assertNotNull(storeFileMetadata.peerRecoveryRetentionLeases());
-    }
-
-    private void assertNodeGatewayStartedShardsHappyCase(GatewayStartedShard gatewayStartedShard) {
-        assertNull(gatewayStartedShard.storeException());
-        assertNotNull(gatewayStartedShard.allocationId());
-        assertTrue(gatewayStartedShard.primary());
-    }
-
-    private void prepareIndex(String indexName, int numberOfPrimaryShards) {
-        createIndex(
-            indexName,
-            Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
-        );
-        index(indexName, "type", "1", Collections.emptyMap());
-        flush(indexName);
-    }
 }
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java
index a9339a70a187c..d6b364887b560 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java
@@ -55,7 +55,6 @@
 import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
 import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.opensearch.cluster.routing.allocation.decider.Decision;
-import org.opensearch.common.collect.ImmutableOpenMap;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.gateway.GatewayAllocator;
 import org.opensearch.gateway.PriorityComparator;
@@ -201,9 +200,9 @@ private ClusterState buildResult(ClusterState oldState, RoutingAllocation alloca
         if (restoreInProgress != null) {
             RestoreInProgress updatedRestoreInProgress = allocation.updateRestoreInfoWithRoutingChanges(restoreInProgress);
             if (updatedRestoreInProgress != restoreInProgress) {
-                ImmutableOpenMap.Builder<String, ClusterState.Custom> customsBuilder = ImmutableOpenMap.builder(allocation.getCustoms());
+                final Map<String, ClusterState.Custom> customsBuilder = new HashMap<>(allocation.getCustoms());
                 customsBuilder.put(RestoreInProgress.TYPE, updatedRestoreInProgress);
-                newStateBuilder.customs(customsBuilder.build());
+                newStateBuilder.customs(customsBuilder);
             }
         }
         return newStateBuilder.build();
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java
index 99de9f464859c..fb2a37237f8b6 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java
@@ -81,26 +81,6 @@ public interface ExistingShardsAllocator {
         Setting.Property.NodeScope
     );
 
-    /**
-     * Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk.
-     * This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate
-     * in one or more go.
-     * <p>
-     * Enable this setting if your ExistingShardAllocator is implementing the
-     * {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method.
-     * The default implementation of this method is not optimized and assigns shards one by one.
-     * <p>
-     * If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e,
-     * {@link ShardsBatchGatewayAllocator}.
-     * <p>
-     * This setting is experimental at this point.
-     */
-    Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting(
-        "cluster.allocator.existing_shards_allocator.batch_enabled",
-        false,
-        Setting.Property.NodeScope
-    );
-
     /**
      * Called before starting a round of allocation, allowing the allocator to invalidate some caches if appropriate.
      */
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 3afe1ee29171e..ed21d8707c22c 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -104,6 +104,7 @@
 import org.opensearch.gateway.GatewayService;
 import org.opensearch.gateway.PersistedClusterStateService;
 import org.opensearch.gateway.ShardsBatchGatewayAllocator;
+import org.opensearch.gateway.remote.RemoteClusterStateService;
 import org.opensearch.http.HttpTransportSettings;
 import org.opensearch.index.IndexModule;
 import org.opensearch.index.IndexSettings;
@@ -270,7 +271,6 @@ public void apply(Settings value, Settings current, Settings previous) {
                 EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
                 EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING,
                 ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE,
-                ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED,
                 FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING,
                 FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING,
                 FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING,