Merge pull request #394 from openaddresses/parquet

Add Parquet as an output format for collections
iandees authored Jan 23, 2025
2 parents 0929922 + f550050 commit b11e386
Showing 3 changed files with 547 additions and 9 deletions.
107 changes: 104 additions & 3 deletions task/collect.js
@@ -14,8 +14,10 @@ import { mkdirp } from 'mkdirp';
import S3 from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import archiver from 'archiver';
import parquet from '@dsnp/parquetjs';
import minimist from 'minimist';
import { Transform } from 'stream';
import wkx from 'wkx';

const s3 = new S3.S3Client({
    region: process.env.AWS_DEFAULT_REGION
@@ -106,9 +108,15 @@ async function collect(tmp, collection, oa) {
    const zip = await zip_datas(tmp, collection_data, collection.name);

    console.error(`ok - zip created: ${zip}`);
    await upload_collection(zip, collection.name);
    await upload_zip_collection(zip, collection.name);
    console.error('ok - archive uploaded');

    const pq = await parquet_datas(tmp, collection_data, collection.name);

    console.error(`ok - parquet created: ${pq}`);
    await upload_parquet_collection(pq, collection.name);
    console.error('ok - parquet uploaded');

    await oa.cmd('collection', 'update', {
        ':collection': collection.id,
        size: fs.statSync(zip).size
@@ -191,7 +199,7 @@ async function get_source(oa, tmp, data, stats) {
    return path.resolve(tmp, 'sources', dir, source);
}

async function upload_collection(file, name) {
async function upload_zip_collection(file, name) {
    const s3uploader = new Upload({
        client: s3,
        params: {
@@ -215,7 +223,6 @@ async function upload_collection(file, name) {
        endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
    });


    const r2uploader = new Upload({
        client: r2,
        params: {
@@ -229,7 +236,45 @@ async function upload_collection(file, name) {
    await r2uploader.done();

    console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.zip`);
}

async function upload_parquet_collection(file, name) {
    const s3uploader = new Upload({
        client: s3,
        params: {
            ContentType: 'application/vnd.apache.parquet',
            Body: fs.createReadStream(file),
            Bucket: process.env.Bucket,
            Key: `${process.env.StackName}/collection-${name}.parquet`
        }
    });

    await s3uploader.done();

    console.error(`ok - s3://${process.env.Bucket}/${process.env.StackName}/collection-${name}.parquet`);

    const r2 = new S3.S3Client({
        region: 'auto',
        credentials: {
            accessKeyId: process.env.R2_ACCESS_KEY_ID,
            secretAccessKey: process.env.R2_SECRET_ACCESS_KEY
        },
        endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
    });

    const r2uploader = new Upload({
        client: r2,
        params: {
            ContentType: 'application/vnd.apache.parquet',
            Body: fs.createReadStream(file),
            Bucket: process.env.R2Bucket,
            Key: `v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`
        }
    });

    await r2uploader.done();

    console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`);
}

function zip_datas(tmp, datas, name) {
@@ -271,3 +316,59 @@ function zip_datas(tmp, datas, name) {
        archive.finalize();
    });
}

async function parquet_datas(tmp, datas, name) {
    const schema = new parquet.ParquetSchema({
        source_name: { type: 'UTF8' },
        geometry: { type: 'BINARY' },
        id: { type: 'UTF8' },
        pid: { type: 'UTF8' },
        number: { type: 'UTF8' },
        street: { type: 'UTF8' },
        unit: { type: 'UTF8' },
        city: { type: 'UTF8' },
        postcode: { type: 'UTF8' },
        district: { type: 'UTF8' },
        region: { type: 'UTF8' },
        addrtype: { type: 'UTF8' },
        notes: { type: 'UTF8' }
    });

    const output = path.resolve(tmp, `${name}.parquet`);
    // openFile() is async and expects a ParquetSchema instance
    const writer = await parquet.ParquetWriter.openFile(schema, output);

    // Process the sources one at a time so rows are appended sequentially
    for (const data of datas) {
        const resolved_data_filename = path.resolve(tmp, 'sources', data);

        // Read the file and parse it as linefeed-delimited GeoJSON
        const data_stream = fs.createReadStream(resolved_data_filename);
        const data_lines = data_stream.pipe(split());

        await new Promise((resolve, reject) => {
            data_lines.on('data', (line) => {
                if (!line) return; // skip blank lines such as a trailing newline

                const record = JSON.parse(line);
                const properties = record.properties;
                // Encode the GeoJSON geometry as WKB for the BINARY column
                const wkbGeometry = wkx.Geometry.parseGeoJSON(record.geometry).toWkb();

                // Pause the stream while the async row append is in flight
                data_lines.pause();
                writer.appendRow({
                    source_name: data,
                    geometry: wkbGeometry,
                    id: properties.id,
                    pid: properties.pid,
                    number: properties.number,
                    street: properties.street,
                    unit: properties.unit,
                    city: properties.city,
                    postcode: properties.postcode,
                    district: properties.district,
                    region: properties.region,
                    addrtype: properties.addrtype,
                    notes: properties.notes
                }).then(() => data_lines.resume(), reject);
            });

            data_lines.on('error', reject);
            data_lines.on('end', () => {
                console.error(`ok - ${resolved_data_filename} processed and appended to parquet file`);
                resolve();
            });
        });
    }

    // Close only after every row is appended so the Parquet footer is written last
    await writer.close();

    return output;
}
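
For reference, here is a minimal sketch of reading one of the generated collection files back, assuming the same @dsnp/parquetjs and wkx packages used in the diff above; the collection-global.parquet filename is illustrative and not part of this change:

import parquet from '@dsnp/parquetjs';
import wkx from 'wkx';

// Open the file written by parquet_datas() (filename assumed for illustration)
const reader = await parquet.ParquetReader.openFile('collection-global.parquet');
const cursor = reader.getCursor();

// cursor.next() resolves to null once every row has been read
let row;
while ((row = await cursor.next())) {
    // The geometry column stores WKB bytes; wkx turns them back into GeoJSON
    const geometry = wkx.Geometry.parse(row.geometry).toGeoJSON();
    console.log(row.source_name, row.number, row.street, JSON.stringify(geometry));
}

await reader.close();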