From 582865a614c57cc2c034b78ee90380b2aeb40271 Mon Sep 17 00:00:00 2001
From: Giacomo Citi
Date: Thu, 2 Nov 2023 18:07:40 +0100
Subject: [PATCH 01/34] feat: cube package with validation pipeline

---
 packages/cli/lib/discoverManifests.js      |   1 +
 packages/cli/lib/pipeline.js               |   2 +-
 packages/cube/README.md                    |  68 +++++++
 packages/cube/lib/batch.js                 |  30 +++
 packages/cube/lib/quads.js                 |  11 ++
 packages/cube/lib/target.js                |  12 ++
 packages/cube/package.json                 |  37 ++++
 packages/cube/pipeline/cube-validation.ttl | 216 +++++++++++++++++++++
 packages/sparql/manifest.ttl               |   4 +-
 9 files changed, 378 insertions(+), 3 deletions(-)
 create mode 100644 packages/cube/README.md
 create mode 100644 packages/cube/lib/batch.js
 create mode 100644 packages/cube/lib/quads.js
 create mode 100644 packages/cube/lib/target.js
 create mode 100644 packages/cube/package.json
 create mode 100644 packages/cube/pipeline/cube-validation.ttl

diff --git a/packages/cli/lib/discoverManifests.js b/packages/cli/lib/discoverManifests.js
index 01351f6f..1e816673 100644
--- a/packages/cli/lib/discoverManifests.js
+++ b/packages/cli/lib/discoverManifests.js
@@ -9,6 +9,7 @@ const require = module.createRequire(import.meta.url)
 export default async function * () {
   const packages = findPlugins({
     includeDev: true,
+    includePeer: true,
     filter({ pkg }) {
       return packagePattern.test(pkg.name) && hasManifest(pkg.name)
     },
diff --git a/packages/cli/lib/pipeline.js b/packages/cli/lib/pipeline.js
index 543e9e23..94e9165c 100644
--- a/packages/cli/lib/pipeline.js
+++ b/packages/cli/lib/pipeline.js
@@ -34,7 +34,7 @@ export const desugar = async (dataset, { logger, knownOperations } = {}) => {
     const [quad] = step.dataset.match(step.term)
     const knownStep = knownOperations.get(quad?.predicate)
     if (!knownStep) {
-      logger?.warn(`Operation <${quad?.predicate.value}> not found in known manifests. Have you added the right \`branard59-*\` package as dependency?`)
+      logger?.warn(`Operation <${quad?.predicate.value}> not found in known manifests. Have you added the right \`barnard59-*\` package as a dependency?`)
       continue
     }
diff --git a/packages/cube/README.md b/packages/cube/README.md
new file mode 100644
index 00000000..f7c87ce9
--- /dev/null
+++ b/packages/cube/README.md
@@ -0,0 +1,68 @@
+# barnard59-cube
+
+## Cube validation
+
+`cube-validation.ttl` contains pipelines to retrieve and validate cube observations and their constraints.
+
+### fetch cube constraint
+
+Pipeline `fetch-cube-constraint` queries a given SPARQL endpoint (the default is https://lindas.admin.ch/query) to retrieve
+a [concise bounded description](https://docs.stardog.com/query-stardog/#describe-queries) of the `cube:Constraint` part of a given cube.
+
+```bash
+npx barnard59 run ./pipeline/cube-validation.ttl \
+  --pipeline http://barnard59.zazuko.com/pipeline/cube-validation/fetch-cube-constraint \
+  --variable cube=https://agriculture.ld.admin.ch/agroscope/PRIFm8t15/2 \
+  --variable endpoint=https://int.lindas.admin.ch/query
+```
+
+This pipeline is mainly useful for cubes published with [cube creator](https://github.com/zazuko/cube-creator); if the cube definition is manually crafted, it is likely already available as a local file.
+
+### check cube constraint
+
+Pipeline `check-cube-constraint` validates the input constraint against the shapes provided with the `profile` variable (the default profile is https://cube.link/latest/shape/standalone-constraint-constraint, but [cube link](https://cube.link/) defines additional ones).
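+
+For illustration, a minimal constraint against which such a profile is checked could look as follows (a hypothetical sketch; the cube and dimension IRIs are invented):
+
+```turtle
+@prefix cube: <https://cube.link/> .
+@prefix sh: <http://www.w3.org/ns/shacl#> .
+
+<https://example.org/cube/constraint> a cube:Constraint ;
+  sh:property [
+    sh:path <https://example.org/cube/dimension/year> ;
+    sh:minCount 1 ;
+    sh:maxCount 1 ;
+  ] .
+```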
+
+The pipeline reads the constraint from `stdin`, so the input can come from a local file (as in the following example) as well as from the output of the `fetch-cube-constraint` pipeline (in most cases it is useful to keep the constraint in a local file, because the `check-cube-observations` pipeline needs it as well).
+
+```bash
+cat myConstraint.ttl \
+| npx barnard59 run ./pipeline/cube-validation.ttl \
+  --pipeline http://barnard59.zazuko.com/pipeline/cube-validation/check-cube-constraint \
+  --variable profile=https://cube.link/v0.0.5/shape/standalone-constraint-constraint
+```
+
+TODO: explain how validation errors are reported
+
+### fetch cube observations
+
+Pipeline `fetch-cube-observations` queries a given SPARQL endpoint (the default is https://lindas.admin.ch/query) to retrieve the observations of a given cube.
+
+```bash
+npx barnard59 run ./pipeline/cube-validation.ttl \
+  --pipeline http://barnard59.zazuko.com/pipeline/cube-validation/fetch-cube-observations \
+  --variable cube=https://agriculture.ld.admin.ch/agroscope/PRIFm8t15/2 \
+  --variable endpoint=https://int.lindas.admin.ch/query
+```
+
+Results are sorted by observation so that the potentially large output stream can be split (by the `check-cube-observations` pipeline) and each observation can be validated separately.
+
+### check cube observations
+
+Pipeline `check-cube-observations` validates the input observations against the shapes provided with the `constraint` variable.
+
+The pipeline reads the observations from `stdin`, so the input can come from a local file (as in the following example) as well as from the output of the `fetch-cube-observations` pipeline.
+
+```bash
+cat myObservations.ttl \
+| npx barnard59 run ./pipeline/cube-validation.ttl \
+  --pipeline http://barnard59.zazuko.com/pipeline/cube-validation/check-cube-observations \
+  --variable constraint=myConstraint.ttl
+```
+
+To enable validation, the pipeline adds a `sh:targetClass` property with value `cube:Observation` to the constraint (this assumes that each observation has an explicit `rdf:type` of `cube:Observation`).
+
+To leverage streaming, the pipeline also assumes that triples belonging to the same observation are adjacent in the input (`fetch-cube-observations` guarantees this by sorting the results by observation), as in the combined example below.
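+
+Since the fetch pipelines write N-Triples to `stdout` and the check pipelines parse their `stdin`, the two can also be piped together directly (a sketch, reusing the variables from the examples above):
+
+```bash
+npx barnard59 run ./pipeline/cube-validation.ttl \
+  --pipeline http://barnard59.zazuko.com/pipeline/cube-validation/fetch-cube-observations \
+  --variable cube=https://agriculture.ld.admin.ch/agroscope/PRIFm8t15/2 \
+  --variable endpoint=https://int.lindas.admin.ch/query \
+| npx barnard59 run ./pipeline/cube-validation.ttl \
+  --pipeline http://barnard59.zazuko.com/pipeline/cube-validation/check-cube-observations \
+  --variable constraint=myConstraint.ttl
+```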
+
+TODO: explain how validation errors are reported
diff --git a/packages/cube/lib/batch.js b/packages/cube/lib/batch.js
new file mode 100644
index 00000000..3cf81833
--- /dev/null
+++ b/packages/cube/lib/batch.js
@@ -0,0 +1,30 @@
+import { Duplex } from 'node:stream'
+import rdf from '@zazuko/env-node'
+
+// Iterable<Item> => Iterable<Item[]>
+// Groups the items of the input iterable into arrays of the given size.
+export async function * chunkObjectsBySize(size, iterable) {
+  let chunk = []
+  for await (const item of iterable) {
+    chunk.push(item)
+    if (chunk.length === size) {
+      yield chunk
+      chunk = []
+    }
+  }
+  if (chunk.length > 0) {
+    yield chunk
+  }
+}
+
+// Iterable<Dataset> => Iterable<Dataset>
+// Merges every `size` consecutive input datasets into a single dataset.
+export async function * chunkBySize(size, iterable) {
+  for await (const array of chunkObjectsBySize(size, iterable)) {
+    const batch = rdf.dataset()
+    for (const dataset of array) {
+      batch.addAll(dataset)
+    }
+    yield batch
+  }
+}
+
+export const batch = size => Duplex.from(iterable => chunkBySize(Number(size), iterable))
diff --git a/packages/cube/lib/quads.js b/packages/cube/lib/quads.js
new file mode 100644
index 00000000..670a9d8f
--- /dev/null
+++ b/packages/cube/lib/quads.js
@@ -0,0 +1,11 @@
+import { Transform } from 'node:stream'
+import rdf from '@zazuko/env-node'
+
+// Turns SPARQL SELECT result rows with ?s ?p ?o bindings into RDF/JS quads.
+export const toQuad = () => new Transform({
+  readableObjectMode: true,
+  writableObjectMode: true,
+  transform(row, _encoding, callback) {
+    this.push(rdf.quad(row.s, row.p, row.o))
+    callback()
+  },
+})
diff --git a/packages/cube/lib/target.js b/packages/cube/lib/target.js
new file mode 100644
index 00000000..c8d0df92
--- /dev/null
+++ b/packages/cube/lib/target.js
@@ -0,0 +1,12 @@
+import rdf from '@zazuko/env-node'
+
+const cube = rdf.namespace('https://cube.link/')
+
+// Adds `sh:targetClass cube:Observation` to the unique cube:Constraint node of
+// the given shape dataset so that the shape can validate observations.
+export const addTarget = shape => {
+  const constraint = rdf.clownface({ dataset: shape, term: cube.Constraint }).in(rdf.ns.rdf.type)
+  if (!constraint.term) {
+    throw new Error('could not find a unique constraint')
+  }
+  constraint.addOut(rdf.ns.sh.targetClass, cube.Observation)
+  return shape
+}
diff --git a/packages/cube/package.json b/packages/cube/package.json
new file mode 100644
index 00000000..bafab833
--- /dev/null
+++ b/packages/cube/package.json
@@ -0,0 +1,37 @@
+{
+  "name": "barnard59-cube",
+  "version": "2.0.0",
+  "description": "RDF cubes for Linked Data pipelines",
+  "type": "module",
+  "main": "index.js",
+  "scripts": {
+    "test": "mocha"
+  },
+  "repository": {
+    "type": "git",
+    "url": "git://github.com/zazuko/barnard59.git",
+    "directory": "packages/cube"
+  },
+  "keywords": [],
+  "author": "Thomas Bergwinkl (https://www.bergnet.org/people/bergi/card#me)",
+  "license": "MIT",
+  "bugs": {
+    "url": "https://github.com/zazuko/barnard59/issues"
+  },
+  "homepage": "https://github.com/zazuko/barnard59",
+  "peerDependencies": {
+    "barnard59-base": "^2.0.0",
+    "barnard59-rdf": "^2.0.0",
+    "barnard59-http": "^2.0.0",
+    "barnard59-sparql": "^2.0.0",
+    "barnard59-formats": "^2.0.0",
+    "barnard59-validate-shacl": "^0.3.8"
+  },
+  "engines": {
+    "node": ">= 14.0.0"
+  },
+  "dependencies": {
+    "@zazuko/env-node": "^1.0.0",
+    "rdf-stream-to-dataset-stream": "^1.0.0"
+  }
+}
diff --git a/packages/cube/pipeline/cube-validation.ttl b/packages/cube/pipeline/cube-validation.ttl
new file mode 100644
index 00000000..1406211b
--- /dev/null
+++ b/packages/cube/pipeline/cube-validation.ttl
@@ -0,0 +1,216 @@
+@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix code: <https://code.described.at/> .
+@prefix p: <https://pipeline.described.at/> .
+@prefix sparql: <https://barnard59.zazuko.com/operations/sparql/> .
+@prefix http: <https://barnard59.zazuko.com/operations/http/> .
+@prefix shacl: <https://barnard59.zazuko.com/operations/shacl/> .
+@prefix base: <https://barnard59.zazuko.com/operations/base/> .
+@prefix n3: <https://barnard59.zazuko.com/operations/formats/n3/> .
+@prefix ntriples: <https://barnard59.zazuko.com/operations/formats/ntriples/> .
+
+@base <http://barnard59.zazuko.com/pipeline/cube-validation/> .
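+
+# Pipeline IRIs below resolve against @base, so e.g. <fetch-cube-constraint>
+# is http://barnard59.zazuko.com/pipeline/cube-validation/fetch-cube-constraint,
+# the value passed to `--pipeline` in the README examples.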
+
+_:endpoint a p:Variable ;
+  p:name "endpoint" ;
+  rdfs:label "SPARQL endpoint" ;
+  p:value "https://lindas.admin.ch/query"
+.
+
+_:cube a p:Variable ;
+  p:name "cube" ;
+  rdfs:label "cube URI" ;
+.
+
+_:constraint a p:Variable ;
+  p:name "constraint" ;
+  rdfs:label "cube constraint file" ;
+.
+
+_:profile a p:Variable ;
+  p:name "profile" ;
+  rdfs:label "cube constraint profile URL" ;
+  p:value "https://cube.link/latest/shape/standalone-constraint-constraint" ;
+.
+
+<fetch-cube-constraint> a p:Pipeline ;
+  p:variables [ p:variable _:endpoint, _:cube ] ;
+  p:steps
+    [
+      p:stepList
+        (
+          _:queryConstraint
+          [ ntriples:serialize () ]
+          [ base:stdout () ]
+        )
+    ]
+.
+
+<fetch-cube-observations> a p:Pipeline ;
+  p:variables [ p:variable _:endpoint, _:cube ] ;
+  p:steps
+    [
+      p:stepList
+        (
+          _:queryObservations
+          _:toQuads
+          [ ntriples:serialize () ]
+          [ base:stdout () ]
+        )
+    ]
+.
+
+<check-cube-constraint> a p:Pipeline ;
+  p:variables [ p:variable _:profile ] ;
+  p:steps
+    [
+      p:stepList (
+        _:stdin
+        [ n3:parse () ]
+        _:toDataset
+        _:validateWithProfile
+      )
+    ]
+.
+
+<check-cube-observations> a p:Pipeline ;
+  p:variables [ p:variable _:constraint ] ;
+  p:steps
+    [
+      p:stepList (
+        _:stdin
+        [ n3:parse () ]
+        _:toDatasetBySubject
+        _:batch
+        _:validateWithConstraint
+      )
+    ]
+.
+
+# relying on CBD is not ideal (vendor-specific)
+_:queryConstraint sparql:construct
+  [ code:name "endpoint" ; code:value "endpoint"^^p:VariableName ] ,
+  [
+    code:name "query" ;
+    code:value """
+      #pragma describe.strategy cbd
+
+      PREFIX cube: <https://cube.link/>
+
+      DESCRIBE ?s
+      WHERE {
+        <${cube}> cube:observationConstraint ?s .
+      }
+    """^^code:EcmaScriptTemplateLiteral
+  ]
+.
+
+# use SELECT instead of CONSTRUCT to ensure the result is ordered by subject
+_:queryObservations sparql:select
+  [
+    code:name "endpoint" ;
+    code:value "endpoint"^^p:VariableName
+  ] ,
+  [
+    code:name "query" ;
+    code:value """
+      PREFIX cube: <https://cube.link/>
+
+      SELECT ?s ?p ?o
+      WHERE {
+        <${cube}> cube:observationSet/cube:observation ?s .
+        ?s ?p ?o
+      }
+      ORDER BY ?s
+    """^^code:EcmaScriptTemplateLiteral
+  ]
+.
+
+_:toQuads a p:Step ;
+  code:implementedBy
+    [
+      a code:EcmaScriptModule ;
+      code:link <node:barnard59-cube/lib/quads.js#toQuad>
+    ]
+.
+
+_:stdin a p:Step ;
+  code:implementedBy "() => process.stdin"^^code:EcmaScript
+.
+
+_:toDataset a p:Step ;
+  code:implementedBy
+    [
+      rdf:type code:EcmaScript ;
+      code:link <node:rdf-stream-to-dataset-stream/byGraph.js> ;
+    ]
+.
+
+_:toDatasetBySubject a p:Step ;
+  code:implementedBy
+    [
+      rdf:type code:EcmaScript ;
+      code:link <node:rdf-stream-to-dataset-stream/bySubject.js> ;
+    ]
+.
+
+_:batch a p:Step ;
+  code:implementedBy
+    [
+      a code:EcmaScriptModule ;
+      code:link <node:barnard59-cube/lib/batch.js#batch> ;
+    ] ;
+  code:arguments (50)
+.
+
+_:validateWithProfile shacl:validate
+  [ code:name "shape" ; code:value _:getProfile ]
+  # beware of maxErrors = 1, see https://github.com/zazuko/rdf-validate-shacl/issues/88
+.
+
+_:validateWithConstraint shacl:validate
+  [ code:name "shape" ; code:value _:getConstraint ]
+  # beware of maxErrors = 1, see https://github.com/zazuko/rdf-validate-shacl/issues/88
+.
+
+_:getProfile a p:Pipeline , p:ReadableObjectMode ;
+  p:steps
+    [
+      p:stepList
+        (
+          [ http:get [ code:name "url" ; code:value "profile"^^p:VariableName ] ]
+          [ n3:parse () ]
+        )
+    ]
+.
+
+_:getConstraint a p:Pipeline , p:ReadableObjectMode ;
+  p:variables [ p:variable _:constraint ] ;
+  p:steps
+    [
+      p:stepList
+        (
+          _:readConstraint
+          _:toDataset
+          _:addTarget
+          [ base:flatten () ]
+        )
+    ]
+.
+
+_:readConstraint a p:Step ;
+  code:implementedBy
+    [
+      a code:EcmaScriptModule ;
+      code:link ;
+    ] ;
+  code:arguments ("constraint"^^p:VariableName)
+.
+
+_:addTarget base:map (
+  [
+    a code:EcmaScriptModule ;
+    code:link <node:barnard59-cube/lib/target.js#addTarget>
+  ]
+) .
diff --git a/packages/sparql/manifest.ttl b/packages/sparql/manifest.ttl
index 7a83ad19..2e927d1a 100644
--- a/packages/sparql/manifest.ttl
+++ b/packages/sparql/manifest.ttl
@@ -7,12 +7,12 @@
   rdfs:label "SPARQL Select";
   rdfs:comment "Runs the given CONSTRUCT query against the given endpoint parses the result.";
   code:implementedBy [ a code:EcmaScriptModule;
-    code:link
+    code:link
   ].