BulkDump (Dev)

Author: Zhe Wang
Reviewer: Michael Stack, Jingyu Zhou
Audience: FDB developers, SREs and expert users.

Overview

In a FoundationDB (FDB) key-value cluster, every key-value pair is replicated across multiple storage servers. The BulkDump tool dumps all key-value pairs within an input range to files. Note that when the input range is large, it is split into smaller ranges. Each subrange is dumped to a file at a single version: all data within a file is at the same version, but versions can differ across files.

Input and output

When a user wants to start a bulkdump job, the user provides the range to dump and the root path where the data should be dumped. The range can be any subrange within the user key space (i.e. "" ~ "\xff"). Dumping the data of the system key space and special key space (i.e. "\xff" ~ "\xff\xff\xff") is not allowed. The root path can be either a blobstore URL or a file-system path. If the input range is large, it is split into smaller ranges. Each subrange is dumped at a version to a folder. In particular, the folder is organized as follows:

  1. (rootLocal)/(relativeFolder)/(dumpVersion)-manifest.txt (always present)
  2. (rootLocal)/(relativeFolder)/(dumpVersion)-data.sst (omitted if the subrange is empty)
  3. (rootLocal)/(relativeFolder)/(dumpVersion)-sample.sst (omitted if the data size is too small to produce a sample)

The (relativeFolder) is defined as (JobId)/(TaskId)/(BatchId). The (dumpVersion) is the version of the data stored in the (dumpVersion)-data.sst file. At any time, an FDB cluster can have at most one bulkdump job. A bulkdump job is partitioned into tasks by range, aligned to the shard boundary. When dumping the range of a task, the data is collected in batches, and all key-value pairs of a batch are collected at the same version. Here, (JobId) is the unique ID of a job, (TaskId) is the unique ID of a task, and (BatchId) is the unique ID of a batch. All data files of tasks belonging to the same job are located under the same job folder, named by the JobId. A task can consist of multiple batches, where each batch has a distinct version; however, all data within a single batch shares the same version.
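
For example, with a hypothetical local root /tmp/bulkdump and a hypothetical dump version 748395731, one subrange folder would contain:

  /tmp/bulkdump/(JobId)/(TaskId)/(BatchId)/748395731-manifest.txt
  /tmp/bulkdump/(JobId)/(TaskId)/(BatchId)/748395731-data.sst
  /tmp/bulkdump/(JobId)/(TaskId)/(BatchId)/748395731-sample.sst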

Each (relativeFolder) corresponds to exactly one subrange with exactly one manifest file. The manifest file includes all necessary information for loading the data from the folder into an FDB cluster. The manifest file content includes the following information (sketched as a struct after this list):

  1. File paths (full path root)
  2. Key Range of the dumped data in the folder
  3. Version when the data of the range is collected
  4. Checksum of the data
  5. Data size of the dumped data in bytes
  6. Byte sampling setting (when a cluster loads the folder, if the setting does not match, the loading cluster does byte sampling by itself; otherwise, the loading cluster directly uses the sample file of the folder).
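
As a rough mental model only (field names and types here are hypothetical, not the actual manifest format or any real FDB class), the per-folder manifest carries information along these lines:

  #include <cstdint>
  #include <string>

  // Hypothetical sketch of the information carried by a per-folder manifest.
  // The real on-disk format and types are defined in the FDB source.
  struct BulkDumpManifestSketch {
      std::string manifestPath;   // full path to (dumpVersion)-manifest.txt
      std::string dataPath;       // full path to (dumpVersion)-data.sst; empty if the subrange is empty
      std::string samplePath;     // full path to (dumpVersion)-sample.sst; empty if no sample was produced
      std::string beginKey;       // begin key of the dumped range
      std::string endKey;         // end key of the dumped range
      int64_t version;            // version at which the data of the range was collected
      std::string checksum;       // checksum of the data
      int64_t bytes;              // size of the data in bytes
      int64_t byteSampleUnit;     // byte sampling setting used when producing the sample file
  };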

In the job folder, there is a global manifest file that includes all ranges and their corresponding manifest files. When loading the data into a cluster, users can use this global manifest to rebuild the data.

How to use?

Currently, FDBCLI tools and low-level ManagementAPIs are provided to submit a job or clear a job. These operations are achieved by issuing transactions that update the bulkdump metadata. Submitting a job is achieved by writing the job metadata to the bulkdump metadata range of the job. When submitting a job, the API checks if there is any ongoing bulkdump job; if yes, it rejects the job, otherwise it accepts the job. Clearing a job is achieved by erasing the entire user range space of the bulkdump metadata range. When clearing a job, all metadata is cleared and any ongoing task is stopped (with some latency).

FDBCLI provides the following interfaces for these operations (a usage example follows the list):

  1. Submit a job: bulkdump local|blobstore (BeginKey) (EndKey) (RootFolder) // Supply 'local' or 'blobstore': "local" dumps the data to a local folder; dumping to blob storage will be supported later.
  2. Clear a job: bulkdump clear (JobID)
  3. Enable the feature: bulkdump mode on | off // The "bulkdump mode" command prints the current value (on or off) of the mode.
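
For example, a typical sequence in fdbcli might look like the following, where the angle-bracket arguments are placeholders to be filled in by the user:

  fdb> bulkdump mode on
  fdb> bulkdump local <BeginKey> <EndKey> <RootFolder>
  fdb> bulkdump clear <JobID>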

ManagementAPI provides the following interfaces for these operations (a sketch of how they fit together follows the list):

  1. Submit a job: submitBulkDumpJob(BulkDumpState job); // To generate the input job metadata, see point 4.
  2. Clear a job: clearBulkDumpJob(UID jobId);
  3. Enable the feature: setBulkDumpMode(int mode); // Set mode = 1 to enable; Set mode = 0 to disable.
  4. BulkDump job metadata is generated by createBulkDumpJob();
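
To show how these calls fit together, here is a minimal client-side sketch. The declarations below are hypothetical stand-ins that only mirror the list above; the real ManagementAPI functions are Flow actors whose signatures differ (for example, they also take a database handle and return Futures), and the parameters of createBulkDumpJob() are an assumption.

  // Hypothetical stand-in declarations mirroring the list above; the real
  // declarations live in ManagementAPI and differ.
  struct UID {};
  struct BulkDumpState { UID jobId; };          // stand-in; the real type carries the job metadata

  void setBulkDumpMode(int mode);               // 1 = enable, 0 = disable
  BulkDumpState createBulkDumpJob();            // the real version presumably takes the range and root path
  void submitBulkDumpJob(BulkDumpState job);    // rejected if another bulkdump job is ongoing
  void clearBulkDumpJob(UID jobId);

  // Typical lifecycle of one bulkdump job.
  void runBulkDumpJobSketch() {
      setBulkDumpMode(1);                       // enable the feature
      BulkDumpState job = createBulkDumpJob();  // generate the job metadata
      submitBulkDumpJob(job);                   // persist the metadata; DD picks it up
      // ... wait for the dump to complete, then clean up the metadata:
      clearBulkDumpJob(job.jobId);
  }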

Mechanisms

Workflow

  • Users input a range via a transaction, and this range is persisted to the bulkdump metadata (with the "\xff/bulkDump/" prefix).
  • Bulkdump metadata is range-based.
  • The data distributor (DD) observes the range to dump by reading the metadata.
  • DD partitions the range into smaller ranges according to the shard boundary (a conceptual sketch follows this list).
  • DD randomly chooses one storage server (SS) that owns the range as the agent to do the dump. DD holds an outstanding promise with this SS. The task assigned to an SS is stateless.
  • DD sends the range dump request to the storage server. DD spawns a dedicated actor waiting on the call, so if any failure happens on the SS side, DD will know about it.
  • DD sends range dump requests with at most the parallelism specified by the knob DD_BULKDUMP_PARALLELISM.
  • The SS receives the request and reads the data from local storage. If the range has been moved away or split, the SS replies with failure to DD, and DD will retry the remaining range later. If the range is still there, the SS reads the data and uploads it to external storage. This PR only implements dumping the data to local disk; a follow-up PR will dump the data to S3.
  • When the SS completes, it marks the range as completed in the metadata.
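
To make the shard-aligned partitioning above concrete, the following purely illustrative sketch splits a requested range at shard boundaries. It uses plain std::string keys and hypothetical names; it is not the actual DD code.

  #include <string>
  #include <vector>

  // A key range, represented with plain string keys for illustration only.
  struct KeyRangeSketch {
      std::string begin;
      std::string end;
  };

  // Split the requested job range into shard-aligned subranges (one task per subrange).
  // shardBegins is the sorted list of shard begin keys covering the cluster.
  std::vector<KeyRangeSketch> partitionByShards(const KeyRangeSketch& job,
                                                const std::vector<std::string>& shardBegins) {
      std::vector<KeyRangeSketch> tasks;
      std::string cur = job.begin;
      for (const std::string& boundary : shardBegins) {
          if (boundary <= cur) continue;       // boundary is at or before the current position
          if (boundary >= job.end) break;      // boundary is past the requested range
          tasks.push_back({ cur, boundary });  // close the current shard-aligned subrange
          cur = boundary;
      }
      tasks.push_back({ cur, job.end });       // the tail subrange
      return tasks;
  }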

Invariant

  • At any time, the FDB cluster accepts at most one bulkdump job. When a user issues a bulkdump job, the client checks if there is an existing bulkdump job; if yes, the request is rejected.
  • DD partitions the range into subranges according to the shard boundary. For a subrange, the data is guaranteed to be put into the same folder, which corresponds to the same task ID.
  • Each data filename is the version at which the SS read the data.
  • Each subrange always has one manifest file containing the metadata of the data, such as the range, checksum (to be implemented later in a separate PR), and file paths.
  • On the SS, files are dumped first and the metadata is then written to the system key space. If either phase fails, DD will redo the range. Each time the SS writes the folder (locally or in blobstore), it erases the folder first.
  • An SS handles at most one dump task at a time (the parallelism is protected by the knob SS_SERVE_BULKDUMP_PARALLELISM; in the current implementation, this knob is set to 1, but we leave the flexibility of setting the bulkdump parallelism at an SS).
  • A subrange does not necessarily have a byte-sample file or a data file, depending on the data size. An SS may be assigned a range that turns out to be empty.

Failure handling

  • SS failure: DD will receive broken_promise. DD gives up working on the range at this time and will re-issue the request (via a different task) in the future until the range completes.
  • DD failure: It is possible that the same SS receives two requests to work on the same range. The SS uses a FlowLock to guarantee that it handles one request at a time, so there is no conflict.
  • S3 outage: Results in task failure. The failed task will be retried by DD.