Merge branch 'df-dev' of https://github.com/gleanerio/nabu into dev_eco
# Conflicts:
#	internal/objects/pipecopy.go
valentinedwv committed Nov 15, 2024
2 parents d46cf0a + e34ad75 commit d9538ed
Showing 15 changed files with 169 additions and 291 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
2.0.18-df-development
2.0.19-df-development
8 changes: 8 additions & 0 deletions docs/README.md
@@ -177,4 +177,12 @@ nabu object --cfgPath directory --cfgName name objectId
e.g. use a generated object path:
```
nabu object --cfgPath ../gleaner/configs --cfgName local milled/opentopography/ffa0df033bb3a8fc9f600c80df3501fe1a2dbe93.rdf
```

### Using URL-based configuration

Nabu can also read its configuration file over the network:

```
go run ../../cmd/nabu/main.go release --cfgURL https://provisium.io/data/nabuconfig.yaml --prefix summoned/dataverse --endpoint localoxi
```
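
A rough sketch of how such a URL-based read can be wired up with viper; the helper, flow, and config key below are illustrative, not necessarily how Nabu implements `--cfgURL`:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"

	"github.com/spf13/viper"
)

// readConfigFromURL (hypothetical helper) fetches a YAML config over HTTP
// and hands the bytes to viper, which otherwise reads only local files.
func readConfigFromURL(url string) (*viper.Viper, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	v := viper.New()
	v.SetConfigType("yaml")
	if err := v.ReadConfig(bytes.NewReader(body)); err != nil {
		return nil, err
	}
	return v, nil
}

func main() {
	v, err := readConfigFromURL("https://provisium.io/data/nabuconfig.yaml")
	if err != nil {
		panic(err)
	}
	fmt.Println(v.GetString("minio.address")) // key name is illustrative
}
```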
11 changes: 11 additions & 0 deletions docs/httpSPARQL.md
@@ -23,4 +23,15 @@ curl -H 'Accept: application/sparql-results+json' http://coreos.lan:9090/blazegr

```bash
curl -H 'Accept: application/sparql-results+json' http://coreos.lan:9090/blazegraph/namespace/iow/sparql --data-urlencode 'query=SELECT (COUNT(DISTINCT ?graph) AS ?namedGraphsCount)(COUNT(*) AS ?triplesCount)WHERE {GRAPH ?graph {?subject ?predicate ?object}}'
```


### Oxigraph

Bulk load n-quads into Oxigraph's `/store` endpoint. `PUT` replaces the store's contents, while `POST` (second example) appends to it:

```bash
curl -i -X PUT -H 'Content-Type:text/x-nquads' --data-binary @veupathdb_release.nq http://localhost:7878/store
```

```bash
curl -i -X POST -H 'Content-Type:text/x-nquads' --data-binary @veupathdb_release.nq http://localhost:7878/store
```
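
A minimal Go sketch of the same load, assuming Oxigraph at localhost:7878 and the file used above:

```go
package main

import (
	"fmt"
	"net/http"
	"os"
)

// loadNquads streams an n-quads file to Oxigraph's /store endpoint.
// Pass http.MethodPut to replace the store's contents or
// http.MethodPost to append to them.
func loadNquads(method, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	req, err := http.NewRequest(method, "http://localhost:7878/store", f)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "text/x-nquads")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
	return nil
}

func main() {
	if err := loadNquads(http.MethodPost, "veupathdb_release.nq"); err != nil {
		panic(err)
	}
}
```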
6 changes: 0 additions & 6 deletions docs/images/prefix.d2

This file was deleted.

35 changes: 0 additions & 35 deletions docs/images/prefix.svg

This file was deleted.

3 changes: 0 additions & 3 deletions docs/images/prune.d2

This file was deleted.

35 changes: 0 additions & 35 deletions docs/images/prune.svg

This file was deleted.

77 changes: 0 additions & 77 deletions docs/images/workflow.d2

This file was deleted.

102 changes: 0 additions & 102 deletions docs/images/workflow.svg

This file was deleted.

2 changes: 1 addition & 1 deletion internal/graph/toFromRDF.go
@@ -4,7 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
log "github.com/sirupsen/logrus"
"strings"

"github.com/knakk/rdf"
85 changes: 65 additions & 20 deletions internal/objects/pipecopy.go
@@ -4,19 +4,44 @@ import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"github.com/spf13/viper"
"io"
"strings"
"sync"

"github.com/spf13/viper"
"time"

"github.com/gleanerio/nabu/internal/graph"
log "github.com/sirupsen/logrus"

"github.com/minio/minio-go/v7"
)

// getLastElement returns the final slash-delimited element of s.
func getLastElement(s string) string {
parts := strings.Split(s, "/")
return parts[len(parts)-1]
}

// generateDateHash generates a hash of the current date and time
// (second resolution, so calls within the same second produce the same hash).
func generateDateHash() string {
// Get the current date and time
now := time.Now()

// Format the date and time as a string
dateString := now.Format("2006-01-02 15:04:05")

// Create a SHA256 hash
hash := sha256.New()
hash.Write([]byte(dateString))

// Convert the hash to a hex string
hashString := hex.EncodeToString(hash.Sum(nil))

return hashString
}

// PipeCopy writes a new object based on a prefix; this function assumes the objects are valid when concatenated
// v1: viper config object
// mc: minio client pointer
@@ -26,7 +51,8 @@ import (
// destprefix: destination prefix
// sf: boolean (set internally) declaring if single file or not. If so, skip skolemization since the JSON-LD library output is enough
func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefix string) error {
log.Printf("PipeCopy with name: %s bucket: %s prefix: %s", name, bucket, prefix)
orgname := v1.GetString("implementation_network.orgname")
log.Printf("PipeCopy with name: %s bucket: %s prefix: %s org name: %s", name, bucket, prefix, orgname)

pr, pw := io.Pipe() // would an io.TeeReader be of use here?
lwg := sync.WaitGroup{} // work group for the pipe writes...
@@ -47,10 +73,6 @@ func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefi
}
}(pw)

// Set and use a "single file flag" to bypass skolemization since if it is a single file
// the JSON-LD to RDF will correctly map blank nodes.
// NOTE: with a background context we can't get the len(channel) so we have to iterate it.
// This is fast, but it means we have to do the ListObjects twice
clen := 0
sf := false
ctx, cancel := context.WithCancel(context.Background())
@@ -67,27 +89,25 @@

objectCh := mc.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: isRecursive})

// for object := range mc.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: isRecursive}, doneCh) {
lastProcessed := false
idList := make([]string, 0)
for object := range objectCh {
fo, err := mc.GetObject(context.Background(), bucket, object.Key, minio.GetObjectOptions{})
if err != nil {
log.Errorf(" failed to read object %s %s ", object.Key, err)
//fmt.Println(err)
continue
}
log.Tracef(" processing object %s ", object.Key)
var b bytes.Buffer
bw := bufio.NewWriter(&b)

_, err = io.Copy(bw, fo)
bw.Flush() // flush buffered bytes into b before b is read below
if err != nil {
log.Errorf(" failed to read object %s %s ", object.Key, err)
log.Println(err)
continue
}

s := b.String()

nq := ""
//log.Println("Calling JSONLDtoNQ")
if strings.HasSuffix(object.Key, ".nq") {
nq = s
} else {
@@ -97,37 +117,62 @@ func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefi
continue
}
}

var snq string

if sf {
snq = nq // just pass through the RDF without trying to Skolemize since we are a single file
snq = nq
} else {
snq, err = graph.Skolemization(nq, object.Key)
if err != nil {
log.Errorf(" failed Skolemization %s %s ", object.Key, err)
continue
}
}

// 1) get graph URI
ctx, err := graph.MakeURN(v1, object.Key)
if err != nil {
log.Errorf(" failed MakeURN %s %s ", object.Key, err)
continue
}
// 2) convert NT to NQ
csnq, err := graph.NtToNq(snq, ctx)
if err != nil {
log.Errorf(" failed NtToNq %s %s ", object.Key, err)
continue
}

_, err = pw.Write([]byte(csnq))
if err != nil {
log.Errorf(" failed pipe write %s %s ", object.Key, err)
continue
}
idList = append(idList, ctx)
lastProcessed = true
}

// Once we are done with the loop, put in the triples to associate all the graphURIs with the org.
if lastProcessed {
// Link every graph URN written above to a schema.org DataCatalog for the
// organization, appending those triples to the pipe as one dated named graph.

data := `<urn:gleaner.io:` + orgname + `:datacatalog> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/DataCatalog> .
<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/description> "GleanerIO Nabu generated catalog" .
<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/dateCreated> "` + time.Now().Format("2006-01-02 15:04:05") + `" .
<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/provider> <urn:gleaner.io:` + orgname + `:provider> .
<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/publisher> <urn:gleaner.io:` + getLastElement(prefix) + `:publisher> .
<urn:gleaner.io:` + orgname + `:provider> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/Organization> .
<urn:gleaner.io:` + orgname + `:provider> <https://schema.org/name> "` + orgname + `" .
<urn:gleaner.io:` + getLastElement(prefix) + `:publisher> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/Organization> .
<urn:gleaner.io:` + getLastElement(prefix) + `:publisher> <https://schema.org/name> "` + getLastElement(prefix) + `" .
`

for _, item := range idList {
data += `<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/dataset> <` + item + `> .` + "\n"
}

namedgraph := "urn:gleaner.io:" + orgname + ":" + getLastElement(prefix) + ":datacatalog:" + generateDateHash()
sdata, err := graph.NtToNq(data, namedgraph)
if err != nil {
log.Println(err)
} else {
// Perform the final write to the pipe here
_, err = pw.Write([]byte(sdata))
if err != nil {
log.Println(err)
}
}
}
}()

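
For orientation, PipeCopy is built around Go's `io.Pipe`: a goroutine concatenates the quads into the write end while a consumer (in the elided remainder of the function, presumably a MinIO `PutObject` call) streams from the read end, so the full graph is never buffered whole in memory. A minimal stdlib sketch of that shape, with illustrative names:

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// pipeConcat mimics PipeCopy's producer/consumer shape: a goroutine
// writes each part into pw while the reader drains pr concurrently.
func pipeConcat(parts []string) (string, error) {
	pr, pw := io.Pipe()
	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()
		defer pw.Close() // signals EOF to the reader
		for _, p := range parts {
			if _, err := io.Copy(pw, strings.NewReader(p)); err != nil {
				pw.CloseWithError(err) // propagate the error to the reader
				return
			}
		}
	}()

	// In PipeCopy the consumer would be an object-store upload reading
	// from pr; here we simply drain the pipe into a string.
	var sb strings.Builder
	if _, err := io.Copy(&sb, pr); err != nil {
		return "", err
	}
	wg.Wait()
	return sb.String(), nil
}

func main() {
	out, err := pipeConcat([]string{"<s> <p> <o> <g> .\n", "<s2> <p> <o> <g> .\n"})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```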
25 changes: 16 additions & 9 deletions internal/services/bulk/function.go
Expand Up @@ -3,7 +3,6 @@ package bulk
import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"strings"
@@ -37,7 +36,7 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string)

// check for the required bulk endpoint, no need to move on from here
if spql.URL == "" {
return "", errors.New("The configuration file lacks an endpointBulk entry")
return "", errors.New("configuration file lacks an endpointBulk entry")
}

log.Printf("Object %s:%s for %s with method %s type %s", bucketName, item, ep, md, ct)
@@ -53,12 +52,13 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string)
// Review if this graph g should be here since we are loading quads.
// I don't think it should be; validate with all the tested triple stores.
//bn := strings.Replace(bucketName, ".", ":", -1) // convert to urn : values, buckets with . are not valid IRIs
g, err := graph.MakeURN(v1, item)
//g, err := graph.MakeURN(v1, item)
if err != nil {
log.Errorf("gets3Bytes %v", err)
return "", err // return here, since on this error things are not good
}
url := fmt.Sprintf("%s?graph=%s", ep, g)
//url := fmt.Sprintf("%s?graph=%s", ep, g) // NOTE 11-13-2023 ?graph with nquads fails with Oxigraph
url := ep // testing

// check if JSON-LD and convert to RDF
if strings.Contains(item, ".jsonld") {
@@ -73,8 +73,16 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string)
if err != nil {
return "", err
}
req.Header.Set("Content-Type", ct) // needs to be x-nquads for blaze, n-quads for jena and graphdb
req.Header.Set("User-Agent", "EarthCube_DataBot/1.0")

headers := map[string]string{
"Content-Type": ct, // needs to be x-nquads for Blazegraph, n-quads for Jena and GraphDB
"User-Agent": "EarthCube_DataBot/1.0",
// add other headers here
}

for k, v := range headers {
req.Header.Add(k, v)
}

client := &http.Client{}
resp, err := client.Do(req)
@@ -87,16 +95,15 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string)
}
}(resp.Body)

log.Println(resp)
body, err := io.ReadAll(resp.Body) // return body if you want to debugg test with it
body, err := io.ReadAll(resp.Body) // return body if you want to debug test with it
if err != nil {
log.Println(string(body))
return string(body), err
}

// report
log.Println(string(body))
log.Printf("success: %s : %d : %s\n", item, len(b), ep)
log.Printf("status: %s : %d : %s\n", item, len(b), ep)

return string(body), err
}
1 change: 1 addition & 0 deletions internal/services/releases/bulkLoader.go
Original file line number Diff line number Diff line change
@@ -57,6 +57,7 @@ func BulkRelease(v1 *viper.Viper, mc *minio.Client) error {
return err
}

// TODO Should this be optional / controlled by flag?
// Copy the "latest" graph just made to archive with a date
// This means the graph in latest is a duplicate of the most recently dated version in archive/{provider}
const layout = "2006-01-02-15-04-05"
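
A sketch of the dated archive copy that comment describes, using minio-go v7's `CopyObject`; the endpoint, credentials, bucket, and object layout here are assumptions for illustration, not necessarily Nabu's:

```go
package main

import (
	"context"
	"time"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

const layout = "2006-01-02-15-04-05"

// archiveLatest copies the "latest" release graph to a date-stamped
// object under archive/{provider}, mirroring the comment above.
func archiveLatest(mc *minio.Client, bucket, provider string) error {
	src := minio.CopySrcOptions{
		Bucket: bucket,
		Object: "graphs/latest/" + provider + "_release.nq", // illustrative layout
	}
	dst := minio.CopyDestOptions{
		Bucket: bucket,
		Object: "graphs/archive/" + provider + "/" + provider + "_" +
			time.Now().Format(layout) + "_release.nq",
	}
	_, err := mc.CopyObject(context.Background(), dst, src)
	return err
}

func main() {
	mc, err := minio.New("localhost:9000", &minio.Options{
		Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""), // illustrative credentials
	})
	if err != nil {
		panic(err)
	}
	if err := archiveLatest(mc, "gleaner", "dataverse"); err != nil {
		panic(err)
	}
}
```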
