
R2

This page guides you through the process of setting up the R2 destination connector.

Prerequisites

List of required fields:

  • Account ID
  • Access Key ID
  • Secret Access Key
  • R2 Bucket Name
  • R2 Bucket Path

You will also need to allow connections from the Airbyte server to your Cloudflare R2 bucket.

Step 1: Set up R2

Sign in to your Cloudflare account and purchase R2 (see this).

Use an existing Access Key ID and Secret Access Key, or create a new pair.

Prepare the R2 bucket that will be used as the destination. See this to create an R2 bucket, or create one via the R2 module of the dashboard.
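
If you want to sanity-check the credentials and bucket before configuring the connector, the minimal sketch below uses boto3 against R2's S3-compatible endpoint. It is not part of Airbyte, and the placeholder account ID, bucket name, and keys are assumptions you must replace.

# Optional sanity check (not part of Airbyte): verify that the key pair can
# reach the bucket through R2's S3-compatible API.
import boto3

account_id = "<account-id>"          # from the Cloudflare dashboard
bucket_name = "<r2-bucket-name>"

client = boto3.client(
    "s3",
    endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
    aws_access_key_id="<access-key-id>",
    aws_secret_access_key="<secret-access-key>",
    region_name="auto",  # R2 ignores the region; "auto" is the conventional value
)

# HeadBucket succeeds only if the bucket exists and the credentials can access it.
client.head_bucket(Bucket=bucket_name)
print(f"Bucket '{bucket_name}' is reachable with the provided credentials.")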

Step 2: Set up the R2 destination connector in Airbyte

For Airbyte Cloud:

  1. Log into your Airbyte Cloud account.
  2. In the left navigation bar, click Destinations. In the top-right corner, click + new destination.
  3. On the destination setup page, select R2 from the Destination type dropdown and enter a name for this connector.
  4. Configure fields:
    • Account Id
      • See this to copy your Account ID.
    • Access Key Id
      • See this on how to generate an access key.
    • Secret Access Key
      • Corresponding key to the above key id.
    • R2 Bucket Name
      • See this to create an R2 bucket, or create one via the R2 module of the dashboard.
    • R2 Bucket Path
      • Subdirectory under the above bucket to sync the data into.
    • R2 Path Format
      • Additional string format describing how to store data under the R2 Bucket Path. The default value is ${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_.
    • R2 Filename pattern
      • The pattern allows you to set the file-name format for the R2 staging file(s). The following placeholder combinations are currently supported: {date}, {date:yyyy_MM}, {timestamp}, {timestamp:millis}, {timestamp:micros}, {part_number}, {sync_id}, {format_extension}. Please do not use spaces or unsupported placeholders, as they will not be recognized. A rough illustration of how these placeholders expand follows the steps below.
  5. Click Set up destination.
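
The sketch below is a rough, hypothetical illustration of how the filename-pattern placeholders expand. It is not the connector's implementation, and the example pattern and sync ID are made up.

# Rough illustration of the filename-pattern placeholders (not the connector's code).
from datetime import datetime, timezone

def expand_pattern(pattern: str, part_number: int, sync_id: int, format_extension: str) -> str:
    now = datetime.now(timezone.utc)
    values = {
        "{date}": now.strftime("%Y_%m_%d"),
        "{date:yyyy_MM}": now.strftime("%Y_%m"),
        "{timestamp}": str(int(now.timestamp())),
        "{timestamp:millis}": str(int(now.timestamp() * 1_000)),
        "{timestamp:micros}": str(int(now.timestamp() * 1_000_000)),
        "{part_number}": str(part_number),
        "{sync_id}": str(sync_id),
        "{format_extension}": format_extension,
    }
    for placeholder, value in values.items():
        pattern = pattern.replace(placeholder, value)
    return pattern

# Example (pattern and sync_id are made up):
print(expand_pattern("{date}_{timestamp}_{part_number}{format_extension}", 0, 42, ".csv.gz"))
# e.g. 2022_09_25_1664064000_0.csv.gz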

For Airbyte OSS:

  1. Go to the local Airbyte page.

  2. In the left navigation bar, click Destinations. In the top-right corner, click + new destination.

  3. On the destination setup page, select R2 from the Destination type dropdown and enter a name for this connector.

  4. Configure fields:

    • Account Id
      • See this to copy your Account ID.
    • Access Key Id
      • See this on how to generate an access key.
    • Secret Access Key
      • Corresponding key to the above key id.
    • Make sure your R2 bucket is accessible from the machine running Airbyte.
      • This depends on your networking setup.
      • The easiest way to verify if Airbyte is able to connect to your R2 bucket is via the check connection tool in the UI.
    • R2 Bucket Name
      • See this to create an R2 bucket, or create one via the R2 module of the dashboard.
    • R2 Bucket Path
      • Subdirectory under the above bucket to sync the data into.
    • R2 Path Format
      • Additional string format describing how to store data under the R2 Bucket Path. The default value is ${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_.
    • R2 Filename pattern
      • The pattern allows you to set the file-name format for the R2 staging file(s). The following placeholder combinations are currently supported: {date}, {date:yyyy_MM}, {timestamp}, {timestamp:millis}, {timestamp:micros}, {part_number}, {sync_id}, {format_extension}. Please do not use spaces or unsupported placeholders, as they will not be recognized.
  5. Click Set up destination.

The full path of the output data with the default R2 Path Format ${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_ is:

<bucket-name>/<bucket-path>/<source-namespace-if-exists>/<stream-name>/<upload-date>_<epoch>_<partition-id>.<format-extension>

For example:

testing_bucket/data_output_path/public/users/2021_01_01_1234567890_0.csv.gz
↑              ↑                ↑      ↑     ↑          ↑          ↑ ↑
|              |                |      |     |          |          | format extension
|              |                |      |     |          |          unique incremental part id
|              |                |      |     |          milliseconds since epoch
|              |                |      |     upload date in YYYY_MM_DD
|              |                |      stream name
|              |                source namespace (if it exists)
|              bucket path
bucket name

The rationales behind this naming pattern are:

  1. Each stream has its own directory.
  2. The data output files can be sorted by upload time.
  3. The upload time is composed of a date part and a milliseconds part so that it is both readable and unique.

It is also possible to customize the bucket path further using the following variables (a rough expansion sketch follows the list):

  • ${NAMESPACE}: Namespace where the stream comes from, or the namespace configured on the connection.
  • ${STREAM_NAME}: Name of the stream.
  • ${YEAR}: Year in which the sync wrote the output data.
  • ${MONTH}: Month in which the sync wrote the output data.
  • ${DAY}: Day on which the sync wrote the output data.
  • ${HOUR}: Hour in which the sync wrote the output data.
  • ${MINUTE}: Minute in which the sync wrote the output data.
  • ${SECOND}: Second in which the sync wrote the output data.
  • ${MILLISECOND}: Millisecond in which the sync wrote the output data.
  • ${EPOCH}: Milliseconds since the Unix epoch at which the sync wrote the output data.
  • ${UUID}: A random UUID string.
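
As a rough illustration (not the connector's implementation), the sketch below expands a path format string with example values; the namespace and stream name are made up.

# Rough illustration of how the bucket-path variables expand (not the connector's code).
from datetime import datetime, timezone
import uuid

def expand_path(path_format: str, namespace: str, stream_name: str) -> str:
    now = datetime.now(timezone.utc)
    values = {
        "${NAMESPACE}": namespace,
        "${STREAM_NAME}": stream_name,
        "${YEAR}": f"{now.year:04d}",
        "${MONTH}": f"{now.month:02d}",
        "${DAY}": f"{now.day:02d}",
        "${HOUR}": f"{now.hour:02d}",
        "${MINUTE}": f"{now.minute:02d}",
        "${SECOND}": f"{now.second:02d}",
        "${MILLISECOND}": f"{now.microsecond // 1000:03d}",
        "${EPOCH}": str(int(now.timestamp() * 1000)),
        "${UUID}": str(uuid.uuid4()),
    }
    for variable, value in values.items():
        path_format = path_format.replace(variable, value)
    return path_format

# Example with made-up namespace and stream name:
print(expand_path("${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_", "public", "users"))
# e.g. public/users/2021_01_01_1609459200000_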

Note:

  • Multiple / characters in the R2 path are collapsed into a single / character.
  • If the output bucket contains too many files, the part ID variable uses a UUID instead of a sequential ID.

Please note that the stream name may contain a prefix, if one is configured on the connection. A data sync may create multiple files, as the output files can be partitioned by size (targeting a size of 200MB compressed or lower).

Supported sync modes

Feature                       | Support | Notes
------------------------------|---------|------
Full Refresh Sync             | Yes     | Warning: this mode deletes all previously synced data in the configured bucket path.
Incremental - Append Sync     | Yes     |
Incremental - Deduped History | No      | As this connector does not support dbt, we don't support this sync mode on this destination.
Namespaces                    | Yes     | Setting a specific bucket path is equivalent to having separate namespaces.

The Airbyte R2 destination allows you to sync data to Cloudflare R2. Each stream is written to its own directory under the bucket. ⚠️ Please note that under "Full Refresh Sync" mode, data in the configured bucket and path will be wiped out before each sync. We recommend provisioning a dedicated R2 resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️

Supported Output schema

Each stream will be written to its own dedicated directory according to the configuration. The complete dataset of each stream consists of all the output files under that directory. You can think of the directory as the equivalent of a table in the database world.

  • Under Full Refresh Sync mode, old output files will be purged before new files are created.
  • Under Incremental - Append Sync mode, new output files will be added that only contain the new data.

Avro

Apache Avro serializes data in a compact binary format. Currently, the Airbyte R2 Avro connector always uses the binary encoding, and assumes that all data records follow the same schema.

Configuration

Here are the available compression codecs (a standalone writing sketch follows the list):

  • No compression
  • deflate
    • Compression level
      • Range [0, 9]. Defaults to 0.
      • Level 0: no compression & fastest.
      • Level 9: best compression & slowest.
  • bzip2
  • xz
    • Compression level
      • Range [0, 9]. Defaults to 6.
      • Levels 0-3 are fast with medium compression.
      • Levels 4-6 are fairly slow with high compression.
      • Levels 7-9 are like level 6 but use bigger dictionaries and have higher memory requirements. Unless the uncompressed size of the file exceeds 8 MiB, 16 MiB, or 32 MiB, it is a waste of memory to use presets 7, 8, or 9, respectively.
  • zstandard
    • Compression level
      • Range [-5, 22]. Defaults to 3.
      • Negative levels are 'fast' modes akin to lz4 or snappy.
      • Levels above 9 are generally for archival purposes.
      • Levels above 18 use a lot of memory.
    • Include checksum
      • If set to true, a checksum will be included in each data block.
  • snappy
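
As a standalone illustration of these codec choices (not part of the connector), the sketch below writes a small Avro file with the fastavro library; the schema, records, and chosen codec are assumptions.

# Minimal sketch (not the connector's code): writing Avro with a chosen codec.
# Some codecs need extra packages (e.g. `zstandard`, `python-snappy`).
from fastavro import writer, parse_schema

schema = parse_schema({
    "type": "record",
    "name": "users",
    "fields": [
        {"name": "user_id", "type": "int"},
        {"name": "name", "type": "string"},
    ],
})

records = [{"user_id": 123, "name": "John Doe"}]

with open("users.avro", "wb") as out:
    # codec can be "null" (no compression), "deflate", "bzip2", "xz",
    # "zstandard", or "snappy"; the compression level applies where relevant.
    writer(out, schema, records, codec="deflate", codec_compression_level=9)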

Data schema

Under the hood, an Airbyte data stream in JSON schema is first converted to an Avro schema, then the JSON object is converted to an Avro record. Because the data stream can come from any data source, the JSON to Avro conversion process has arbitrary rules and limitations. Learn more about how source data is converted to Avro and the current limitations here.

CSV

As with most other Airbyte destination connectors, the output usually has three columns: a UUID, an emission timestamp, and the data blob. With the CSV output, it is possible to normalize (flatten) the data blob into multiple columns.

Column              | Condition                                               | Description
--------------------|---------------------------------------------------------|------------
_airbyte_ab_id      | Always exists                                           | A UUID assigned by Airbyte to each processed record.
_airbyte_emitted_at | Always exists                                           | A timestamp representing when the event was pulled from the data source.
_airbyte_data       | When no normalization (flattening) is needed            | All data resides under this column as a JSON blob.
root level fields   | When root level normalization (flattening) is selected  | The root level fields are expanded.

For example, given the following json object from a source:

{
  "user_id": 123,
  "name": {
    "first": "John",
    "last": "Doe"
  }
}

With no normalization, the output CSV is:

_airbyte_ab_id                       | _airbyte_emitted_at | _airbyte_data
-------------------------------------|---------------------|--------------
26d73cde-7eb1-4e1e-b7db-a4c03b4cf206 | 1622135805000       | { "user_id": 123, "name": { "first": "John", "last": "Doe" } }

With root level normalization, the output CSV is:

_airbyte_ab_id                       | _airbyte_emitted_at | user_id | name
-------------------------------------|---------------------|---------|-----
26d73cde-7eb1-4e1e-b7db-a4c03b4cf206 | 1622135805000       | 123     | { "first": "John", "last": "Doe" }

Output files can be compressed. The default option is GZIP compression. If compression is selected, the output filename will have an extra extension (GZIP: .csv.gz).
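
The sketch below is a rough, hypothetical illustration of the two layouts shown above, producing one CSV row without flattening and one with root-level flattening. It is not the connector's code; the UUID and timestamp are taken from the example rows.

# Rough illustration of the two CSV layouts (not the connector's code).
import csv
import json
import sys

record = {"user_id": 123, "name": {"first": "John", "last": "Doe"}}
ab_id = "26d73cde-7eb1-4e1e-b7db-a4c03b4cf206"
emitted_at = 1622135805000

# No normalization: the whole record stays in _airbyte_data as a JSON blob.
no_flatten = {"_airbyte_ab_id": ab_id, "_airbyte_emitted_at": emitted_at, "_airbyte_data": json.dumps(record)}

# Root-level flattening: top-level fields become their own columns;
# nested objects remain JSON blobs.
flattened = {"_airbyte_ab_id": ab_id, "_airbyte_emitted_at": emitted_at}
for key, value in record.items():
    flattened[key] = json.dumps(value) if isinstance(value, (dict, list)) else value

for row in (no_flatten, flattened):
    writer = csv.DictWriter(sys.stdout, fieldnames=row.keys())
    writer.writeheader()
    writer.writerow(row)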

JSON Lines (JSONL)

JSON Lines is a text format with one JSON object per line. Each line has the following structure:

{
  "_airbyte_ab_id": "<uuid>",
  "_airbyte_emitted_at": "<timestamp-in-millis>",
  "_airbyte_data": "<json-data-from-source>"
}

For example, given the following two json objects from a source:

[
  {
    "user_id": 123,
    "name": {
      "first": "John",
      "last": "Doe"
    }
  },
  {
    "user_id": 456,
    "name": {
      "first": "Jane",
      "last": "Roe"
    }
  }
]

They will appear as follows in the output file:

{ "_airbyte_ab_id": "26d73cde-7eb1-4e1e-b7db-a4c03b4cf206", "_airbyte_emitted_at": "1622135805000", "_airbyte_data": { "user_id": 123, "name": { "first": "John", "last": "Doe" } } }
{ "_airbyte_ab_id": "0a61de1b-9cdd-4455-a739-93572c9a5f20", "_airbyte_emitted_at": "1631948170000", "_airbyte_data": { "user_id": 456, "name": { "first": "Jane", "last": "Roe" } } }

Output files can be compressed. The default option is GZIP compression. If compression is selected, the output filename will have an extra extension (GZIP: .jsonl.gz).
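
As a rough illustration (not the connector's code), the sketch below emits the same two example records in the JSON Lines layout described above and applies the optional GZIP compression to produce a .jsonl.gz file.

# Minimal sketch (not the connector's code): emitting JSON Lines with GZIP compression.
import gzip
import json
import time
import uuid

source_records = [
    {"user_id": 123, "name": {"first": "John", "last": "Doe"}},
    {"user_id": 456, "name": {"first": "Jane", "last": "Roe"}},
]

with gzip.open("part_0.jsonl.gz", "wt", encoding="utf-8") as out:
    for record in source_records:
        line = {
            "_airbyte_ab_id": str(uuid.uuid4()),
            "_airbyte_emitted_at": int(time.time() * 1000),
            "_airbyte_data": record,
        }
        out.write(json.dumps(line) + "\n")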

Parquet

Configuration

The following configuration is available to configure the Parquet output:

Parameter               | Type    | Default      | Description
------------------------|---------|--------------|------------
compression_codec       | enum    | UNCOMPRESSED | Compression algorithm. Available candidates are: UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, and ZSTD.
block_size_mb           | integer | 128 (MB)     | Block size (row group size) in MB. This is the size of a row group being buffered in memory. It limits the memory usage when writing. Larger values will improve the IO when reading, but consume more memory when writing.
max_padding_size_mb     | integer | 8 (MB)       | Max padding size in MB. This is the maximum size allowed as padding to align row groups. This is also the minimum size of a row group.
page_size_kb            | integer | 1024 (KB)    | Page size in KB. The page size is for compression. A block is composed of pages. A page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate.
dictionary_page_size_kb | integer | 1024 (KB)    | Dictionary page size in KB. There is one dictionary page per column per row group when dictionary encoding is used. The dictionary page size works like the page size but for dictionaries.
dictionary_encoding     | boolean | true         | Dictionary encoding. This parameter controls whether dictionary encoding is turned on.

These parameters are related to the ParquetOutputFormat. See the Java doc for more details. Also see Parquet documentation for their recommended configurations (512 - 1024 MB block size, 8 KB page size).
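
For a rough analogue of a few of these knobs outside the connector, the sketch below writes a Parquet file with pyarrow. The mapping between parameters is an assumption, and pyarrow expresses row-group size as a row count rather than bytes, so block_size_mb has no direct equivalent here.

# Rough pyarrow analogue of some parameters above (not the connector's code).
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"user_id": [123, 456], "name": ["John Doe", "Jane Roe"]})

pq.write_table(
    table,
    "users.parquet",
    compression="snappy",        # roughly corresponds to compression_codec
    use_dictionary=True,         # roughly corresponds to dictionary_encoding
    data_page_size=1024 * 1024,  # roughly corresponds to page_size_kb = 1024 KB, in bytes
)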

Data schema

Under the hood, an Airbyte data stream in JSON schema is first converted to an Avro schema, then the JSON object is converted to an Avro record, and finally the Avro record is outputted to the Parquet format. Because the data stream can come from any data source, the JSON to Avro conversion process has arbitrary rules and limitations. Learn more about how source data is converted to Avro and the current limitations here.

CHANGELOG

Version | Date       | Pull Request | Subject
--------|------------|--------------|--------
0.1.0   | 2022-09-25 | #15296       | Initial release.