[improve] PIP-381: Handle PositionInfo that's too large to serialize as a single entry #22799

Open
wants to merge 23 commits into
base: master
Changes from 19 commits
Commits
e014356
ManagedCursor: compress data written to BookKeeper
eolivelli May 10, 2024
33e4c71
serialize/compress without intermediate byte arrays (#268)
dlg99 May 16, 2024
6d1b93a
Print time
eolivelli May 16, 2024
568d446
ManagedCursor: manually serialise PositionInfo (#270)
eolivelli May 16, 2024
0240250
Fix PositionInfoUtilsTest
eolivelli May 16, 2024
ed8df4d
PositionInfo Util serialization fix and test (#272)
dlg99 May 17, 2024
c2f0908
Remove auto reset of cursor in case of read error
eolivelli May 17, 2024
27152ff
Revert removal of 'containsKey' in ManagedCursorImpl
eolivelli May 17, 2024
564a668
Prevent ZK connection loss in case of huge cursor status (#273)
eolivelli May 20, 2024
0d23d5b
[managed-ledger] Compressed cursors: fix problem with little buffers …
eolivelli May 21, 2024
08af8fc
[tests] Fix build after merge conflict
eolivelli May 21, 2024
e8d3930
Fix WriteCursorLedgerSize metric
nicoloboschi May 22, 2024
89adf38
try ledger recovery from previous entries in case of corrupt/missing …
dlg99 May 28, 2024
43a5b31
fix broken test after merge/resolve
dlg99 May 29, 2024
8db72f4
post-rebase fixes
dlg99 Sep 9, 2024
1397faf
removed usage of byte[] where possible
dlg99 Sep 23, 2024
d4b4195
added config parameters for the chunk size and to enable/disable chin…
dlg99 Sep 24, 2024
2fdbe63
CR feedback, addComponent(true, ..)
dlg99 Sep 27, 2024
9b88801
Updated broker.conf with new entries
dlg99 Sep 30, 2024
7918f21
updated configs with docs and new config values, including the standa…
dlg99 Oct 1, 2024
048142c
Merge branch 'master' into cpick/cursor-large-state
dlg99 Oct 3, 2024
590c5ac
info logging to debug
dlg99 Oct 3, 2024
54157d8
CR feedback
dlg99 Oct 3, 2024
1 change: 1 addition & 0 deletions buildtools/src/main/resources/pulsar/suppressions.xml
Expand Up @@ -36,6 +36,7 @@
<suppress checks=".*" files=".+[\\/]generated[\\/].+\.java"/>
<suppress checks=".*" files=".+[\\/]generated-sources[\\/].+\.java"/>
<suppress checks=".*" files=".+[\\/]generated-test-sources[\\/].+\.java"/>
<suppress checks=".*" files=".+PositionInfoUtils.java"/>

<!-- suppress most all checks expect below-->
<suppress checks="^(?!.*(UnusedImports|IllegalImport)).*$" files=".*[\\/]src[\\/]test[\\/].*"/>
Expand Down
11 changes: 9 additions & 2 deletions conf/broker.conf
Expand Up @@ -1941,10 +1941,17 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1
# persist, it will help to reduce the duplicates caused by the ack state that can not be fully persistent.
dispatcherPauseOnAckStatePersistentEnabled=false

-# If enabled, the maximum "acknowledgment holes" will not be limited and "acknowledgment holes" are stored in
-# multiple entries.
+# If enabled, the maximum "acknowledgment holes" (as defined by managedLedgerMaxUnackedRangesToPersist)
+# can be stored in multiple entries, allowing the higher limits.
Contributor

We should define that higher limit; otherwise it will confuse users, and no one will have any idea what the limit is. PR #9292 by default supports up to 10M unacked messages in a single entry, so we should define the higher limit as > 10M unacked messages.

Contributor Author

Let's not break backwards compatibility, given that the feature is disabled by default.

Contributor

@rdhabalia rdhabalia Oct 3, 2024

How does just updating the comment break compatibility?

Contributor

Also, this is a new feature that writes to multiple entries, so where does backward compatibility come into it? :-)

Contributor Author

It writes to the same ledger as the existing single-entry write.

persistentUnackedRangesWithMultipleEntriesEnabled=false

# If persistentUnackedRangesWithMultipleEntriesEnabled, this sets maximum entry size for storage in bytes.
#persistentUnackedRangesMaxEntrySize=1048576

# Set the compression type to use for cursor info.
# Possible options are NONE, LZ4, ZLIB, ZSTD, SNAPPY
#cursorInfoCompressionType=NONE
Member

@lhotari lhotari Oct 1, 2024

It's better to uncomment this line. When the setting is not commented out, it doesn't require PULSAR_PREFIX_ with the k8s config hack that Pulsar uses.

Please also add the config changes to standalone.conf


# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
managedLedgerCacheEvictionFrequency=0

Expand Down
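The broker.conf change above says that acknowledgment holes can be stored in multiple entries, each bounded by persistentUnackedRangesMaxEntrySize. The following is a minimal sketch of that idea only, not the PR's implementation: the class `CursorChunkingSketch` and its `split`/`join` methods are hypothetical names, and the real code works on buffers rather than byte arrays.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the PR): bounds each stored entry to a maximum size. */
final class CursorChunkingSketch {

    /** Splits data into consecutive chunks of at most maxEntrySize bytes each. */
    static List<byte[]> split(byte[] data, int maxEntrySize) {
        if (maxEntrySize <= 0) {
            throw new IllegalArgumentException("maxEntrySize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < data.length; offset += maxEntrySize) {
            int len = Math.min(maxEntrySize, data.length - offset);
            byte[] chunk = new byte[len];
            System.arraycopy(data, offset, chunk, 0, len);
            chunks.add(chunk);
        }
        return chunks;
    }

    /** Reassembles the chunks in the order they were written. */
    static byte[] join(List<byte[]> chunks) {
        int total = 0;
        for (byte[] chunk : chunks) {
            total += chunk.length;
        }
        ByteBuffer out = ByteBuffer.allocate(total);
        for (byte[] chunk : chunks) {
            out.put(chunk);
        }
        return out.array();
    }

    public static void main(String[] args) {
        int maxEntrySize = 1024 * 1024;                     // default from the diff: 1048576
        byte[] payload = new byte[2 * maxEntrySize + 512 * 1024]; // 2.5 MiB of serialized state
        List<byte[]> chunks = split(payload, maxEntrySize);
        System.out.println(chunks.size() + " entries");     // prints "3 entries"
    }
}
```

With the default 1 MiB entry size, a 2.5 MiB payload becomes two full entries and one 512 KiB entry. A reader would also need to know how many entries belong together, which this sketch leaves out.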
16 changes: 16 additions & 0 deletions managed-ledger/pom.xml
Expand Up @@ -191,6 +191,22 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.github.splunk.lightproto</groupId>
<artifactId>lightproto-maven-plugin</artifactId>
<version>${lightproto-maven-plugin.version}</version>
<configuration>
<singleOuterClass>true</singleOuterClass>
<classPrefix>Light</classPrefix>
</configuration>
<executions>
<execution>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
Expand Down
Expand Up @@ -45,7 +45,9 @@ public class ManagedLedgerConfig {
private boolean createIfMissing = true;
private int maxUnackedRangesToPersist = 10000;
private int maxBatchDeletedIndexToPersist = 10000;
private String cursorInfoCompressionType = "NONE";
Contributor

Let's pick the best possible compression algorithm by running various tests. Running perf tests for each type is difficult for any user with such an internal implementation, and it's the author's responsibility to provide those numbers.

Contributor Author

Let's not break backwards compatibility

Contributor

Isn't it part of this new feature? Then how would it affect backward compatibility? If we introduce it with this feature, we will have a compatibility issue later, and that's what I would like to avoid by not adding it here.

private boolean persistentUnackedRangesWithMultipleEntriesEnabled = false;
private int persistentUnackedRangesMaxEntrySize = 1024 * 1024;
private boolean deletionAtBatchIndexLevelEnabled = true;
private int maxUnackedRangesToPersistInMetadataStore = 1000;
private int maxEntriesPerLedger = 50000;
Expand Down Expand Up @@ -480,14 +482,49 @@ public int getMaxBatchDeletedIndexToPersist() {
return maxBatchDeletedIndexToPersist;
}

/**
* @return true if persistent unacked ranges with multiple entries enabled.
*/
public boolean isPersistentUnackedRangesWithMultipleEntriesEnabled() {
return persistentUnackedRangesWithMultipleEntriesEnabled;
}

/**
* If enabled, the maximum "acknowledgment holes" will be stored in multiple entries, allowing the higher limits.
* @param multipleEntriesEnabled
*/
public void setPersistentUnackedRangesWithMultipleEntriesEnabled(boolean multipleEntriesEnabled) {
this.persistentUnackedRangesWithMultipleEntriesEnabled = multipleEntriesEnabled;
}

/**
* @return max entry size for persistent unacked ranges.
*/
public int getPersistentUnackedRangesMaxEntrySize() {
return persistentUnackedRangesMaxEntrySize;
}

/**
* If persistentUnackedRangesWithMultipleEntriesEnabled, this sets maximum entry size for storage in bytes.
*/
public void setPersistentUnackedRangesMaxEntrySize(int persistentUnackedRangesMaxEntrySize) {
this.persistentUnackedRangesMaxEntrySize = persistentUnackedRangesMaxEntrySize;
}

/**
* @return compression type to use for cursor info.
*/
public String getCursorInfoCompressionType() {
return cursorInfoCompressionType;
}

/**
* Set the compression type to use for cursor info.
*/
public void setCursorInfoCompressionType(String cursorInfoCompressionType) {
this.cursorInfoCompressionType = cursorInfoCompressionType;
}

/**
* @param maxUnackedRangesToPersist
* max unacked message ranges that will be persisted and receverd.
Expand Down
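ManagedLedgerConfig stores cursorInfoCompressionType as a plain String, while broker.conf lists the accepted values as NONE, LZ4, ZLIB, ZSTD and SNAPPY. The sketch below shows one way such a string could be validated early. The class `CursorCompressionTypeSketch`, its `Type` enum and the `parse` method are hypothetical. Treating a missing value as NONE and accepting lower case are assumptions made here, not behavior confirmed by the diff.

```java
import java.util.Locale;

/** Hypothetical validator (not part of the PR) for the cursorInfoCompressionType setting. */
final class CursorCompressionTypeSketch {

    /** The options listed in the broker.conf comment. */
    enum Type { NONE, LZ4, ZLIB, ZSTD, SNAPPY }

    /**
     * Parses the configured value.
     * Assumptions: null or blank means NONE, and matching ignores case.
     */
    static Type parse(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return Type.NONE;
        }
        try {
            return Type.valueOf(configured.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unsupported cursorInfoCompressionType: " + configured, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("NONE"));
        System.out.println(parse("zstd"));
    }
}
```

Failing fast on an unknown value surfaces a misconfiguration at startup rather than at the first cursor write.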
Expand Up @@ -108,6 +108,16 @@ public class ManagedLedgerFactoryConfig {
*/
private long managedCursorInfoCompressionThresholdInBytes = 0;

/**
* If enabled, the maximum "acknowledgment holes" will be stored in multiple entries, allowing the higher limits.
*/
private boolean persistentUnackedRangesWithMultipleEntriesEnabled = false;

/**
* If persistentUnackedRangesWithMultipleEntriesEnabled, this sets maximum entry size for storage in bytes.
*/
private int persistentUnackedRangesMaxEntrySize = 1024 * 1024;

public MetadataCompressionConfig getCompressionConfigForManagedLedgerInfo() {
return new MetadataCompressionConfig(managedLedgerInfoCompressionType,
managedLedgerInfoCompressionThresholdInBytes);
Expand Down
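One review comment asks for numbers on the compression options. Of the listed types, ZLIB is the only one available in the JDK itself (through java.util.zip), so a rough round-trip check needs no extra dependency. This is an illustrative sketch only: `ZlibRoundTripSketch` is a hypothetical class, it uses byte arrays for simplicity, and it says nothing about how the PR wires compression into the cursor write path.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical ZLIB round trip (not part of the PR), using only java.util.zip. */
final class ZlibRoundTripSketch {

    /** Compresses input with the default ZLIB settings. */
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            while (!deflater.finished()) {
                int n = deflater.deflate(buf);
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end(); // release native resources
        }
    }

    /** Decompresses into a buffer of the known original length. */
    static byte[] decompress(byte[] compressed, int originalLength) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            byte[] out = new byte[originalLength];
            int off = 0;
            while (off < originalLength && !inflater.finished()) {
                int n = inflater.inflate(out, off, originalLength - off);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("truncated or invalid ZLIB stream");
                }
                off += n;
            }
            return out;
        } catch (DataFormatException e) {
            throw new IllegalStateException("invalid ZLIB stream", e);
        } finally {
            inflater.end();
        }
    }

    public static void main(String[] args) {
        byte[] state = new byte[100_000]; // highly repetitive stand-in for serialized cursor state
        byte[] packed = compress(state);
        System.out.println(state.length + " -> " + packed.length + " bytes");
    }
}
```

Real cursor state is less regular than an array of zeros, so meaningful ratios and timings would have to be measured on representative data.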
Expand Up @@ -44,6 +44,7 @@ public final class LedgerMetadataUtils {

private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = "pulsar/managed-ledger";
private static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor";
public static final String METADATA_PROPERTY_CURSOR_COMPRESSION_TYPE = "pulsar/cursor-compressionType";
private static final String METADATA_PROPERTY_COMPACTEDTOPIC = "pulsar/compactedTopic";
private static final String METADATA_PROPERTY_COMPACTEDTO = "pulsar/compactedTo";
private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";
Expand Down Expand Up @@ -72,8 +73,13 @@ static Map<String, byte[]> buildBaseManagedLedgerMetadata(String name) {
* @return an immutable map which describes the cursor
* @see #buildBaseManagedLedgerMetadata(java.lang.String)
*/
-static Map<String, byte[]> buildAdditionalMetadataForCursor(String name) {
-return Map.of(METADATA_PROPERTY_CURSOR_NAME, name.getBytes(StandardCharsets.UTF_8));
+static Map<String, byte[]> buildAdditionalMetadataForCursor(String name, String compressionType) {
+if (compressionType != null) {
+return Map.of(METADATA_PROPERTY_CURSOR_NAME, name.getBytes(StandardCharsets.UTF_8),
+METADATA_PROPERTY_CURSOR_COMPRESSION_TYPE, compressionType.getBytes(StandardCharsets.UTF_8));
+} else {
+return Map.of(METADATA_PROPERTY_CURSOR_NAME, name.getBytes(StandardCharsets.UTF_8));
+}
}

/**
Expand Down
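The LedgerMetadataUtils change above records the compression type as an extra ledger metadata property. A reader then has to cope with ledgers that lack the property. The sketch below mirrors the write side from the diff and adds a hypothetical read side. `CursorMetadataSketch` and `readCompressionType` are illustrative names, and falling back to "NONE" when the property is absent is an assumption, chosen here so that ledgers written without the property are read as uncompressed.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical stand-in (not the PR's class) for the cursor metadata helpers. */
final class CursorMetadataSketch {

    // Property keys as they appear in the diff.
    static final String CURSOR_NAME = "pulsar/cursor";
    static final String CURSOR_COMPRESSION_TYPE = "pulsar/cursor-compressionType";

    /** Write side, mirroring buildAdditionalMetadataForCursor from the diff. */
    static Map<String, byte[]> build(String name, String compressionType) {
        if (compressionType != null) {
            return Map.of(CURSOR_NAME, name.getBytes(StandardCharsets.UTF_8),
                    CURSOR_COMPRESSION_TYPE, compressionType.getBytes(StandardCharsets.UTF_8));
        }
        return Map.of(CURSOR_NAME, name.getBytes(StandardCharsets.UTF_8));
    }

    /** Read side (assumption): a missing property is treated as "NONE". */
    static String readCompressionType(Map<String, byte[]> metadata) {
        byte[] raw = metadata.get(CURSOR_COMPRESSION_TYPE);
        return raw == null ? "NONE" : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(readCompressionType(build("my-sub", "LZ4")));
        System.out.println(readCompressionType(build("my-sub", null)));
    }
}
```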