Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23316 Hybrid Clock#update can be optimized in case where retur… #4727

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ public interface ClockService {
* @param requestTime Timestamp from request.
* @return New local hybrid timestamp that is on the clock (it is ahead of both the old clock time and the request time).
*/
HybridTimestamp updateClock(HybridTimestamp requestTime);
HybridTimestamp updateAndGetClock(HybridTimestamp requestTime);

/**
* Update the local timestamp in case a timestamp from the request is bigger than the current value of the hybrid clock.
*
* @param requestTime Timestamp from request.
*/
void updateClock(HybridTimestamp requestTime);

/**
* Wait for the clock to reach the given timestamp.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ public long currentLong() {
}

@Override
public HybridTimestamp updateClock(HybridTimestamp requestTime) {
return clock.update(requestTime);
public HybridTimestamp updateAndGetClock(HybridTimestamp requestTime) {
return clock.updateAndGetNow(requestTime);
}

@Override
public void updateClock(HybridTimestamp requestTime) {
clock.update(requestTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
package org.apache.ignite.internal.hlc;

/**
* Used to track updates of a {@link HybridClock}: it gets notified each time {@link HybridClock#update(HybridTimestamp)},
* Used to track updates of a {@link HybridClock}: it gets notified each time {@link HybridClock#updateAndGetNow(HybridTimestamp)},
* is invoked.
*/
@FunctionalInterface
public interface ClockUpdateListener {
/**
* Called when the clock's current time advances due to a call to {@link HybridClock#update(HybridTimestamp)}.
* Called when the clock's current timestamp jumps up to the value received from another node due to a call to
* {@link HybridClock#updateAndGetNow(HybridTimestamp)} or {@link HybridClock#update(HybridTimestamp)}.
*
* <p>This does NOT get called when the clock current time gets advanced by a call to
* {@link HybridClock#now()}/{@link HybridClock#nowLong()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,14 @@ public interface HybridClock {
* @param requestTime Timestamp from request.
* @return New local hybrid timestamp that is on the clock (it is ahead of both the old clock time and the request time).
*/
HybridTimestamp update(HybridTimestamp requestTime);
HybridTimestamp updateAndGetNow(HybridTimestamp requestTime);

/**
* Update the local timestamp in case a timestamp from the request is bigger than the current value of the hybrid clock.
*
* @param requestTime Timestamp from request.
*/
void update(HybridTimestamp requestTime);

/**
* Adds an update listener to self.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import static org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.tostring.S;
Expand All @@ -38,15 +37,10 @@ public class HybridClockImpl implements HybridClock {
/**
* Var handle for {@link #latestTime}.
*/
private static final VarHandle LATEST_TIME;

static {
try {
LATEST_TIME = MethodHandles.lookup().findVarHandle(HybridClockImpl.class, "latestTime", long.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}
private static final AtomicLongFieldUpdater<HybridClockImpl> LATEST_TIME = AtomicLongFieldUpdater.newUpdater(
HybridClockImpl.class,
"latestTime"
);

private volatile long latestTime;

Expand Down Expand Up @@ -77,6 +71,10 @@ public long nowLong() {
// Read the latest time after accessing UTC time to reduce contention.
long oldLatestTime = latestTime;

if (oldLatestTime >= now) {
return LATEST_TIME.incrementAndGet(this);
}

long newLatestTime = max(oldLatestTime + 1, now);

if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) {
Expand Down Expand Up @@ -125,19 +123,57 @@ public HybridTimestamp current() {
* @return The resulting timestamp (guaranteed to exceed both previous clock 'currentTs' and the supplied external ts).
*/
@Override
public HybridTimestamp update(HybridTimestamp requestTime) {
public HybridTimestamp updateAndGetNow(HybridTimestamp requestTime) {
long requestTimeLong = requestTime.longValue();

while (true) {
long now = currentTime();

// Read the latest time after accessing UTC time to reduce contention.
long oldLatestTime = this.latestTime;

long newLatestTime = max(requestTime.longValue() + 1, max(now, oldLatestTime + 1));
if (oldLatestTime >= requestTimeLong && oldLatestTime >= now) {
return hybridTimestamp(LATEST_TIME.incrementAndGet(this));
}

if (now > requestTimeLong) {
if (LATEST_TIME.compareAndSet(this, oldLatestTime, now)) {
return hybridTimestamp(now);
}
} else {
long newLatestTime = requestTimeLong + 1;

if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) {
notifyUpdateListeners(newLatestTime);

if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) {
notifyUpdateListeners(newLatestTime);
return hybridTimestamp(newLatestTime);
}
}
}
}

@Override
public void update(HybridTimestamp requestTime) {
long requestTimeLong = requestTime.longValue();

while (true) {
long now = currentTime();

if (requestTimeLong < now) {
return;
}

// Read the latest time after accessing UTC time to reduce contention.
long oldLatestTime = this.latestTime;

if (requestTimeLong <= oldLatestTime) {
return;
}

if (LATEST_TIME.compareAndSet(this, oldLatestTime, requestTimeLong)) {
notifyUpdateListeners(requestTimeLong);

return hybridTimestamp(newLatestTime);
return;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testNow() {
}

/**
* Tests a {@link HybridClock#update(HybridTimestamp)}.
* Tests a {@link HybridClock#updateAndGetNow(HybridTimestamp)}.
*/
@Test
public void testTick() {
Expand All @@ -81,25 +81,25 @@ public void testTick() {
HybridClock clock = new HybridClockImpl();

assertTimestampEquals(100, new HybridTimestamp(100, 1),
() -> clock.update(new HybridTimestamp(50, 1)));
() -> clock.updateAndGetNow(new HybridTimestamp(50, 1)));

assertTimestampEquals(100, new HybridTimestamp(100, 2),
() -> clock.update(new HybridTimestamp(60, 1000)));
() -> clock.updateAndGetNow(new HybridTimestamp(60, 1000)));

assertTimestampEquals(200, new HybridTimestamp(200, 0),
() -> clock.update(new HybridTimestamp(70, 1)));
() -> clock.updateAndGetNow(new HybridTimestamp(70, 1)));

assertTimestampEquals(50, new HybridTimestamp(200, 1),
() -> clock.update(new HybridTimestamp(70, 1)));
() -> clock.updateAndGetNow(new HybridTimestamp(70, 1)));

assertTimestampEquals(500, new HybridTimestamp(500, 0),
() -> clock.update(new HybridTimestamp(70, 1)));
() -> clock.updateAndGetNow(new HybridTimestamp(70, 1)));

assertTimestampEquals(500, new HybridTimestamp(600, 1),
() -> clock.update(new HybridTimestamp(600, 0)));
() -> clock.updateAndGetNow(new HybridTimestamp(600, 0)));

assertTimestampEquals(500, new HybridTimestamp(600, 2),
() -> clock.update(new HybridTimestamp(600, 0)));
() -> clock.updateAndGetNow(new HybridTimestamp(600, 0)));
}

private void assertTimestampEquals(long sysTime, HybridTimestamp expTs, Supplier<HybridTimestamp> clo) {
Expand Down Expand Up @@ -146,7 +146,7 @@ void updateListenerGetsNotifiedOnExternalUpdate() {

HybridTimestamp ts = clock.now().addPhysicalTime(TimeUnit.DAYS.toMillis(365));

HybridTimestamp afterUpdate = clock.update(ts);
HybridTimestamp afterUpdate = clock.updateAndGetNow(ts);

verify(updateListener).onUpdate(afterUpdate.longValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public HybridTimestamp current() {
* @return The hybrid timestamp.
*/
@Override
public HybridTimestamp update(HybridTimestamp requestTime) {
public HybridTimestamp updateAndGetNow(HybridTimestamp requestTime) {
while (true) {
long now = currentTime();

Expand All @@ -126,6 +126,32 @@ public HybridTimestamp update(HybridTimestamp requestTime) {
}
}

@Override
public void update(HybridTimestamp requestTime) {
long requestTimeLong = requestTime.longValue();

while (true) {
long now = currentTime();

if (requestTimeLong < now) {
return;
}

// Read the latest time after accessing UTC time to reduce contention.
long oldLatestTime = this.latestTime;

if (requestTimeLong <= oldLatestTime) {
return;
}

if (LATEST_TIME.compareAndSet(this, oldLatestTime, requestTimeLong)) {
notifyUpdateListeners(requestTimeLong);

return;
}
}
}

@Override
public void addUpdateListener(ClockUpdateListener listener) {
updateListeners.add(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ public long currentLong() {
}

@Override
public HybridTimestamp updateClock(HybridTimestamp requestTime) {
return clock.update(requestTime);
public HybridTimestamp updateAndGetClock(HybridTimestamp requestTime) {
return clock.updateAndGetNow(requestTime);
}

@Override
public void updateClock(HybridTimestamp requestTime) {
clock.update(requestTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ public HybridTimestamp clockNow() {
return clock.now();
}

public HybridTimestamp clockUpdate(HybridTimestamp timestamp) {
return clock.update(timestamp);
public void clockUpdate(HybridTimestamp timestamp) {
clock.update(timestamp);
}

private boolean initSnapshotStorage() {
Expand Down Expand Up @@ -2244,7 +2244,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
.term(this.currTerm);

if (request.timestamp() != null) {
rb.timestamp(clock.update(request.timestamp()));
rb.timestamp(clock.updateAndGetNow(request.timestamp()));
}

return rb.build();
Expand All @@ -2265,7 +2265,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
.term(request.term() + 1);

if (request.timestamp() != null) {
rb.timestamp(clock.update(request.timestamp()));
rb.timestamp(clock.updateAndGetNow(request.timestamp()));
}

return rb.build();
Expand Down Expand Up @@ -2298,7 +2298,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
.lastLogIndex(lastLogIndex);

if (request.timestamp() != null) {
rb.timestamp(clock.update(request.timestamp()));
rb.timestamp(clock.updateAndGetNow(request.timestamp()));
}

return rb.build();
Expand All @@ -2312,7 +2312,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
.term(this.currTerm)
.lastLogIndex(this.logManager.getLastLogIndex());
if (request.timestamp() != null) {
respBuilder.timestamp(clock.update(request.timestamp()));
respBuilder.timestamp(clock.updateAndGetNow(request.timestamp()));
}
doUnlock = false;
this.writeLock.unlock();
Expand All @@ -2333,7 +2333,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
.term(this.currTerm);

if (request.timestamp() != null) {
rb.timestamp(clock.update(request.timestamp()));
rb.timestamp(clock.updateAndGetNow(request.timestamp()));
}

return rb.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ private void sendReplicaUnavailableErrorResponse(
groupId,
clusterNetSvc.topologyService().localMember())
)
.timestamp(clockService.updateClock(requestTimestamp))
.timestamp(clockService.updateAndGetClock(requestTimestamp))
.build(),
correlationId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class AbstractMultiNodeBenchmark {

@Nullable
protected String clusterConfiguration() {
return null;
return "";
}

/**
Expand Down