From f2510783f9ed70172bd1a2ab9faa40afc99d54d5 Mon Sep 17 00:00:00 2001
From: Nadav Gigi <95503908+NadavGigi@users.noreply.github.com>
Date: Thu, 23 Jan 2025 13:17:20 +0200
Subject: [PATCH] Accelerate hash table iterator with value prefetching (#1568)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR builds upon the [previous entry prefetching optimization](https://github.com/valkey-io/valkey/pull/1501) to further enhance performance by implementing value prefetching for hashtable iterators.

## Implementation

- Modified `hashtableInitIterator` to accept a new flags parameter, allowing control over iterator behavior.
- Implemented conditional value prefetching within `hashtableNext` based on the new `HASHTABLE_ITER_PREFETCH_VALUES` flag.

When the flag is set, `hashtableNext` now calls `prefetchBucketValues` at the start of each new bucket, preemptively loading the values of filled entries into the CPU cache. The actual prefetching of values is performed using type-specific callback functions implemented in `server.c`:

- For `robj` entries, the `hashtableObjectPrefetchValue` callback prefetches the value unless it is embedded.

This implementation is specifically focused on main database iterations at this stage. Applying it to hashtables that hold other object types should not be problematic, but its performance benefits for those cases will need to be proven through testing and benchmarking.

## Performance

### Setup

- 64-core Graviton3 Amazon EC2 instance.
- 50 million keys with different value sizes.
- Valkey server running over a RAM file system.
- CRC checksum and compression off.

### Action

- `save` command.

### Results

The durations of the `save` command were taken from the output of the `info all` command.
```
+--------------------+-------------------+------------------+
| Prefetching        | Value size (bytes)| Time (seconds)   |
+--------------------+-------------------+------------------+
| No                 | 100               | 20.112279        |
| Yes                | 100               | 12.758519        |
| No                 | 40                | 16.945366        |
| Yes                | 40                | 10.902022        |
| No                 | 20                | 9.817000         |
| Yes                | 20                | 9.626821         |
| No                 | 10                | 9.71510          |
| Yes                | 10                | 9.510565         |
+--------------------+-------------------+------------------+
```

The results largely align with our expectations, showing significant improvements for larger values (100 and 40 bytes), which are stored outside the robj. For smaller values (20 and 10 bytes), which are embedded within the robj, we see almost no improvement, as expected; the small gain that does appear is somewhat surprising, since these embedded values are not actively prefetched.

perf record on save command **without** value prefetching:

```
--99.98%--rdbSaveDb
   |--91.38%--rdbSaveKeyValuePair
   |   |--42.72%--rdbSaveRawString
   |   |   |--26.69%--rdbWriteRaw
   |   |   |   --25.75%--rioFileWrite.lto_priv.0
   |   |   --15.41%--rdbSaveLen
   |   |      |--7.58%--rdbWriteRaw
   |   |      |   --7.08%--rioFileWrite.lto_priv.0
   |   |      |      --6.54%--_IO_fwrite
   |   |      --7.42%--rdbWriteRaw.constprop.1
   |   |         --7.18%--rioFileWrite.lto_priv.0
   |   |            --6.73%--_IO_fwrite
   |   |--40.44%--rdbSaveStringObject
   |   --7.62%--rdbSaveObjectType
   |      --7.39%--rdbWriteRaw.constprop.1
   |         --7.04%--rioFileWrite.lto_priv.0
   |            --6.59%--_IO_fwrite
   --7.33%--hashtableNext.constprop.1
      --6.28%--prefetchNextBucketEntries.lto_priv.0
```

perf record on save command **with** value prefetching:

```
rdbSaveRio
   --99.93%--rdbSaveDb
      |--79.81%--rdbSaveKeyValuePair
      |   |--66.79%--rdbSaveRawString
      |   |   |--42.31%--rdbWriteRaw
      |   |   |   --40.74%--rioFileWrite.lto_priv.0
      |   |   --23.37%--rdbSaveLen
      |   |      |--11.78%--rdbWriteRaw
      |   |      |   --11.03%--rioFileWrite.lto_priv.0
      |   |      |      --10.30%--_IO_fwrite
      |   |      --10.98%--rdbWriteRaw.constprop.1
      |   |         --10.44%--rioFileWrite.lto_priv.0
      |   |            --9.74%--_IO_fwrite
      |   |--11.33%--rdbSaveObjectType
      |   |   --10.96%--rdbWriteRaw.constprop.1
      |   |      --10.51%--rioFileWrite.lto_priv.0
      |   |         --9.75%--_IO_fwrite
      |   --0.77%--rdbSaveStringObject
      --18.39%--hashtableNext
         |--10.04%--hashtableObjectPrefetchValue
         --6.06%--prefetchNextBucketEntries
```

Conclusions: The prefetching strategy appears to be working as intended, shifting the performance bottleneck from data access to I/O operations. The significant reduction in rdbSaveStringObject time suggests that string objects (which are the values) are being accessed more efficiently.

Signed-off-by: NadavGigi
---
 src/acl.c                 |   8 +--
 src/aof.c                 |   4 +-
 src/cluster.c             |   2 +-
 src/cluster_legacy.c      |   2 +-
 src/db.c                  |   4 +-
 src/debug.c               |   4 +-
 src/hashtable.c           | 110 ++++++++++++++++++++++++--------------
 src/hashtable.h           |  13 +++--
 src/kvstore.c             |  22 +++----
 src/kvstore.h             |   5 +-
 src/latency.c             |   4 +-
 src/module.c              |   4 +-
 src/object.c              |   8 +--
 src/pubsub.c              |   4 +-
 src/rdb.c                 |   6 +--
 src/server.c              |  29 ++++++----
 src/sort.c                |   2 +-
 src/t_hash.c              |   2 +-
 src/t_set.c               |   4 +-
 src/t_zset.c              |   6 +--
 src/unit/test_hashtable.c |   6 +--
 src/unit/test_kvstore.c   |   8 +--
 22 files changed, 147 insertions(+), 110 deletions(-)

diff --git a/src/acl.c b/src/acl.c
index 184fa54116..807ef744d2 100644
--- a/src/acl.c
+++ b/src/acl.c
@@ -655,7 +655,7 @@ void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int
     ACLResetFirstArgsForCommand(selector, id);
     if (cmd->subcommands_ht) {
         hashtableIterator iter;
-        hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
+        hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE);
         void *next;
         while (hashtableNext(&iter, &next)) {
             struct serverCommand *sub = next;
@@ -673,7 +673,7 @@ void
ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int * found and the operation was performed. */ void ACLSetSelectorCommandBitsForCategory(hashtable *commands, aclSelector *selector, uint64_t cflag, int value) { hashtableIterator iter; - hashtableInitIterator(&iter, commands); + hashtableInitIterator(&iter, commands, 0); void *next; while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; @@ -741,7 +741,7 @@ void ACLCountCategoryBitsForCommands(hashtable *commands, unsigned long *off, uint64_t cflag) { hashtableIterator iter; - hashtableInitIterator(&iter, commands); + hashtableInitIterator(&iter, commands, 0); void *next; while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; @@ -2765,7 +2765,7 @@ sds getAclErrorMessage(int acl_res, user *user, struct serverCommand *cmd, sds e /* ACL CAT category */ void aclCatWithFlags(client *c, hashtable *commands, uint64_t cflag, int *arraylen) { hashtableIterator iter; - hashtableInitIterator(&iter, commands); + hashtableInitIterator(&iter, commands, 0); void *next; while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; diff --git a/src/aof.c b/src/aof.c index 024cdb2771..c6828d4b6e 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1891,7 +1891,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = o->ptr; hashtableIterator iter; - hashtableInitIterator(&iter, zs->ht); + hashtableInitIterator(&iter, zs->ht, 0); void *next; while (hashtableNext(&iter, &next)) { zskiplistNode *node = next; @@ -2217,7 +2217,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr; if (rioWriteBulkLongLong(aof, j) == 0) goto werr; - kvs_it = kvstoreIteratorInit(db->keys); + kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES); /* Iterate this DB writing every entry */ void *next; while (kvstoreIteratorNext(kvs_it, 
&next)) { diff --git a/src/cluster.c b/src/cluster.c index 309279e0be..cedcd9ecb1 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -910,7 +910,7 @@ void clusterCommand(client *c) { unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys; addReplyArrayLen(c, numkeys); kvstoreHashtableIterator *kvs_di = NULL; - kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot); + kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot, 0); for (unsigned int i = 0; i < numkeys; i++) { void *next; serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next)); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 94d3532dfc..5e976d3060 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6347,7 +6347,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) { kvstoreHashtableIterator *kvs_di = NULL; void *next; - kvs_di = kvstoreGetHashtableSafeIterator(server.db->keys, hashslot); + kvs_di = kvstoreGetHashtableIterator(server.db->keys, hashslot, HASHTABLE_ITER_SAFE); while (kvstoreHashtableIteratorNext(kvs_di, &next)) { robj *valkey = next; enterExecutionUnit(1, 0); diff --git a/src/db.c b/src/db.c index 535d493954..f2a000030b 100644 --- a/src/db.c +++ b/src/db.c @@ -895,9 +895,9 @@ void keysCommand(client *c) { kvstoreHashtableIterator *kvs_di = NULL; kvstoreIterator *kvs_it = NULL; if (pslot != -1) { - kvs_di = kvstoreGetHashtableSafeIterator(c->db->keys, pslot); + kvs_di = kvstoreGetHashtableIterator(c->db->keys, pslot, HASHTABLE_ITER_SAFE); } else { - kvs_it = kvstoreIteratorInit(c->db->keys); + kvs_it = kvstoreIteratorInit(c->db->keys, HASHTABLE_ITER_SAFE); } void *next; while (kvs_di ? 
kvstoreHashtableIteratorNext(kvs_di, &next) : kvstoreIteratorNext(kvs_it, &next)) { diff --git a/src/debug.c b/src/debug.c index 915e0c264d..b7f8df04fa 100644 --- a/src/debug.c +++ b/src/debug.c @@ -207,7 +207,7 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o) } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = o->ptr; hashtableIterator iter; - hashtableInitIterator(&iter, zs->ht); + hashtableInitIterator(&iter, zs->ht, 0); void *next; while (hashtableNext(&iter, &next)) { @@ -291,7 +291,7 @@ void computeDatasetDigest(unsigned char *final) { for (int j = 0; j < server.dbnum; j++) { serverDb *db = server.db + j; if (kvstoreSize(db->keys) == 0) continue; - kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys); + kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES); /* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */ aux = htonl(j); diff --git a/src/hashtable.c b/src/hashtable.c index 3f1eff19c1..23097eb246 100644 --- a/src/hashtable.c +++ b/src/hashtable.c @@ -300,7 +300,7 @@ typedef struct { long index; uint16_t pos_in_bucket; uint8_t table; - uint8_t safe; + uint8_t flags; union { /* Unsafe iterator fingerprint for misuse detection. */ uint64_t fingerprint; @@ -936,6 +936,7 @@ static inline incrementalFind *incrementalFindFromOpaque(hashtableIncrementalFin /* Prefetches all filled entries in the given bucket to optimize future memory access. 
*/ static void prefetchBucketEntries(bucket *b) { + if (!b->presence) return; for (int pos = 0; pos < numBucketPositions(b); pos++) { if (isPositionFilled(b, pos)) { valkey_prefetch(b->entries[pos]); @@ -979,6 +980,26 @@ static void prefetchNextBucketEntries(iter *iter, bucket *current_bucket) { } } +/* Prefetches the values associated with the entries in the given bucket by + * calling the entryPrefetchValue callback in the hashtableType */ +static void prefetchBucketValues(bucket *b, hashtable *ht) { + if (!b->presence) return; + assert(ht->type->entryPrefetchValue != NULL); + for (int pos = 0; pos < numBucketPositions(b); pos++) { + if (isPositionFilled(b, pos)) { + ht->type->entryPrefetchValue(b->entries[pos]); + } + } +} + +static inline int isSafe(iter *iter) { + return (iter->flags & HASHTABLE_ITER_SAFE); +} + +static inline int shouldPrefetchValues(iter *iter) { + return (iter->flags & HASHTABLE_ITER_PREFETCH_VALUES); +} + /* --- API functions --- */ /* Allocates and initializes a new hashtable specified by the given type. */ @@ -1792,31 +1813,32 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f /* --- Iterator --- */ -/* Initialize a iterator, that is not allowed to insert, delete or even lookup - * entries in the hashtable, because such operations can trigger incremental - * rehashing which moves entries around and confuses the iterator. Only - * hashtableNext is allowed. Each entry is returned exactly once. Call - * hashtableResetIterator when you are done. See also - * hashtableInitSafeIterator. */ -void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) { - iter *iter; - iter = iteratorFromOpaque(iterator); - iter->hashtable = ht; - iter->table = 0; - iter->index = -1; - iter->safe = 0; -} - -/* Initialize a safe iterator, which is allowed to modify the hash table while - * iterating. It pauses incremental rehashing to prevent entries from moving - * around. Call hashtableNext to fetch each entry. 
You must call - * hashtableResetIterator when you are done with a safe iterator. +/* Initialize an iterator for a hashtable. * - * It's allowed to insert and replace entries. Deleting entries is only allowed - * for the entry that was just returned by hashtableNext. Deleting other entries - * is possible, but doing so can cause internal fragmentation, so don't. + * The 'flags' argument can be used to tweak the behaviour. It's a bitwise-or + * (zero means no flags) of the following: + * + * - HASHTABLE_ITER_SAFE: Use a safe iterator that can handle + * modifications to the hash table while iterating. + * - HASHTABLE_ITER_PREFETCH_VALUES: Enables prefetching of entries values, + * which can improve performance in some scenarios. Because the hashtable is generic and + * doesn't care which object we store, the callback entryPrefetchValue must be set to help + * us prefetch necessary fields of specific object types stored in the hashtable. + * + * For a non-safe iterator (default, when HASHTABLE_ITER_SAFE is not set): + * It is not allowed to insert, delete or even lookup entries in the hashtable, + * because such operations can trigger incremental rehashing which moves entries + * around and confuses the iterator. Only hashtableNext is allowed. Each entry + * is returned exactly once. + * + * For a safe iterator (when HASHTABLE_ITER_SAFE is set): + * It is allowed to modify the hash table while iterating. It pauses incremental + * rehashing to prevent entries from moving around. It's allowed to insert and + * replace entries. Deleting entries is only allowed for the entry that was just + * returned by hashtableNext. Deleting other entries is possible, but doing so + * can cause internal fragmentation, so don't. * - * Guarantees: + * Guarantees for safe iterators: * * - Entries that are in the hash table for the entire iteration are returned * exactly once. 
@@ -1829,18 +1851,31 @@ void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) { * * - Entries that are inserted during the iteration may or may not be returned * by the iterator. + * + * Call hashtableNext to fetch each entry. You must call hashtableResetIterator + * when you are done with the iterator. */ -void hashtableInitSafeIterator(hashtableIterator *iterator, hashtable *ht) { - hashtableInitIterator(iterator, ht); +void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht, uint8_t flags) { + iter *iter; + iter = iteratorFromOpaque(iterator); + iter->hashtable = ht; + iter->table = 0; + iter->index = -1; + iter->flags = flags; +} + +/* Reinitializes the iterator for the provided hashtable while + * preserving the flags from its previous initialization. */ +void hashtableReinitIterator(hashtableIterator *iterator, hashtable *ht) { iter *iter = iteratorFromOpaque(iterator); - iter->safe = 1; + hashtableInitIterator(iterator, ht, iter->flags); } /* Resets a stack-allocated iterator. */ void hashtableResetIterator(hashtableIterator *iterator) { iter *iter = iteratorFromOpaque(iterator); if (!(iter->index == -1 && iter->table == 0)) { - if (iter->safe) { + if (isSafe(iter)) { hashtableResumeRehashing(iter->hashtable); assert(iter->hashtable->pause_rehash >= 0); } else { @@ -1850,21 +1885,13 @@ void hashtableResetIterator(hashtableIterator *iterator) { } /* Allocates and initializes an iterator. */ -hashtableIterator *hashtableCreateIterator(hashtable *ht) { +hashtableIterator *hashtableCreateIterator(hashtable *ht, uint8_t flags) { iter *iter = zmalloc(sizeof(*iter)); hashtableIterator *opaque = iteratorToOpaque(iter); - hashtableInitIterator(opaque, ht); + hashtableInitIterator(opaque, ht, flags); return opaque; } -/* Allocates and initializes a safe iterator. 
*/ -hashtableIterator *hashtableCreateSafeIterator(hashtable *ht) { - hashtableIterator *iterator = hashtableCreateIterator(ht); - iter *iter = iteratorFromOpaque(iterator); - iter->safe = 1; - return iterator; -} - /* Resets and frees the memory of an allocated iterator, i.e. one created using * hashtableCreate(Safe)Iterator. */ void hashtableReleaseIterator(hashtableIterator *iterator) { @@ -1880,7 +1907,7 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) { while (1) { if (iter->index == -1 && iter->table == 0) { /* It's the first call to next. */ - if (iter->safe) { + if (isSafe(iter)) { hashtablePauseRehashing(iter->hashtable); iter->last_seen_size = iter->hashtable->used[iter->table]; } else { @@ -1907,7 +1934,7 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) { iter->bucket = getChildBucket(iter->bucket); } else if (iter->pos_in_bucket >= ENTRIES_PER_BUCKET) { /* Bucket index done. */ - if (iter->safe) { + if (isSafe(iter)) { /* If entries in this bucket chain have been deleted, * they've left empty spaces in the buckets. The chain is * not automatically compacted when rehashing is paused. If @@ -1936,6 +1963,9 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) { } bucket *b = iter->bucket; if (iter->pos_in_bucket == 0) { + if (shouldPrefetchValues(iter)) { + prefetchBucketValues(b, iter->hashtable); + } prefetchNextBucketEntries(iter, b); } if (!isPositionFilled(b, iter->pos_in_bucket)) { diff --git a/src/hashtable.h b/src/hashtable.h index 4291cf5a5d..67e8a139f8 100644 --- a/src/hashtable.h +++ b/src/hashtable.h @@ -60,6 +60,8 @@ typedef struct { /* Callback to free an entry when it's overwritten or deleted. * Optional. */ void (*entryDestructor)(void *entry); + /* Callback to prefetch the value associated with a hashtable entry. */ + void (*entryPrefetchValue)(const void *entry); /* Callback to control when resizing should be allowed. 
*/ int (*resizeAllowed)(size_t moreMem, double usedRatio); /* Invoked at the start of rehashing. */ @@ -91,6 +93,10 @@ typedef void (*hashtableScanFunction)(void *privdata, void *entry); /* Scan flags */ #define HASHTABLE_SCAN_EMIT_REF (1 << 0) +/* Iterator flags */ +#define HASHTABLE_ITER_SAFE (1 << 0) +#define HASHTABLE_ITER_PREFETCH_VALUES (1 << 1) + /* --- Prototypes --- */ /* Hash function (global seed) */ @@ -144,11 +150,10 @@ int hashtableIncrementalFindGetResult(hashtableIncrementalFindState *state, void /* Iteration & scan */ size_t hashtableScan(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata); size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata, void *(*defragfn)(void *), int flags); -void hashtableInitIterator(hashtableIterator *iter, hashtable *ht); -void hashtableInitSafeIterator(hashtableIterator *iter, hashtable *ht); +void hashtableInitIterator(hashtableIterator *iter, hashtable *ht, uint8_t flags); +void hashtableReinitIterator(hashtableIterator *iterator, hashtable *ht); void hashtableResetIterator(hashtableIterator *iter); -hashtableIterator *hashtableCreateIterator(hashtable *ht); -hashtableIterator *hashtableCreateSafeIterator(hashtable *ht); +hashtableIterator *hashtableCreateIterator(hashtable *ht, uint8_t flags); void hashtableReleaseIterator(hashtableIterator *iter); int hashtableNext(hashtableIterator *iter, void **elemptr); diff --git a/src/kvstore.c b/src/kvstore.c index d6db4d3fe1..76bfb35d98 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -467,7 +467,7 @@ void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full) { hashtableStats *mainHtStats = NULL; hashtableStats *rehashHtStats = NULL; hashtable *ht; - kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs); + kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs, HASHTABLE_ITER_SAFE); while ((ht = kvstoreIteratorNextHashtable(kvs_it))) { hashtableStats *stats = hashtableGetStatsHt(ht, 0, full); if 
(!mainHtStats) { @@ -576,12 +576,12 @@ int kvstoreNumHashtables(kvstore *kvs) { /* Returns kvstore iterator that can be used to iterate through sub-hash tables. * * The caller should free the resulting kvs_it with kvstoreIteratorRelease. */ -kvstoreIterator *kvstoreIteratorInit(kvstore *kvs) { +kvstoreIterator *kvstoreIteratorInit(kvstore *kvs, uint8_t flags) { kvstoreIterator *kvs_it = zmalloc(sizeof(*kvs_it)); kvs_it->kvs = kvs; kvs_it->didx = -1; kvs_it->next_didx = kvstoreGetFirstNonEmptyHashtableIndex(kvs_it->kvs); /* Finds first non-empty hashtable index. */ - hashtableInitSafeIterator(&kvs_it->di, NULL); + hashtableInitIterator(&kvs_it->di, NULL, flags); return kvs_it; } @@ -625,7 +625,7 @@ int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next) { /* No current hashtable or reached the end of the hash table. */ hashtable *ht = kvstoreIteratorNextHashtable(kvs_it); if (!ht) return 0; - hashtableInitSafeIterator(&kvs_it->di, ht); + hashtableReinitIterator(&kvs_it->di, ht); return hashtableNext(&kvs_it->di, next); } } @@ -691,23 +691,15 @@ unsigned long kvstoreHashtableSize(kvstore *kvs, int didx) { return hashtableSize(ht); } -kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx) { +kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx, uint8_t flags) { kvstoreHashtableIterator *kvs_di = zmalloc(sizeof(*kvs_di)); kvs_di->kvs = kvs; kvs_di->didx = didx; - hashtableInitIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx)); + hashtableInitIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx), flags); return kvs_di; } -kvstoreHashtableIterator *kvstoreGetHashtableSafeIterator(kvstore *kvs, int didx) { - kvstoreHashtableIterator *kvs_di = zmalloc(sizeof(*kvs_di)); - kvs_di->kvs = kvs; - kvs_di->didx = didx; - hashtableInitSafeIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx)); - return kvs_di; -} - -/* Free the kvs_di returned by kvstoreGetHashtableIterator and kvstoreGetHashtableSafeIterator. 
*/ +/* Free the kvs_di returned by kvstoreGetHashtableIterator. */ void kvstoreReleaseHashtableIterator(kvstoreHashtableIterator *kvs_di) { /* The hashtable may be deleted during the iteration process, so here need to check for NULL. */ if (kvstoreGetHashtable(kvs_di->kvs, kvs_di->didx)) { diff --git a/src/kvstore.h b/src/kvstore.h index 1a8c74a6b9..d5db1a89aa 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -43,7 +43,7 @@ void kvstoreHashtableTrackMemUsage(hashtable *s, ssize_t delta); size_t kvstoreHashtableMetadataSize(void); /* kvstore iterator specific functions */ -kvstoreIterator *kvstoreIteratorInit(kvstore *kvs); +kvstoreIterator *kvstoreIteratorInit(kvstore *kvs, uint8_t flags); void kvstoreIteratorRelease(kvstoreIterator *kvs_it); int kvstoreIteratorGetCurrentHashtableIndex(kvstoreIterator *kvs_it); int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next); @@ -57,8 +57,7 @@ unsigned long kvstoreHashtableRehashingCount(kvstore *kvs); /* Specific hashtable access by hashtable-index */ unsigned long kvstoreHashtableSize(kvstore *kvs, int didx); -kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx); -kvstoreHashtableIterator *kvstoreGetHashtableSafeIterator(kvstore *kvs, int didx); +kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx, uint8_t flags); void kvstoreReleaseHashtableIterator(kvstoreHashtableIterator *kvs_id); int kvstoreHashtableIteratorNext(kvstoreHashtableIterator *kvs_di, void **next); int kvstoreHashtableRandomEntry(kvstore *kvs, int didx, void **found); diff --git a/src/latency.c b/src/latency.c index 2beb4859d1..fa448dac35 100644 --- a/src/latency.c +++ b/src/latency.c @@ -528,7 +528,7 @@ void fillCommandCDF(client *c, struct hdr_histogram *histogram) { * a per command cumulative distribution of latencies. 
*/ void latencyAllCommandsFillCDF(client *c, hashtable *commands, int *command_with_data) { hashtableIterator iter; - hashtableInitSafeIterator(&iter, commands); + hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE); void *next; while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; @@ -565,7 +565,7 @@ void latencySpecificCommandsFillCDF(client *c) { if (cmd->subcommands_ht) { hashtableIterator iter; - hashtableInitSafeIterator(&iter, cmd->subcommands_ht); + hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE); void *next; while (hashtableNext(&iter, &next)) { struct serverCommand *sub = next; diff --git a/src/module.c b/src/module.c index 75dcd81cd6..17ac4ddf02 100644 --- a/src/module.c +++ b/src/module.c @@ -12162,7 +12162,7 @@ int moduleFreeCommand(struct ValkeyModule *module, struct serverCommand *cmd) { if (cmd->subcommands_ht) { hashtableIterator iter; void *next; - hashtableInitSafeIterator(&iter, cmd->subcommands_ht); + hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE); while (hashtableNext(&iter, &next)) { struct serverCommand *sub = next; if (moduleFreeCommand(module, sub) != C_OK) continue; @@ -12185,7 +12185,7 @@ void moduleUnregisterCommands(struct ValkeyModule *module) { /* Unregister all the commands registered by this module. */ hashtableIterator iter; void *next; - hashtableInitSafeIterator(&iter, server.commands); + hashtableInitIterator(&iter, server.commands, HASHTABLE_ITER_SAFE); while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; if (moduleFreeCommand(module, cmd) != C_OK) continue; diff --git a/src/object.c b/src/object.c index b8200dd815..94c2985edb 100644 --- a/src/object.c +++ b/src/object.c @@ -630,7 +630,7 @@ void dismissSetObject(robj *o, size_t size_hint) { * page size, and there's a high chance we'll actually dismiss something. 
*/ if (size_hint / hashtableSize(ht) >= server.page_size) { hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; while (hashtableNext(&iter, &next)) { sds item = next; @@ -682,7 +682,7 @@ void dismissHashObject(robj *o, size_t size_hint) { * a page size, and there's a high chance we'll actually dismiss something. */ if (size_hint / hashtableSize(ht) >= server.page_size) { hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; while (hashtableNext(&iter, &next)) { dismissHashTypeEntry(next); @@ -1156,7 +1156,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { asize = sizeof(*o) + hashtableMemUsage(ht); hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; while (hashtableNext(&iter, &next) && samples < sample_size) { sds element = next; @@ -1197,7 +1197,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { } else if (o->encoding == OBJ_ENCODING_HASHTABLE) { hashtable *ht = o->ptr; hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; asize = sizeof(*o) + hashtableMemUsage(ht); diff --git a/src/pubsub.c b/src/pubsub.c index 27b5611788..be6c739e98 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -366,7 +366,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) { if (!kvstoreHashtableSize(server.pubsubshard_channels, slot)) return; - kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableSafeIterator(server.pubsubshard_channels, slot); + kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableIterator(server.pubsubshard_channels, slot, HASHTABLE_ITER_SAFE); void *element; while (kvstoreHashtableIteratorNext(kvs_di, &element)) { dict *clients = element; @@ -730,7 +730,7 @@ void channelList(client 
*c, sds pat, kvstore *pubsub_channels) { replylen = addReplyDeferredLen(c); for (unsigned int i = 0; i < slot_cnt; i++) { if (!kvstoreHashtableSize(pubsub_channels, i)) continue; - kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableIterator(pubsub_channels, i); + kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableIterator(pubsub_channels, i, 0); void *next; while (kvstoreHashtableIteratorNext(kvs_di, &next)) { dict *clients = next; diff --git a/src/rdb.c b/src/rdb.c index 0bb5d7d45d..6653e99c3a 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -887,7 +887,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { nwritten += n; hashtableIterator iterator; - hashtableInitIterator(&iterator, set); + hashtableInitIterator(&iterator, set, 0); void *next; while (hashtableNext(&iterator, &next)) { sds ele = next; @@ -959,7 +959,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { nwritten += n; hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; while (hashtableNext(&iter, &next)) { sds field = hashTypeEntryGetField(next); @@ -1349,7 +1349,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { if ((res = rdbSaveLen(rdb, expires_size)) < 0) goto werr; written += res; - kvs_it = kvstoreIteratorInit(db->keys); + kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES); int last_slot = -1; /* Iterate this DB writing every entry */ void *next; diff --git a/src/server.c b/src/server.c index 144841eff9..ab06a24470 100644 --- a/src/server.c +++ b/src/server.c @@ -577,6 +577,15 @@ const void *hashtableObjectGetKey(const void *entry) { return objectGetKey(entry); } +/* Prefetch the value if it's not embedded. 
 */
+void hashtableObjectPrefetchValue(const void *entry) {
+    const robj *obj = entry;
+    if (obj->encoding != OBJ_ENCODING_EMBSTR && obj->encoding != OBJ_ENCODING_INT) {
+        valkey_prefetch(obj->ptr);
+    }
+}
+
 int hashtableObjKeyCompare(const void *key1, const void *key2) {
     const robj *o1 = key1, *o2 = key2;
     return hashtableSdsKeyCompare(o1->ptr, o2->ptr);
@@ -589,6 +598,7 @@ void hashtableObjectDestructor(void *val) {
 
 /* Kvstore->keys, keys are sds strings, vals are Objects. */
 hashtableType kvstoreKeysHashtableType = {
+    .entryPrefetchValue = hashtableObjectPrefetchValue,
     .entryGetKey = hashtableObjectGetKey,
     .hashFunction = hashtableSdsHash,
     .keyCompare = hashtableSdsKeyCompare,
@@ -602,6 +612,7 @@ hashtableType kvstoreKeysHashtableType = {
 
 /* Kvstore->expires */
 hashtableType kvstoreExpiresHashtableType = {
+    .entryPrefetchValue = hashtableObjectPrefetchValue,
     .entryGetKey = hashtableObjectGetKey,
     .hashFunction = hashtableSdsHash,
     .keyCompare = hashtableSdsKeyCompare,
@@ -3205,7 +3216,7 @@ void populateCommandTable(void) {
 void resetCommandTableStats(hashtable *commands) {
     hashtableIterator iter;
     void *next;
-    hashtableInitSafeIterator(&iter, commands);
+    hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE);
     while (hashtableNext(&iter, &next)) {
         struct serverCommand *c = next;
         c->microseconds = 0;
@@ -4988,7 +4999,7 @@ void addReplyCommandSubCommands(client *c,
     void *next;
     hashtableIterator iter;
-    hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
+    hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE);
     while (hashtableNext(&iter, &next)) {
         struct serverCommand *sub = next;
         if (use_map) addReplyBulkCBuffer(c, sub->fullname, sdslen(sub->fullname));
@@ -5150,7 +5161,7 @@ void commandCommand(client *c) {
     hashtableIterator iter;
     void *next;
     addReplyArrayLen(c, hashtableSize(server.commands));
-    hashtableInitIterator(&iter, server.commands);
+    hashtableInitIterator(&iter, server.commands, 0);
     while (hashtableNext(&iter, &next)) {
         struct serverCommand *cmd = next;
         addReplyCommandInfo(c, cmd);
@@ -5209,7 +5220,7 @@ int shouldFilterFromCommandList(struct serverCommand *cmd, commandListFilter *fi
 void commandListWithFilter(client *c, hashtable *commands, commandListFilter filter, int *numcmds) {
     hashtableIterator iter;
     void *next;
-    hashtableInitIterator(&iter, commands);
+    hashtableInitIterator(&iter, commands, 0);
     while (hashtableNext(&iter, &next)) {
         struct serverCommand *cmd = next;
         if (!shouldFilterFromCommandList(cmd, &filter)) {
@@ -5228,7 +5239,7 @@ void commandListWithFilter(client *c, hashtable *commands, commandListFil
 void commandListWithoutFilter(client *c, hashtable *commands, int *numcmds) {
     hashtableIterator iter;
     void *next;
-    hashtableInitIterator(&iter, commands);
+    hashtableInitIterator(&iter, commands, 0);
     while (hashtableNext(&iter, &next)) {
         struct serverCommand *cmd = next;
         addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
@@ -5290,7 +5301,7 @@ void commandInfoCommand(client *c) {
     hashtableIterator iter;
     void *next;
     addReplyArrayLen(c, hashtableSize(server.commands));
-    hashtableInitIterator(&iter, server.commands);
+    hashtableInitIterator(&iter, server.commands, 0);
     while (hashtableNext(&iter, &next)) {
         struct serverCommand *cmd = next;
         addReplyCommandInfo(c, cmd);
@@ -5312,7 +5323,7 @@ void commandDocsCommand(client *c) {
     hashtableIterator iter;
     void *next;
     addReplyMapLen(c, hashtableSize(server.commands));
-    hashtableInitIterator(&iter, server.commands);
+    hashtableInitIterator(&iter, server.commands, 0);
     while (hashtableNext(&iter, &next)) {
         struct serverCommand *cmd = next;
         addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
@@ -5441,7 +5452,7 @@ const char *getSafeInfoString(const char *s, size_t len, char **tmp) {
 sds genValkeyInfoStringCommandStats(sds info, hashtable *commands) {
     hashtableIterator iter;
     void *next;
-    hashtableInitSafeIterator(&iter, commands);
+    hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE);
     while (hashtableNext(&iter, &next)) {
         struct serverCommand *c = next;
         char *tmpsafe;
@@ -5478,7 +5489,7 @@ sds genValkeyInfoStringACLStats(sds info) {
 sds genValkeyInfoStringLatencyStats(sds info, hashtable *commands) {
     hashtableIterator iter;
     void *next;
-    hashtableInitSafeIterator(&iter, commands);
+    hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE);
     while (hashtableNext(&iter, &next)) {
         struct serverCommand *c = next;
         char *tmpsafe;
diff --git a/src/sort.c b/src/sort.c
index 7af96141e8..754ebef4a2 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -447,7 +447,7 @@ void sortCommandGeneric(client *c, int readonly) {
     } else if (sortval->type == OBJ_ZSET) {
         hashtable *ht = ((zset *)sortval->ptr)->ht;
         hashtableIterator iter;
-        hashtableInitIterator(&iter, ht);
+        hashtableInitIterator(&iter, ht, 0);
         void *next;
         while (hashtableNext(&iter, &next)) {
             zskiplistNode *node = next;
diff --git a/src/t_hash.c b/src/t_hash.c
index b6e6457bb6..b347ecf31f 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -382,7 +382,7 @@ void hashTypeInitIterator(robj *subject, hashTypeIterator *hi) {
         hi->fptr = NULL;
         hi->vptr = NULL;
     } else if (hi->encoding == OBJ_ENCODING_HASHTABLE) {
-        hashtableInitIterator(&hi->iter, subject->ptr);
+        hashtableInitIterator(&hi->iter, subject->ptr, 0);
     } else {
         serverPanic("Unknown hash encoding");
     }
diff --git a/src/t_set.c b/src/t_set.c
index 4279baf82f..a69345de4f 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -317,7 +317,7 @@ setTypeIterator *setTypeInitIterator(robj *subject) {
     si->subject = subject;
     si->encoding = subject->encoding;
     if (si->encoding == OBJ_ENCODING_HASHTABLE) {
-        si->hashtable_iterator = hashtableCreateIterator(subject->ptr);
+        si->hashtable_iterator = hashtableCreateIterator(subject->ptr, 0);
     } else if (si->encoding == OBJ_ENCODING_INTSET) {
         si->ii = 0;
     } else if (si->encoding == OBJ_ENCODING_LISTPACK) {
@@ -1179,7 +1179,7 @@ void srandmemberWithCountCommand(client *c) {
     /* CASE 3 & 4: send the result to the user. */
     {
         hashtableIterator iter;
-        hashtableInitIterator(&iter, ht);
+        hashtableInitIterator(&iter, ht, 0);
         addReplyArrayLen(c, count);
         serverAssert(count == hashtableSize(ht));
diff --git a/src/t_zset.c b/src/t_zset.c
index 77c96613b7..2444f3ecd0 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -2092,7 +2092,7 @@ static void zuiInitIterator(zsetopsrc *op) {
         it->is.is = op->subject->ptr;
         it->is.ii = 0;
     } else if (op->encoding == OBJ_ENCODING_HASHTABLE) {
-        it->ht.iter = hashtableCreateIterator(op->subject->ptr);
+        it->ht.iter = hashtableCreateIterator(op->subject->ptr, 0);
     } else if (op->encoding == OBJ_ENCODING_LISTPACK) {
         it->lp.lp = op->subject->ptr;
         it->lp.p = lpFirst(it->lp.lp);
@@ -2349,7 +2349,7 @@ static size_t zsetHashtableGetMaxElementLength(hashtable *ht, size_t *totallen)
     size_t maxelelen = 0;
 
     hashtableIterator iter;
-    hashtableInitIterator(&iter, ht);
+    hashtableInitIterator(&iter, ht, 0);
     void *next;
     while (hashtableNext(&iter, &next)) {
         zskiplistNode *node = next;
@@ -2749,7 +2749,7 @@ static void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIn
 
     /* Step 2: Create the skiplist using final score ordering */
     hashtableIterator iter;
-    hashtableInitIterator(&iter, dstzset->ht);
+    hashtableInitIterator(&iter, dstzset->ht, 0);
     void *next;
 
     while (hashtableNext(&iter, &next)) {
diff --git a/src/unit/test_hashtable.c b/src/unit/test_hashtable.c
index 689440e43d..71a7dde841 100644
--- a/src/unit/test_hashtable.c
+++ b/src/unit/test_hashtable.c
@@ -547,7 +547,7 @@ int test_iterator(int argc, char **argv, int flags) {
     size_t num_returned = 0;
     hashtableIterator iter;
     void *next;
-    hashtableInitIterator(&iter, ht);
+    hashtableInitIterator(&iter, ht, 0);
     while (hashtableNext(&iter, &next)) {
         uint8_t *entry = next;
         num_returned++;
@@ -592,7 +592,7 @@ int test_safe_iterator(int argc, char **argv, int flags) {
     size_t num_returned = 0;
     hashtableIterator iter;
     void *next;
-    hashtableInitSafeIterator(&iter, ht);
+    hashtableInitIterator(&iter, ht, HASHTABLE_ITER_SAFE);
     while (hashtableNext(&iter, &next)) {
         uint8_t *entry = next;
         size_t index = entry - entry_counts;
@@ -657,7 +657,7 @@ int test_compact_bucket_chain(int argc, char **argv, int flags) {
     size_t num_chained_buckets = hashtableChainedBuckets(ht, 0);
     size_t num_returned = 0;
     hashtableIterator iter;
-    hashtableInitSafeIterator(&iter, ht);
+    hashtableInitIterator(&iter, ht, HASHTABLE_ITER_SAFE);
     void *entry;
     while (hashtableNext(&iter, &entry)) {
         /* As long as the iterator is still returning entries from the same
diff --git a/src/unit/test_kvstore.c b/src/unit/test_kvstore.c
index d4cc91d6d8..55b311c4ba 100644
--- a/src/unit/test_kvstore.c
+++ b/src/unit/test_kvstore.c
@@ -77,7 +77,7 @@ int test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyHashtable(int argc, char **arg
         TEST_ASSERT(kvstoreHashtableAdd(kvs1, didx, stringFromInt(i)));
     }
 
-    kvs_it = kvstoreIteratorInit(kvs1);
+    kvs_it = kvstoreIteratorInit(kvs1, HASHTABLE_ITER_SAFE);
     while (kvstoreIteratorNext(kvs_it, &key)) {
         curr_slot = kvstoreIteratorGetCurrentHashtableIndex(kvs_it);
         TEST_ASSERT(kvstoreHashtableDelete(kvs1, curr_slot, key));
@@ -110,7 +110,7 @@ int test_kvstoreIteratorRemoveAllKeysDeleteEmptyHashtable(int argc, char **argv,
         TEST_ASSERT(kvstoreHashtableAdd(kvs2, didx, stringFromInt(i)));
     }
 
-    kvs_it = kvstoreIteratorInit(kvs2);
+    kvs_it = kvstoreIteratorInit(kvs2, HASHTABLE_ITER_SAFE);
     while (kvstoreIteratorNext(kvs_it, &key)) {
         curr_slot = kvstoreIteratorGetCurrentHashtableIndex(kvs_it);
         TEST_ASSERT(kvstoreHashtableDelete(kvs2, curr_slot, key));
@@ -146,7 +146,7 @@ int test_kvstoreHashtableIteratorRemoveAllKeysNoDeleteEmptyHashtable(int argc, c
         TEST_ASSERT(kvstoreHashtableAdd(kvs1, didx, stringFromInt(i)));
     }
 
-    kvs_di = kvstoreGetHashtableSafeIterator(kvs1, didx);
+    kvs_di = kvstoreGetHashtableIterator(kvs1, didx, HASHTABLE_ITER_SAFE);
     while (kvstoreHashtableIteratorNext(kvs_di, &key)) {
         TEST_ASSERT(kvstoreHashtableDelete(kvs1, didx, key));
     }
@@ -177,7 +177,7 @@ int test_kvstoreHashtableIteratorRemoveAllKeysDeleteEmptyHashtable(int argc, cha
         TEST_ASSERT(kvstoreHashtableAdd(kvs2, didx, stringFromInt(i)));
     }
 
-    kvs_di = kvstoreGetHashtableSafeIterator(kvs2, didx);
+    kvs_di = kvstoreGetHashtableIterator(kvs2, didx, HASHTABLE_ITER_SAFE);
     while (kvstoreHashtableIteratorNext(kvs_di, &key)) {
        TEST_ASSERT(kvstoreHashtableDelete(kvs2, didx, key));
    }
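The API change running through all of the hunks above is the same: the separate `hashtableInitIterator`/`hashtableInitSafeIterator` entry points collapse into one initializer taking a flags argument, so new behaviors like value prefetching compose as flag bits. The following standalone sketch illustrates that flags-based pattern; it reuses the flag names from this PR, but the `hashtableIterator` struct and accessor functions here are simplified stand-ins, not the real valkey implementation:

```c
#include <assert.h>
#include <stddef.h>

/* Flag bits, as introduced by this PR. */
#define HASHTABLE_ITER_SAFE (1 << 0)
#define HASHTABLE_ITER_PREFETCH_VALUES (1 << 1)

/* Simplified stand-in for the real iterator struct. */
typedef struct {
    void *hashtable;
    int flags;
} hashtableIterator;

/* One initializer replaces the old init/initSafe pair: callers pass 0 for a
 * plain iterator, or OR together the flag bits they need. */
void hashtableInitIterator(hashtableIterator *iter, void *ht, int flags) {
    iter->hashtable = ht;
    iter->flags = flags;
}

/* hashtableNext would consult these when entering a new bucket. */
int iterIsSafe(const hashtableIterator *iter) {
    return (iter->flags & HASHTABLE_ITER_SAFE) != 0;
}

int iterPrefetchesValues(const hashtableIterator *iter) {
    return (iter->flags & HASHTABLE_ITER_PREFETCH_VALUES) != 0;
}
```

The advantage over separate constructors is that orthogonal behaviors combine freely (`HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES`) without multiplying entry points, which is why every former `hashtableInitSafeIterator` call site above becomes `hashtableInitIterator(..., HASHTABLE_ITER_SAFE)`.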