Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23213 #4755

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import static org.mockito.Mockito.withSettings;

import java.io.Serializable;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.ClusterState;
Expand All @@ -38,6 +40,8 @@
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
Expand Down Expand Up @@ -78,6 +82,9 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {

private static final MockSettings LENIENT_SETTINGS = withSettings().strictness(Strictness.LENIENT);

@Nullable
private Consumer<Boolean> afterInvokeInterceptor;

/** Creates standalone MetaStorage manager. */
public static StandaloneMetaStorageManager create() {
return create(TEST_NODE_NAME);
Expand Down Expand Up @@ -218,6 +225,34 @@ public static void configureCmgManagerToStartMetastorage(ClusterManagementGroupM
when(cmgManagerMock.clusterState()).thenReturn(completedFuture(clusterState));
}

public void setAfterInvokeInterceptor(@Nullable Consumer<Boolean> afterInvokeInterceptor) {
this.afterInvokeInterceptor = afterInvokeInterceptor;
}

@Override
public CompletableFuture<Boolean> invoke(Condition cond, Operation success, Operation failure) {
return super.invoke(cond, success, failure)
.thenApply(res -> {
if (afterInvokeInterceptor != null) {
afterInvokeInterceptor.accept(res);
}

return res;
});
}

@Override
public CompletableFuture<Boolean> invoke(Condition cond, List<Operation> success, List<Operation> failure) {
return super.invoke(cond, success, failure)
.thenApply(res -> {
if (afterInvokeInterceptor != null) {
afterInvokeInterceptor.accept(res);
}

return res;
});
}

private static RaftManager mockRaftManager() {
ArgumentCaptor<RaftGroupListener> listenerCaptor = ArgumentCaptor.forClass(RaftGroupListener.class);
RaftManager raftManager = mock(RaftManager.class, LENIENT_SETTINGS);
Expand Down
1 change: 1 addition & 0 deletions modules/placement-driver/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ dependencies {
testImplementation(testFixtures(project(':ignite-metastorage')))
testImplementation(testFixtures(project(':ignite-vault')))
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation(testFixtures(project(':ignite-runner')))
testImplementation libs.hamcrest.core
testImplementation libs.mockito.core
testImplementation libs.mockito.junit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ private void updateLeaseBatchInternal() {
long outdatedLeaseThreshold = now.getPhysical() + leaseExpirationInterval / 2;

Leases leasesCurrent = leaseTracker.leasesCurrent();
Map<ReplicationGroupId, Boolean> toBeNegotiated = new HashMap<>();
Map<ReplicationGroupId, LeaseAgreement> toBeNegotiated = new HashMap<>();
Map<ReplicationGroupId, Lease> renewedLeases = new HashMap<>(leasesCurrent.leaseByGroupId());

Map<ReplicationGroupId, TokenizedAssignments> tokenizedStableAssignmentsMap = assignmentsTracker.stableAssignments();
Expand Down Expand Up @@ -509,11 +509,11 @@ private void updateLeaseBatchInternal() {
// leaseholders at all.
if (isLeaseOutdated(lease)) {
// New lease is granted.
writeNewLease(grpId, candidate, renewedLeases);
Lease newLease = writeNewLease(grpId, candidate, renewedLeases);

boolean force = !lease.isProlongable() && lease.proposedCandidate() != null;

toBeNegotiated.put(grpId, force);
toBeNegotiated.put(grpId, new LeaseAgreement(newLease, force));
} else if (lease.isProlongable() && candidate.id().equals(lease.getLeaseholderId())) {
// Old lease is renewed.
prolongLease(grpId, lease, renewedLeases, leaseExpirationInterval);
Expand Down Expand Up @@ -550,36 +550,29 @@ private void updateLeaseBatchInternal() {
if (e != null) {
LOG.error("Lease update invocation failed", e);

cancelAgreements(toBeNegotiated.keySet());

return;
}

if (!success) {
LOG.warn("Lease update invocation failed because of concurrent update.");

cancelAgreements(toBeNegotiated.keySet());
LOG.warn("Lease update invocation failed because of outdated lease data on this node.");

return;
}

for (Map.Entry<ReplicationGroupId, Boolean> entry : toBeNegotiated.entrySet()) {
Lease lease = renewedLeases.get(entry.getKey());
boolean force = entry.getValue();

leaseNegotiator.negotiate(lease, force);
for (Map.Entry<ReplicationGroupId, LeaseAgreement> entry : toBeNegotiated.entrySet()) {
leaseNegotiator.negotiate(entry.getValue());
}
});
}

private void chooseCandidateAndCreateNewLease(
ReplicationGroupId grpId,
Lease lease,
Lease existingLease,
LeaseAgreement agreement,
Set<Assignment> stableAssignments,
Set<Assignment> pendingAssignments,
Map<ReplicationGroupId, Lease> renewedLeases,
Map<ReplicationGroupId, Boolean> toBeNegotiated
Map<ReplicationGroupId, LeaseAgreement> toBeNegotiated
) {
String proposedCandidate = null;

Expand All @@ -588,7 +581,7 @@ private void chooseCandidateAndCreateNewLease(
}

if (proposedCandidate == null) {
proposedCandidate = lease.isProlongable() ? lease.getLeaseholder() : lease.proposedCandidate();
proposedCandidate = existingLease.isProlongable() ? existingLease.getLeaseholder() : existingLease.proposedCandidate();
}

ClusterNode candidate = nextLeaseHolder(stableAssignments, pendingAssignments, grpId, proposedCandidate);
Expand All @@ -600,23 +593,11 @@ private void chooseCandidateAndCreateNewLease(
}

// New lease is granted.
writeNewLease(grpId, candidate, renewedLeases);
Lease newLease = writeNewLease(grpId, candidate, renewedLeases);

// TODO https://issues.apache.org/jira/browse/IGNITE-23213 Depending on the solution we may refactor this.
boolean force = Objects.equals(lease.getLeaseholder(), candidate.name()) && !agreement.isCancelled();
toBeNegotiated.put(grpId, force);
}
boolean force = Objects.equals(existingLease.getLeaseholder(), candidate.name()) && !agreement.isCancelled();

/**
* Cancel all the given agreements. This should be done if the new leases that were to be negotiated had been not written to meta
* storage.
*
* @param groupIds Group ids.
*/
private void cancelAgreements(Collection<ReplicationGroupId> groupIds) {
for (ReplicationGroupId groupId : groupIds) {
leaseNegotiator.cancelAgreement(groupId);
}
toBeNegotiated.put(grpId, new LeaseAgreement(newLease, force));
}

/**
Expand All @@ -625,8 +606,9 @@ private void cancelAgreements(Collection<ReplicationGroupId> groupIds) {
* @param grpId Replication group id.
* @param candidate Lease candidate.
* @param renewedLeases Leases to renew.
* @return Created lease.
*/
private void writeNewLease(
private Lease writeNewLease(
ReplicationGroupId grpId,
ClusterNode candidate,
Map<ReplicationGroupId, Lease> renewedLeases
Expand All @@ -641,10 +623,9 @@ private void writeNewLease(

renewedLeases.put(grpId, renewedLease);

// Lease agreement should be created synchronously before negotiation begins.
leaseNegotiator.createAgreement(grpId, renewedLease);

leaseUpdateStatistics.onLeaseCreate();

return renewedLease;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
Expand Down Expand Up @@ -219,7 +218,7 @@ public CompletableFuture<Void> onUpdate(WatchEvent event) {
}
}

leases = new Leases(unmodifiableMap(leasesMap), leasesBytes);
leases = new Leases(leasesMap, leasesBytes);

for (Lease expiredLease : expiredLeases) {
fireEventPrimaryReplicaExpired(event.revision(), expiredLease);
Expand Down Expand Up @@ -372,7 +371,7 @@ private void loadLeasesBusyAsync(long recoveryRevision) {
}
});

leases = new Leases(unmodifiableMap(leasesMap), leasesBytes);
leases = new Leases(leasesMap, leasesBytes);
}

LOG.info("Leases cache recovered [leases={}]", leases);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.ignite.internal.placementdriver.leases;

import static java.util.Collections.unmodifiableMap;

import java.util.Map;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
Expand Down Expand Up @@ -44,7 +46,7 @@ public Leases(Map<ReplicationGroupId, Lease> leaseByGroupId, byte[] leasesBytes)

/** Returns leases grouped by replication group. */
public Map<ReplicationGroupId, Lease> leaseByGroupId() {
return leaseByGroupId;
return unmodifiableMap(leaseByGroupId);
}

/** Returns an array of byte leases from the metastore. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,38 @@ public class LeaseAgreement {
* The agreement, which has not try negotiating yet. We assume that it is {@link #ready()} and not {@link #isAccepted()}
* which allows both initiation and retries of negotiation.
*/
static final LeaseAgreement UNDEFINED_AGREEMENT = new LeaseAgreement(null, nullCompletedFuture());
static final LeaseAgreement UNDEFINED_AGREEMENT = new LeaseAgreement(null, nullCompletedFuture(), false);

/** Lease. */
private final Lease lease;

/** Forced agreement. */
private final boolean forced;

/** Future to {@link LeaseGrantedMessageResponse} response. */
private final CompletableFuture<LeaseGrantedMessageResponse> responseFut;

/**
* The constructor.
*
* @param lease Lease.
* @param forced Forced agreement.
*/
public LeaseAgreement(Lease lease) {
this(lease, new CompletableFuture<>());
public LeaseAgreement(Lease lease, boolean forced) {
this(lease, new CompletableFuture<>(), forced);
}

/**
* The constructor for private use.
*
* @param lease Lease.
* @param remoteNodeResponseFuture The future of response from the remote node which is negotiating the agreement.
* @param forced Forced agreement.
*/
private LeaseAgreement(Lease lease, CompletableFuture<LeaseGrantedMessageResponse> remoteNodeResponseFuture) {
private LeaseAgreement(Lease lease, CompletableFuture<LeaseGrantedMessageResponse> remoteNodeResponseFuture, boolean forced) {
this.lease = lease;
this.responseFut = requireNonNull(remoteNodeResponseFuture);
this.forced = forced;
}

/**
Expand All @@ -83,6 +89,24 @@ public Lease getLease() {
return lease;
}

/**
* Group id of lease.
*
* @return Group id.
*/
public ReplicationGroupId groupId() {
return lease.replicationGroupId();
}

/**
* Gets a forced agreement flag.
*
* @return Forced agreement flag.
*/
public boolean forced() {
return forced;
}

/**
* Gets a accepted flag. The flag is true, when the lease is accepted by leaseholder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class LeaseNegotiator {
private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();

/** Lease agreements which are in progress of negotiation. */
private final Map<ReplicationGroupId, LeaseAgreement> leaseToNegotiate;
private final Map<ReplicationGroupId, LeaseAgreement> leaseToNegotiate = new ConcurrentHashMap<>();

/** Cluster service. */
private final ClusterService clusterService;
Expand All @@ -53,33 +53,30 @@ public class LeaseNegotiator {
*/
public LeaseNegotiator(ClusterService clusterService) {
this.clusterService = clusterService;

this.leaseToNegotiate = new ConcurrentHashMap<>();
}

/**
* Tries negotiating a lease with its leaseholder.
* The negotiation will achieve after the method is invoked. Use {@link #getAndRemoveIfReady(ReplicationGroupId)} to check a result.
*
* @param lease Lease to negotiate.
* @param force If the flag is true, the process tries to insist of apply the lease.
* @param agreement Lease agreement to negotiate.
*/
public void negotiate(Lease lease, boolean force) {
ReplicationGroupId groupId = lease.replicationGroupId();

LeaseAgreement agreement = leaseToNegotiate.get(groupId);
public void negotiate(LeaseAgreement agreement) {
Lease lease = agreement.getLease();

assert agreement != null : "Lease agreement should exist when negotiation begins [groupId=" + groupId + "].";
assert agreement != null : "Lease agreement should exist when negotiation begins [groupId=" + agreement.groupId() + "].";

long leaseInterval = lease.getExpirationTime().getPhysical() - lease.getStartTime().getPhysical();

leaseToNegotiate.put(agreement.groupId(), agreement);

clusterService.messagingService().invoke(
lease.getLeaseholder(),
PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessage()
.groupId(groupId)
.groupId(agreement.groupId())
.leaseStartTime(lease.getStartTime())
.leaseExpirationTime(lease.getExpirationTime())
.force(force)
.force(agreement.forced())
.build(),
leaseInterval)
.whenComplete((msg, throwable) -> {
Expand Down Expand Up @@ -119,16 +116,6 @@ public LeaseAgreement getAndRemoveIfReady(ReplicationGroupId groupId) {
return res[0] == null ? UNDEFINED_AGREEMENT : res[0];
}

/**
* Creates an agreement.
*
* @param groupId Group id.
* @param lease Lease to negotiate.
*/
public void createAgreement(ReplicationGroupId groupId, Lease lease) {
leaseToNegotiate.put(groupId, new LeaseAgreement(lease));
}

/**
* Removes lease from list to negotiate.
*
Expand Down
Loading