Skip to content

Latest commit

 

History

History
115 lines (85 loc) · 6.17 KB

dedup.md

File metadata and controls

115 lines (85 loc) · 6.17 KB
description
Deduplication support in Apache Pinot.

Stream ingestion with Dedup

Pinot provides native support for deduplication (dedup) during the real-time ingestion (v0.11.0+).

Prerequisites for enabling dedup

To enable dedup on a Pinot table, make the following table configuration and schema changes:

Define the primary key in the schema

To be able to dedup records, a primary key is needed to uniquely identify a given record. To define a primary key, add the field primaryKeyColumns to the schema definition.

{% code title="schemaWithPK.json" %}

{
    "primaryKeyColumns": ["id"]
}

{% endcode %}

Note this field expects a list of columns, as the primary key can be composite.

While ingesting a record, if its primary key is found to be already present, the record will be dropped.

Partition the input stream by the primary key

An important requirement for the Pinot dedup table is to partition the input stream by the primary key. For Kafka messages, this means the producer shall set the key in the send API. If the original stream is not partitioned, then a streaming processing job (e.g. Flink) is needed to shuffle and repartition the input stream into a partitioned one for Pinot's ingestion.

Use strictReplicaGroup for routing

The dedup Pinot table can use only the low-level consumer for the input streams. As a result, it uses the partitioned replica-group assignment for the segments. Moreover, dedup poses the additional requirement that all segments of the same partition must be served from the same server to ensure the data consistency across the segments. Accordingly, it requires strictReplicaGroup as the routing strategy. To use that, configure instanceSelectorType in Routing as the following:

{% code title="routing" %}

{
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  }
}

{% endcode %}

Other limitations

  • The high-level consumer is not allowed for the input stream ingestion, which means stream.kafka.consumer.type must be lowLevel.
  • The incoming stream must be partitioned by the primary key such that, all records with a given primaryKey must be consumed by the same Pinot server instance.

Enable dedup in the table configurations

To enable dedup for a REALTIME table, add the following to the table config.

{% code title="tableConfigWithDedup.json" %}

{ 
 ...
  "dedupConfig": { 
        "dedupEnabled": true, 
        "hashFunction": "NONE" 
   }, 
 ...
}

{% endcode %}

Supported values for hashFunction are NONE, MD5 and MURMUR3, with the default being NONE.

Metadata TTL

Server stores the existing primary keys in dedup metadata map kept on JVM heap. As the dedup metadata grows, the heap memory pressure increases, which may affect the performance of ingestion and queries. One can set a positive metadata TTL to enable the TTL mechanism to keep the metadata size bounded. By default, the table's time colum is used as the dedup time column. The time unit of TTL is the same as the dedup time column. The TTL should be set long enough so that new records can be deduplicated before the primary keys gets removed.

{ 
 ...
  "dedupConfig": { 
        "dedupEnabled": true, 
        "hashFunction": "NONE",
        "dedupTimeColumn": "mtime",
        "metadataTTL": 30000
   }, 
 ...
}

Enable preload for faster server restarts

When ingesting new records, the server has to read the metadata map to check for duplicates. But when server restarts, the documents in existing segments are all unique as ensured by the dedup logic during real-time ingestion. So we can do write-only to bootstrap the metadata map faster.

{ 
 ...
  "dedupConfig": { 
        "dedupEnabled": true, 
        "hashFunction": "NONE",
        "dedupTimeColumn": "mtime",
        "metadataTTL": 30000,
        "enablePreload": true
   }, 
 ...
}

The feature also requires you to specify pinot.server.instance.max.segment.preload.threads: N in the server config where N should be replaced with the number of threads that should be used for preload. It's 0 by default to disable the preloading feature. This preloading thread pool is shared with upsert table's preloading.

Best practices

Unlike other real-time tables, Dedup table takes up more memory resources as it needs to bookkeep the primary key and its corresponding segment reference, in memory. As a result, it's important to plan the capacity beforehand, and monitor the resource usage. Here are some recommended practices of using Dedup table.

  • Create the Kafka topic with more partitions. The number of Kafka partitions determines the partition numbers of the Pinot table. The more partitions you have in the Kafka topic, more Pinot servers you can distribute the Pinot table to and therefore more you can scale the table horizontally.
  • Dedup table maintains an in-memory map from the primary key to the segment reference. So it's recommended to use a simple primary key type and avoid composite primary keys to save the memory cost. In addition, consider the hashFunction config in the Dedup config, which can be MD5 or MURMUR3, to store the 128-bit hashcode of the primary key instead. This is useful when your primary key takes more space. But keep in mind, this hash may introduce collisions, though the chance is very low.
  • Monitoring: Set up a dashboard over the metric pinot.server.dedupPrimaryKeysCount.tableName to watch the number of primary keys in a table partition. It's useful for tracking its growth which is proportional to the memory usage growth.
  • Capacity planning: It's useful to plan the capacity beforehand to ensure you will not run into resource constraints later. A simple way is to measure the amount of the primary keys in the Kafka throughput per partition and time the primary key space cost to approximate the memory usage. A heap dump is also useful to check the memory usage so far on an dedup table instance.