Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add minhash deduplicator based on RAY and Redis #489

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions configs/config_all.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -632,17 +632,25 @@ process:
- video_deduplicator: # deduplicator to deduplicate samples at document-level using exact matching of videos between documents.
consider_text: false # whether to consider text hash together with video hash when applying deduplication.
- ray_video_deduplicator: # the simple video deduplicator that can run on multi-nodes using md5 hashing exact matching method
redis_host: 'redis_host' # the host of the redis instance
redis_port: 6380 # the port of redis instance, please note that the default port of redis is 6379 which is the same as default port for ray, so we need to modify the default redis config to use it in other port
redis_address: 'redis://localhost:6379' # the address of the redis instance
- ray_image_deduplicator: # the simple image deduplicator that can deduplicate samples at document-level using exact matching of images between documents.
redis_host: 'redis_host' # the host of the redis instance
redis_port: 6380 # the port of redis instance, please note that the default port of redis is 6379 which is the same as default port for ray, so we need to modify the default redis config to use it in other port
redis_address: 'redis://localhost:6379' # the address of the redis instance
method: phash # hash method for image. One of [phash, dhash, whash, ahash]
- ray_document_deduplicator: # the simple document deduplicator that can run on multi-nodes using md5 hashing exact matching method
redis_host: 'redis_host' # the host of the redis instance
redis_port: 6380 # the port of redis instance, please note that the default port of redis is 6379 which is the same as default port for ray, so we need to modify the default redis config to use it in other port
redis_address: 'redis://localhost:6379' # the address of the redis instance
lowercase: false # whether to convert text to lower case
ignore_non_character: false # whether to ignore non-alphabet characters, including whitespaces, digits, and punctuations
- ray_redis_minhash_deduplicator: # the document deduplicator that can run on multi-nodes using minhashLSH algorithm
redis_address: 'redis://localhost:6379' # the address of the redis instance
tokenization: space # tokenization method for text. One of [space, punctuation, character, sentencepiece]
window_size: 5 # window size of shingling
num_permutations: 256 # number of permutations in minhash computing
jaccard_threshold: 0.7 # the min jaccard similarity threshold in near-duplicate detection. When the jaccard similarity of two sample texts is >= this threshold, they are regarded as similar samples and this op will only keep one of them after deduplication
num_bands: null # number of bands in LSH. Default it's None, and it will be determined by an optimal params computation algorithm by minimize the weighted sum of probs of False Positives and False Negatives
num_rows_per_band: null # number of rows in each band in LSH. Default it's None, and it will be determined by an optimal params computation algorithm
lowercase: true # whether to convert text to lower case
ignore_pattern: null # whether to ignore sub-strings with specific pattern when computing simhash.
tokenizer_model: null # path for the sentencepiece model, used for sentencepiece tokenization.

# Selector ops
- frequency_specified_field_selector: # selector to select samples based on the sorted frequency of specified field value
Expand Down
4 changes: 3 additions & 1 deletion data_juicer/core/ray_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from data_juicer import cuda_device_count
from data_juicer.core.data import DJDataset
from data_juicer.ops import Filter, Mapper
from data_juicer.ops import Deduplicator, Filter, Mapper
from data_juicer.utils.constant import Fields
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.process_utils import calculate_np
Expand Down Expand Up @@ -123,6 +123,8 @@ def _run_single_op(self, op):
self.data.write_json(op.stats_export_path,
force_ascii=False)
self.data = self.data.filter(op.process)
elif isinstance(op, Deduplicator):
self.data = op.run(self.data)
else:
logger.error(
'Ray executor only support Filter and Mapper OPs for now')
Expand Down
3 changes: 2 additions & 1 deletion data_juicer/ops/deduplicator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
from .ray_basic_deduplicator import RayBasicDeduplicator
from .ray_document_deduplicator import RayDocumentDeduplicator
from .ray_image_deduplicator import RayImageDeduplicator
from .ray_redis_minhash_deduplicator import RayRedisMinhashDeduplicator
from .ray_video_deduplicator import RayVideoDeduplicator
from .video_deduplicator import VideoDeduplicator

__all__ = [
'DocumentDeduplicator', 'DocumentMinhashDeduplicator',
'DocumentSimhashDeduplicator', 'ImageDeduplicator', 'RayBasicDeduplicator',
'RayDocumentDeduplicator', 'RayImageDeduplicator', 'RayVideoDeduplicator',
'VideoDeduplicator'
'RayRedisMinhashDeduplicator', 'VideoDeduplicator'
]
15 changes: 5 additions & 10 deletions data_juicer/ops/deduplicator/ray_basic_deduplicator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from pydantic import PositiveInt

from data_juicer.utils.constant import HashKeys
from data_juicer.utils.lazy_loader import LazyLoader

Expand All @@ -19,23 +17,20 @@ class RayBasicDeduplicator(Filter):
EMPTY_HASH_VALUE = 'EMPTY'

def __init__(self,
redis_host: str = 'localhost',
redis_port: PositiveInt = 6380,
redis_address: str = 'redis://localhost:6379',
*args,
**kwargs):
"""
Initialization.
:param redis_host: the hostname of redis server
:param redis_port: the port of redis server
:param redis_address: the address of redis server
:param args: extra args
:param kwargs: extra args
"""
super().__init__(*args, **kwargs)
self.redis_host = redis_host
self.redis_port = redis_port
self.redis_address = redis_address
# TODO: add a barrier to ensure that flushdb is performed before
# the operator is called
r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0)
r = redis.from_url(url=redis_address)
r.flushdb(0)

def calculate_hash(self, sample, context=False):
Expand All @@ -44,7 +39,7 @@ def calculate_hash(self, sample, context=False):

def compute_stats_single(self, sample, context=False):
# init redis client
r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0)
r = redis.from_url(url=self.redis_address)
# compute hash
md5_value = self.calculate_hash(sample, context)
# check existing
Expand Down
12 changes: 3 additions & 9 deletions data_juicer/ops/deduplicator/ray_document_deduplicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import string

import regex as re
from pydantic import PositiveInt

from ..base_op import OPERATORS
from .ray_basic_deduplicator import RayBasicDeduplicator
Expand All @@ -17,26 +16,21 @@ class RayDocumentDeduplicator(RayBasicDeduplicator):
"""

def __init__(self,
redis_host: str = 'localhost',
redis_port: PositiveInt = 6380,
redis_address: str = 'redis://localhost:6379',
lowercase: bool = False,
ignore_non_character: bool = False,
*args,
**kwargs):
"""
Initialization method.
:param redis_host: the hostname of redis server
:param redis_port: the port of redis server
:param redis_address: the address of redis server
:param lowercase: Whether to convert sample text to lower case
:param ignore_non_character: Whether to ignore non-alphabet
characters, including whitespaces, digits, and punctuations
:param args: extra args
:param kwargs: extra args.
"""
super().__init__(redis_host=redis_host,
redis_port=redis_port,
*args,
**kwargs)
super().__init__(redis_address=redis_address, *args, **kwargs)
self.lowercase = lowercase
self.remove_non_character_regex = re.compile(
f'\s+|\d+|[{re.escape(string.punctuation)}]' # noqa: W605
Expand Down
12 changes: 3 additions & 9 deletions data_juicer/ops/deduplicator/ray_image_deduplicator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import numpy as np
from pydantic import PositiveInt

from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.mm_utils import load_data_with_context, load_image
Expand Down Expand Up @@ -36,22 +35,17 @@ class RayImageDeduplicator(RayBasicDeduplicator):
"""

def __init__(self,
redis_host: str = 'localhost',
redis_port: PositiveInt = 6380,
redis_address: str = 'redis://localhost:6379',
method: str = 'phash',
*args,
**kwargs):
"""
Initialization.
:param redis_host: the hostname of redis server
:param redis_port: the port of redis server
:param redis_address: the address of redis server
:param args: extra args
:param kwargs: extra args
"""
super().__init__(redis_host=redis_host,
redis_port=redis_port,
*args,
**kwargs)
super().__init__(redis_address=redis_address, *args, **kwargs)
if method not in HASH_METHOD:
raise ValueError(f'Keep strategy [{method}] is not supported. '
f'Can only be one of {HASH_METHOD}.')
Expand Down
Loading
Loading