Skip to content

Commit 7e41e49

Browse files
authored
- Fix InternalDelete elision from HashBucketEntry (set entire word to 0) (#866)
- Fix iterators to jump ahead to BeginAddress if they're below it, rather than throw - Remove unused HashBucketEntry.Pending - Remove unnecessary ReadCacheEvictChain call in DetachAndReattachReadCacheChain
1 parent 60e162f commit 7e41e49

13 files changed

+207
-56
lines changed

cs/src/core/Allocator/BlittableScanIterator.cs

+1-4
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,7 @@ public unsafe bool GetNext(out RecordInfo recordInfo)
9393
var headAddress = hlog.HeadAddress;
9494

9595
if (currentAddress < hlog.BeginAddress && !forceInMemory)
96-
{
97-
epoch?.Suspend();
98-
throw new FasterException("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
99-
}
96+
currentAddress = hlog.BeginAddress;
10097

10198
// If currentAddress < headAddress and we're not buffering and not guaranteeing the records are in memory, fail.
10299
if (frameSize == 0 && currentAddress < headAddress && !forceInMemory)

cs/src/core/Allocator/GenericScanIterator.cs

+1-4
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,7 @@ public unsafe bool GetNext(out RecordInfo recordInfo)
9090
var headAddress = hlog.HeadAddress;
9191

9292
if (currentAddress < hlog.BeginAddress)
93-
{
94-
epoch?.Suspend();
95-
throw new FasterException("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
96-
}
93+
currentAddress = hlog.BeginAddress;
9794

9895
// If currentAddress < headAddress and we're not buffering, fail.
9996
if (frameSize == 0 && currentAddress < headAddress)

cs/src/core/Allocator/VarLenBlittableScanIterator.cs

+1-4
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,7 @@ public unsafe bool GetNext(out RecordInfo recordInfo)
9696
long headAddress = hlog.HeadAddress;
9797

9898
if (currentAddress < hlog.BeginAddress && !forceInMemory)
99-
{
100-
epoch?.Suspend();
101-
throw new FasterException("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
102-
}
99+
currentAddress = hlog.BeginAddress;
103100

104101
// If currentAddress < headAddress and we're not buffering and not guaranteeing the records are in memory, fail.
105102
if (frameSize == 0 && currentAddress < headAddress && !forceInMemory)

cs/src/core/Index/FASTER/FASTERBase.cs

+1-15
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
using System.Runtime.CompilerServices;
77
using System.Runtime.InteropServices;
88
using System.Threading;
9-
using FASTER.core;
109
using Microsoft.Extensions.Logging;
1110

1211
namespace FASTER.core
@@ -251,18 +250,6 @@ public ushort Tag
251250
}
252251
}
253252

254-
public bool Pending
255-
{
256-
readonly get => (word & Constants.kPendingBitMask) != 0;
257-
set
258-
{
259-
if (value)
260-
word |= Constants.kPendingBitMask;
261-
else
262-
word &= ~Constants.kPendingBitMask;
263-
}
264-
}
265-
266253
public bool Tentative
267254
{
268255
readonly get => (word & Constants.kTentativeBitMask) != 0;
@@ -291,7 +278,7 @@ public override string ToString()
291278
{
292279
var addrRC = this.ReadCache ? "(rc)" : string.Empty;
293280
static string bstr(bool value) => value ? "T" : "F";
294-
return $"addr {this.AbsoluteAddress}{addrRC}, tag {Tag}, tent {bstr(Tentative)}, pend {bstr(Pending)}";
281+
return $"addr {this.AbsoluteAddress}{addrRC}, tag {Tag}, tent {bstr(Tentative)}";
295282
}
296283
}
297284

@@ -486,7 +473,6 @@ internal void FindOrCreateTag(ref HashEntryInfo hei, long BeginAddress)
486473
hei.entry = default;
487474
hei.entry.Tag = hei.tag;
488475
hei.entry.Address = Constants.kTempInvalidAddress;
489-
hei.entry.Pending = false;
490476
hei.entry.Tentative = true;
491477

492478
// Insert the tag into this slot. Failure means another session inserted a key into that slot, so continue the loop to find another free slot.

cs/src/core/Index/FASTER/Implementation/HashEntryInfo.cs

-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ internal bool TryCAS(long newLogicalAddress, ushort tag)
8686
{
8787
Tag = tag,
8888
Address = newLogicalAddress & Constants.kAddressMask,
89-
Pending = this.entry.Pending,
9089
Tentative = false
9190
// .ReadCache is included in newLogicalAddress
9291
};

cs/src/core/Index/FASTER/Implementation/InternalDelete.cs

+4-5
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,12 @@ internal OperationStatus InternalDelete<Input, Output, Context, FasterSession>(r
111111
if (WriteDefaultOnDelete)
112112
recordValue = default;
113113

114-
// Try to update hash chain and completely elide record iff previous address points to invalid address, to avoid re-enabling a prior version of this record.
115-
if (stackCtx.hei.Address == stackCtx.recSrc.LogicalAddress && !fasterSession.IsManualLocking && srcRecordInfo.PreviousAddress < hlog.BeginAddress)
114+
// Try to update hash chain to elide record iff previous address points to invalid address, to avoid re-enabling a prior version of this record.
115+
if (stackCtx.hei.Address == stackCtx.recSrc.LogicalAddress && srcRecordInfo.PreviousAddress < hlog.BeginAddress)
116116
{
117117
// Ignore return value; this is a performance optimization to keep the hash table clean if we can, so if we fail it just means
118-
// the hashtable entry has already been updated by someone else.
119-
var address = (srcRecordInfo.PreviousAddress == Constants.kTempInvalidAddress) ? Constants.kInvalidAddress : srcRecordInfo.PreviousAddress;
120-
stackCtx.hei.TryCAS(address, tag: 0);
118+
// the hashtable entry has already been updated by someone else. Set the entry.word to zero.
119+
stackCtx.hei.TryCAS(Constants.kInvalidAddress, tag: 0);
121120
}
122121

123122
status = OperationStatusUtils.AdvancedOpCode(OperationStatus.SUCCESS, StatusCode.InPlaceUpdatedRecord);

cs/src/core/Index/FASTER/Implementation/ReadCache.cs

+11-16
Original file line numberDiff line numberDiff line change
@@ -127,32 +127,30 @@ private bool DetachAndReattachReadCacheChain(ref Key key, ref OperationStackCont
127127
goto Success;
128128

129129
// Traverse from highestRcAddress to the splice point, invalidating any record matching the key.
130-
for (bool found = false; /* tested in loop */; /* incremented in loop */)
130+
for (bool foundKey = false; /* tested in loop */; /* incremented in loop */)
131131
{
132132
var physicalAddress = readcache.GetPhysicalAddress(AbsoluteAddress(lowestRcAddress));
133133
ref RecordInfo recordInfo = ref readcache.GetInfo(physicalAddress);
134-
if (!found && !recordInfo.Invalid && comparer.Equals(ref key, ref readcache.GetKey(physicalAddress)))
134+
if (!foundKey && !recordInfo.Invalid && comparer.Equals(ref key, ref readcache.GetKey(physicalAddress)))
135135
{
136-
found = true;
136+
foundKey = true;
137137
recordInfo.SetInvalidAtomic(); // Atomic needed due to other threads (e.g. ReadCacheEvict) possibly being in this chain before we detached it.
138138
}
139139

140140
// See if we're at the last entry in the readcache prefix.
141141
if (!IsReadCache(recordInfo.PreviousAddress) || AbsoluteAddress(recordInfo.PreviousAddress) < readcache.HeadAddress)
142142
{
143143
// Splice the new recordInfo into the local chain. Use atomic due to other threads (e.g. ReadCacheEvict) possibly being in this
144-
// before we detached it, and setting the record to Invalid (no other thread will be updating anything else in the chain, though).
144+
// before we detached it, and setting the record to Invalid (no other thread will be updating anything else in the chain, though;
145+
// we own the chain by virtue of the CAS above, and nobody else can change recordInfo.PreviousAddress).
145146
while (!recordInfo.TryUpdateAddress(recordInfo.PreviousAddress, newLogicalAddress))
146147
Thread.Yield();
147148

148-
// Now try to CAS the chain into the HashBucketEntry. If it fails, give up; we lose those readcache records.
149-
// Trying to handle conflicts would require evaluating whether other threads had inserted keys in our chain, and it's too rare to worry about.
150-
if (stackCtx.hei.TryCAS(highestRcAddress))
151-
{
152-
// If we go below readcache.HeadAddress ReadCacheEvict may race past us, so make sure the lowest address is still in range.
153-
while (lowestRcAddress < readcache.HeadAddress)
154-
lowestRcAddress = ReadCacheEvictChain(readcache.HeadAddress, ref stackCtx.hei);
155-
}
149+
// Now try to CAS the chain into the HashBucketEntry. Ignore the result; if it fails, we just lose these readcache records.
150+
// Conflict handling would require evaluating whether other threads had inserted keys in our chain, and it's too rare to worry about.
151+
// We have ensured all records in this readcache prefix chain are >= readcache.HeadAddress, so even if readcache.HeadAddress changes,
152+
// ReadCacheEvict will not be called for any records in this readcache prefix chain until we return from here and release the epoch.
153+
stackCtx.hei.TryCAS(highestRcAddress);
156154
goto Success;
157155
}
158156
lowestRcAddress = recordInfo.PreviousAddress;
@@ -347,14 +345,13 @@ internal void ReadCacheEvict(long rcLogicalAddress, long rcToLogicalAddress)
347345
}
348346
}
349347

350-
private long ReadCacheEvictChain(long rcToLogicalAddress, ref HashEntryInfo hei)
348+
private void ReadCacheEvictChain(long rcToLogicalAddress, ref HashEntryInfo hei)
351349
{
352350
// Traverse the chain of readcache entries for this key, looking "ahead" to .PreviousAddress to see if it is less than readcache.HeadAddress.
353351
// nextPhysicalAddress remains Constants.kInvalidAddress if hei.Address is < HeadAddress; othrwise, it is the lowest-address readcache record
354352
// remaining following this eviction, and its .PreviousAddress is updated to each lower record in turn until we hit a non-readcache record.
355353
long nextPhysicalAddress = Constants.kInvalidAddress;
356354
HashBucketEntry entry = new() { word = hei.entry.word };
357-
long lowestAddress = entry.Address;
358355
while (entry.ReadCache)
359356
{
360357
var la = entry.AbsoluteAddress;
@@ -393,10 +390,8 @@ private long ReadCacheEvictChain(long rcToLogicalAddress, ref HashEntryInfo hei)
393390
ri.PreviousAddress = Constants.kTempInvalidAddress; // The record is no longer in the chain
394391
else
395392
hei.SetToCurrent();
396-
lowestAddress = entry.Address;
397393
entry.word = hei.entry.word;
398394
}
399-
return lowestAddress;
400395
}
401396
}
402397
}

cs/src/core/Index/FASTER/Implementation/RecordSource.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ internal struct RecordSource<Key, Value>
3030
internal long PhysicalAddress;
3131

3232
/// <summary>
33-
/// The highest logical address (in the main log, i.e. below readcache) for this key; if we have a readcache prefix chain, this is the splice point.
33+
/// The highest logical address in the main log (i.e. below readcache) for this key; if we have a readcache prefix chain, this is the splice point.
3434
/// </summary>
3535
internal long LatestLogicalAddress;
3636

cs/src/core/Index/Recovery/Recovery.cs

-2
Original file line numberDiff line numberDiff line change
@@ -971,7 +971,6 @@ private unsafe bool RecoverFromPage(long startRecoveryAddress,
971971
{
972972
hei.entry.Address = pageLogicalAddress + pointer;
973973
hei.entry.Tag = hei.tag;
974-
hei.entry.Pending = false;
975974
hei.entry.Tentative = false;
976975
hei.bucket->bucket_entries[hei.slot] = hei.entry.word;
977976
}
@@ -983,7 +982,6 @@ private unsafe bool RecoverFromPage(long startRecoveryAddress,
983982
{
984983
hei.entry.Address = info.PreviousAddress;
985984
hei.entry.Tag = hei.tag;
986-
hei.entry.Pending = false;
987985
hei.entry.Tentative = false;
988986
hei.bucket->bucket_entries[hei.slot] = hei.entry.word;
989987
}

cs/test/BlittableLogScanTests.cs

+47-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System;
55
using FASTER.core;
66
using NUnit.Framework;
7-
using static FASTER.test.BlittableIterationTests;
87
using static FASTER.test.TestUtils;
98

109
namespace FASTER.test
@@ -20,7 +19,7 @@ internal class BlittableFASTERScanTests
2019
public void Setup()
2120
{
2221
DeleteDirectory(MethodTestDir, wait:true);
23-
log = Devices.CreateLogDevice(MethodTestDir + "/BlittableFASTERScanTests.log", deleteOnClose: true);
22+
log = Devices.CreateLogDevice(MethodTestDir + "/test.log", deleteOnClose: true);
2423
fht = new FasterKV<KeyStruct, ValueStruct>
2524
(1L << 20, new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 9 }, lockingMode: LockingMode.None);
2625
}
@@ -100,6 +99,52 @@ void scanAndVerify(ScanBufferingMode sbm)
10099
scanAndVerify(ScanBufferingMode.DoublePageBuffering);
101100
}
102101

102+
[Test]
103+
[Category("FasterKV")]
104+
[Category("Smoke")]
105+
106+
public void BlittableScanJumpToBeginAddressTest()
107+
{
108+
using var session = fht.For(new Functions()).NewSession<Functions>();
109+
110+
const int numRecords = 200;
111+
const int numTailRecords = 10;
112+
long shiftBeginAddressTo = 0;
113+
int shiftToKey = 0;
114+
for (int i = 0; i < numRecords; i++)
115+
{
116+
if (i == numRecords - numTailRecords)
117+
{
118+
shiftBeginAddressTo = fht.Log.TailAddress;
119+
shiftToKey = i;
120+
}
121+
var key = new KeyStruct { kfield1 = i, kfield2 = i + 1 };
122+
var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 };
123+
session.Upsert(ref key, ref value, Empty.Default, 0);
124+
}
125+
126+
using var iter = fht.Log.Scan(fht.Log.HeadAddress, fht.Log.TailAddress);
127+
128+
for (int i = 0; i < 100; ++i)
129+
{
130+
Assert.IsTrue(iter.GetNext(out var recordInfo));
131+
Assert.AreEqual(i, iter.GetKey().kfield1);
132+
Assert.AreEqual(i, iter.GetValue().vfield1);
133+
}
134+
135+
fht.Log.ShiftBeginAddress(shiftBeginAddressTo);
136+
137+
for (int i = 0; i < numTailRecords; ++i)
138+
{
139+
Assert.IsTrue(iter.GetNext(out var recordInfo));
140+
if (i == 0)
141+
Assert.AreEqual(fht.Log.BeginAddress, iter.CurrentAddress);
142+
var expectedKey = numRecords - numTailRecords + i;
143+
Assert.AreEqual(expectedKey, iter.GetKey().kfield1);
144+
Assert.AreEqual(expectedKey, iter.GetValue().vfield1);
145+
}
146+
}
147+
103148
class LogObserver : IObserver<IFasterScanIterator<KeyStruct, ValueStruct>>
104149
{
105150
int val = 0;

cs/test/GenericLogScanTests.cs

+53
Original file line numberDiff line numberDiff line change
@@ -134,5 +134,58 @@ public void OnNext(IFasterScanIterator<MyKey, MyValue> iter)
134134
}
135135
}
136136
}
137+
138+
[Test]
139+
[Category("FasterKV")]
140+
[Category("Smoke")]
141+
142+
public void BlittableScanJumpToBeginAddressTest()
143+
{
144+
log = Devices.CreateLogDevice($"{MethodTestDir}/test.log");
145+
objlog = Devices.CreateLogDevice($"{MethodTestDir}/test.obj.log");
146+
fht = new(128,
147+
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 20, PageSizeBits = 15, SegmentSizeBits = 18 },
148+
serializerSettings: new SerializerSettings<MyKey, MyValue> { keySerializer = () => new MyKeySerializer(), valueSerializer = () => new MyValueSerializer() },
149+
lockingMode: LockingMode.None);
150+
151+
using var session = fht.For(new MyFunctions()).NewSession<MyFunctions>();
152+
153+
const int numRecords = 200;
154+
const int numTailRecords = 10;
155+
long shiftBeginAddressTo = 0;
156+
int shiftToKey = 0;
157+
for (int i = 0; i < numRecords; i++)
158+
{
159+
if (i == numRecords - numTailRecords)
160+
{
161+
shiftBeginAddressTo = fht.Log.TailAddress;
162+
shiftToKey = i;
163+
}
164+
var key = new MyKey { key = i };
165+
var value = new MyValue { value = i };
166+
session.Upsert(ref key, ref value, Empty.Default, 0);
167+
}
168+
169+
using var iter = fht.Log.Scan(fht.Log.HeadAddress, fht.Log.TailAddress);
170+
171+
for (int i = 0; i < 100; ++i)
172+
{
173+
Assert.IsTrue(iter.GetNext(out var recordInfo));
174+
Assert.AreEqual(i, iter.GetKey().key);
175+
Assert.AreEqual(i, iter.GetValue().value);
176+
}
177+
178+
fht.Log.ShiftBeginAddress(shiftBeginAddressTo);
179+
180+
for (int i = 0; i < numTailRecords; ++i)
181+
{
182+
Assert.IsTrue(iter.GetNext(out var recordInfo));
183+
if (i == 0)
184+
Assert.AreEqual(fht.Log.BeginAddress, iter.CurrentAddress);
185+
var expectedKey = numRecords - numTailRecords + i;
186+
Assert.AreEqual(expectedKey, iter.GetKey().key);
187+
Assert.AreEqual(expectedKey, iter.GetValue().value);
188+
}
189+
}
137190
}
138191
}

cs/test/RecoverContinueTests.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
using NUnit.Framework;
99
using System.Threading.Tasks;
1010

11-
namespace FASTER.test.recovery.sumstore.recover_continue
11+
namespace FASTER.test.recovery.sumstore.cntinue
1212
{
1313
[TestFixture]
1414
internal class RecoverContinueTests
@@ -24,7 +24,7 @@ internal class RecoverContinueTests
2424
public void Setup()
2525
{
2626
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
27-
log = Devices.CreateLogDevice(TestUtils.MethodTestDir + "/RecoverContinueTests.log", deleteOnClose: true);
27+
log = Devices.CreateLogDevice(TestUtils.MethodTestDir + "/test.log", deleteOnClose: true);
2828
checkpointDir = TestUtils.MethodTestDir + "/checkpoints3";
2929
Directory.CreateDirectory(checkpointDir);
3030

0 commit comments

Comments
 (0)