diff --git a/VERSION b/VERSION index 1d0348a..c996112 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.0.18-df-development +2.0.19-df-development diff --git a/docs/README.md b/docs/README.md index 515d4e2..447b448 100644 --- a/docs/README.md +++ b/docs/README.md @@ -177,4 +177,12 @@ nabu object --cfgPath directory --cfgName name objectId eg use generated ``` nabu object --cfgPath ../gleaner/configs --cfgName local milled/opentopography/ffa0df033bb3a8fc9f600c80df3501fe1a2dbe93.rdf +``` + +### Using URL based configuration + +Nabu can also read the configuration file from over the network + +``` +go run ../../cmd/nabu/main.go release --cfgURL https://provisium.io/data/nabuconfig.yaml --prefix summoned/dataverse --endpoint localoxi ``` \ No newline at end of file diff --git a/docs/httpSPARQL.md b/docs/httpSPARQL.md index 00d57ae..0f597cf 100644 --- a/docs/httpSPARQL.md +++ b/docs/httpSPARQL.md @@ -23,4 +23,15 @@ curl -H 'Accept: application/sparql-results+json' http://coreos.lan:9090/blazegr ```bash curl -H 'Accept: application/sparql-results+json' http://coreos.lan:9090/blazegraph/namespace/iow/sparql --data-urlencode 'query=SELECT (COUNT(DISTINCT ?graph) AS ?namedGraphsCount)(COUNT(*) AS ?triplesCount)WHERE {GRAPH ?graph {?subject ?predicate ?object}}' +``` + + +### Oxigraph + +```bash +curl -i -X PUT -H 'Content-Type:text/x-nquads' --data-binary @veupathdb_release.nq http://localhost:7878/store +``` + +```bash +curl -i -X POST -H 'Content-Type:text/x-nquads' --data-binary @veupathdb_release.nq http://localhost:7878/store ``` \ No newline at end of file diff --git a/docs/images/prefix.d2 b/docs/images/prefix.d2 deleted file mode 100644 index 2318787..0000000 --- a/docs/images/prefix.d2 +++ /dev/null @@ -1,6 +0,0 @@ -direction: right -pkg.Prefix -> objects.ObjectAssembly -> objects.Pipeload -> objects.Insert - -objects.Insert: { -|md sparql-update |} - diff --git a/docs/images/prefix.svg b/docs/images/prefix.svg deleted file mode 100644 index 6b27a76..0000000 --- a/docs/images/prefix.svg +++ /dev/null @@ -1,35 +0,0 @@ - -objectspkgInsertPrefixObjectAssemblyPipeloadsparql-update \ No newline at end of file diff --git a/docs/images/prune.d2 b/docs/images/prune.d2 deleted file mode 100644 index cbeb43b..0000000 --- a/docs/images/prune.d2 +++ /dev/null @@ -1,3 +0,0 @@ -direction: right -pkg.Prune -> prune.Snip -> prune.ObjectList -> prune.graphList - diff --git a/docs/images/prune.svg b/docs/images/prune.svg deleted file mode 100644 index d821a86..0000000 --- a/docs/images/prune.svg +++ /dev/null @@ -1,35 +0,0 @@ - -pkgprunePruntSnipObjectListgraphList \ No newline at end of file diff --git a/docs/images/workflow.d2 b/docs/images/workflow.d2 deleted file mode 100644 index 04b9337..0000000 --- a/docs/images/workflow.d2 +++ /dev/null @@ -1,77 +0,0 @@ -direction: right - - -gi: Get Image(s) { - style.fill: "#e0a3ff" - width: 200 - height: 150 -} - -g: Gleaner Harvest { - style.fill: honeydew - width: 200 - height: 150 -} - -data: Data Graph { - dr: Build Release Graph { - style.fill: "#f4a261" - width: 300 - } - udr: Load Release Graph { - style.fill: "#f4a261" - width: 300 - } - - dp: Prune { - style.fill: "#f4a261" - width: 300 - } -} - -org: Organization Graph { - or: Prefix Load Graph { - style.fill: "#f4a261" - width: 300 - } - - # uor: Load Release Graph { - # style.fill: "#f4a261" - # width: 300 - # } - - # op: Prune { - # style.fill: "#f4a261" - # width: 300 - # } - - # org.or -> org.uor -> org.op - -} - -prov: Provenance Graph { - pr: Build Release Graph { - style.fill: "#f4a261" - width: 300 - } - dpg: Clear Current Graph{ - style.fill: "#f4a261" - width: 300 - } - upr: Load Release Graph { - style.fill: "#f4a261" - width: 300 - } - dpp: Delete Generated Data Graphs { - style.fill: "#f4a261" - width: 300 - } -} - -gi -> g -g -> org.or -g -> data.dr -g -> prov.pr - -data.dr -> data.udr -> data.dp -prov.pr -> prov.dpg -> prov.upr -> prov.dpp diff --git a/docs/images/workflow.svg b/docs/images/workflow.svg deleted file mode 100644 index 5856fb9..0000000 --- a/docs/images/workflow.svg +++ /dev/null @@ -1,102 +0,0 @@ -Get Image(s)Gleaner HarvestData GraphOrganization GraphProvenance GraphBuild Release GraphLoad Release GraphPrunePrefix Load GraphBuild Release GraphClear Current GraphLoad Release GraphDelete Generated Data Graphs - - - diff --git a/internal/graph/toFromRDF.go b/internal/graph/toFromRDF.go index 7eaf259..5231354 100644 --- a/internal/graph/toFromRDF.go +++ b/internal/graph/toFromRDF.go @@ -4,7 +4,7 @@ import ( "bytes" "encoding/json" "fmt" - "log" + log "github.com/sirupsen/logrus" "strings" "github.com/knakk/rdf" diff --git a/internal/objects/pipecopy.go b/internal/objects/pipecopy.go index 3f0eb6a..3d2d702 100644 --- a/internal/objects/pipecopy.go +++ b/internal/objects/pipecopy.go @@ -4,12 +4,14 @@ import ( "bufio" "bytes" "context" + "crypto/sha256" + "encoding/hex" "fmt" + "github.com/spf13/viper" "io" "strings" "sync" - - "github.com/spf13/viper" + "time" "github.com/gleanerio/nabu/internal/graph" log "github.com/sirupsen/logrus" @@ -17,6 +19,29 @@ import ( "github.com/minio/minio-go/v7" ) +func getLastElement(s string) string { + parts := strings.Split(s, "/") + return parts[len(parts)-1] +} + +// GenerateDateHash generates a unique hash based on the current date and time. +func generateDateHash() string { + // Get the current date and time + now := time.Now() + + // Format the date and time as a string + dateString := now.Format("2006-01-02 15:04:05") + + // Create a SHA256 hash + hash := sha256.New() + hash.Write([]byte(dateString)) + + // Convert the hash to a hex string + hashString := hex.EncodeToString(hash.Sum(nil)) + + return hashString +} + // PipeCopy writes a new object based on an prefix, this function assumes the objects are valid when concatenated // v1: viper config object // mc: minio client pointer @@ -26,7 +51,8 @@ import ( // destprefix: destination prefix // sf: boolean to declare if single file or not. If so, skip skolimization since JSON-LD library output is enough func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefix string) error { - log.Printf("PipeCopy with name: %s bucket: %s prefix: %s", name, bucket, prefix) + orgname := v1.GetString("implementation_network.orgname") + log.Printf("PipeCopy with name: %s bucket: %s prefix: %s org name: %s", name, bucket, prefix, orgname) pr, pw := io.Pipe() // TeeReader of use? lwg := sync.WaitGroup{} // work group for the pipe writes... @@ -47,10 +73,6 @@ func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefi } }(pw) - // Set and use a "single file flag" to bypass skolimaization since if it is a single file - // the JSON-LD to RDF will correctly map blank nodes. - // NOTE: with a background context we can't get the len(channel) so we have to iterate it. - // This is fast, but it means we have to do the ListObjects twice clen := 0 sf := false ctx, cancel := context.WithCancel(context.Background()) @@ -67,27 +89,25 @@ func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefi objectCh := mc.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: isRecursive}) - // for object := range mc.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: isRecursive}, doneCh) { + lastProcessed := false + idList := make([]string, 0) for object := range objectCh { fo, err := mc.GetObject(context.Background(), bucket, object.Key, minio.GetObjectOptions{}) if err != nil { log.Errorf(" failed to read object %s %s ", object.Key, err) - //fmt.Println(err) + continue } log.Tracef(" processing object %s ", object.Key) var b bytes.Buffer bw := bufio.NewWriter(&b) - _, err = io.Copy(bw, fo) if err != nil { log.Errorf(" failed to read object %s %s ", object.Key, err) log.Println(err) + continue } - s := string(b.Bytes()) - nq := "" - //log.Println("Calling JSONLDtoNQ") if strings.HasSuffix(object.Key, ".nq") { nq = s } else { @@ -97,11 +117,9 @@ func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefi continue } } - var snq string - if sf { - snq = nq // just pass through the RDF without trying to Skolemize since we ar a single fil + snq = nq } else { snq, err = graph.Skolemization(nq, object.Key) if err != nil { @@ -109,25 +127,52 @@ func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefi continue } } - - // 1) get graph URI ctx, err := graph.MakeURN(v1, object.Key) if err != nil { log.Errorf(" failed MakeURN %s %s ", object.Key, err) continue } - // 2) convert NT to NQ csnq, err := graph.NtToNq(snq, ctx) if err != nil { log.Errorf(" failed NtToNq %s %s ", object.Key, err) continue } - _, err = pw.Write([]byte(csnq)) if err != nil { log.Errorf(" failed pipe write %s %s ", object.Key, err) continue } + idList = append(idList, ctx) + lastProcessed = true + } + + // Once we are done with the loop, put in the triples to associate all the graphURIs with the org. + if lastProcessed { + + data := ` . + "GleanerIO Nabu generated catalog" . + "` + time.Now().Format("2006-01-02 15:04:05") + `" . + . + . + . + "` + orgname + `" . + . + "` + getLastElement(prefix) + `" . +` + + for _, item := range idList { + data += ` <` + item + `> .` + "\n" + } + + namedgraph := "urn:gleaner.io:" + orgname + ":" + getLastElement(prefix) + ":datacatalog:" + generateDateHash() + sdata, err := graph.NtToNq(data, namedgraph) + + // Perform the final write to the pipe here + // ilstr := strings.Join(idList, ",") + _, err = pw.Write([]byte(sdata)) + if err != nil { + log.Println(err) + } } }() diff --git a/internal/services/bulk/function.go b/internal/services/bulk/function.go index 955e668..0530a9d 100644 --- a/internal/services/bulk/function.go +++ b/internal/services/bulk/function.go @@ -3,7 +3,6 @@ package bulk import ( "bytes" "errors" - "fmt" "io" "net/http" "strings" @@ -37,7 +36,7 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string) // check for the required bulk endpoint, no need to move on from here if spql.URL == "" { - return "", errors.New("The configuration file lacks an endpointBulk entry") + return "", errors.New("configuration file lacks an endpointBulk entry") } log.Printf("Object %s:%s for %s with method %s type %s", bucketName, item, ep, md, ct) @@ -53,12 +52,13 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string) // Review if this graph g should b here since we are loading quads // I don't think it should b. validate with all the tested triple stores //bn := strings.Replace(bucketName, ".", ":", -1) // convert to urn : values, buckets with . are not valid IRIs - g, err := graph.MakeURN(v1, item) + //g, err := graph.MakeURN(v1, item) if err != nil { log.Error("gets3Bytes %v\n", err) return "", err // Assume return. since on this error things are not good? } - url := fmt.Sprintf("%s?graph=%s", ep, g) + //url := fmt.Sprintf("%s?graph=%s", ep, g) // NOTE 11-13-2023 ?graph with nquads fails with Oxigraph + url := ep // testing // check if JSON-LD and convert to RDF if strings.Contains(item, ".jsonld") { @@ -73,8 +73,16 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string) if err != nil { return "", err } - req.Header.Set("Content-Type", ct) // needs to be x-nquads for blaze, n-quads for jena and graphdb - req.Header.Set("User-Agent", "EarthCube_DataBot/1.0") + + headers := map[string]string{ + "Content-Type": ct, // replace value with actual content + "User-Agent": "EarthCube_DataBot/1.0", + // add other headers here + } + + for k, v := range headers { + req.Header.Add(k, v) + } client := &http.Client{} resp, err := client.Do(req) @@ -87,8 +95,7 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string) } }(resp.Body) - log.Println(resp) - body, err := io.ReadAll(resp.Body) // return body if you want to debugg test with it + body, err := io.ReadAll(resp.Body) // return body if you want to debug test with it if err != nil { log.Println(string(body)) return string(body), err @@ -96,7 +103,7 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string) // report log.Println(string(body)) - log.Printf("success: %s : %d : %s\n", item, len(b), ep) + log.Printf("status: %s : %d : %s\n", item, len(b), ep) return string(body), err } diff --git a/internal/services/releases/bulkLoader.go b/internal/services/releases/bulkLoader.go index 2024a54..b315f29 100644 --- a/internal/services/releases/bulkLoader.go +++ b/internal/services/releases/bulkLoader.go @@ -57,6 +57,7 @@ func BulkRelease(v1 *viper.Viper, mc *minio.Client) error { return err } + // TODO Should this be optional / controlled by flag? // Copy the "latest" graph just made to archive with a date // This means the graph in latests is a duplicate of the most recently dated version in archive/{provider} const layout = "2006-01-02-15-04-05" diff --git a/pkg/cli/root.go b/pkg/cli/root.go index 5837db7..6bdb389 100644 --- a/pkg/cli/root.go +++ b/pkg/cli/root.go @@ -18,7 +18,7 @@ import ( "github.com/spf13/viper" ) -var cfgFile, cfgName, cfgPath, nabuConfName string +var cfgFile, cfgURL, cfgName, cfgPath, nabuConfName string var minioVal, portVal, accessVal, secretVal, bucketVal string var sslVal, dangerousVal bool var viperVal *viper.Viper @@ -75,6 +75,7 @@ func init() { // Enpoint Server setting var rootCmd.PersistentFlags().StringVar(&endpointVal, "endpoint", "", "end point server set for the SPARQL endpoints") + rootCmd.PersistentFlags().StringVar(&cfgURL, "cfgURL", "configs", "URL location for config file") rootCmd.PersistentFlags().StringVar(&cfgPath, "cfgPath", "configs", "base location for config files (default is configs/)") rootCmd.PersistentFlags().StringVar(&cfgName, "cfgName", "local", "config file (default is local so configs/local)") rootCmd.PersistentFlags().StringVar(&nabuConfName, "nabuConfName", "nabu", "config file (default is local so configs/local)") @@ -105,6 +106,11 @@ func initConfig() { if err != nil { log.Fatal("cannot read config %s", err) } + } else if cfgURL != "" { + viperVal, err = config.ReadNabuConfigURL(cfgURL) + if err != nil { + log.Fatal("cannot read config URL %s", err) + } } else { // Find home directory. //home, err := os.UserHomeDir() diff --git a/pkg/config/nabuConfig.go b/pkg/config/nabuConfig.go index 675451b..3bc8ad9 100644 --- a/pkg/config/nabuConfig.go +++ b/pkg/config/nabuConfig.go @@ -1,6 +1,13 @@ package config -import "github.com/spf13/viper" +import ( + "fmt" + log "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "io" + "net/http" + "strings" +) var nabuTemplate = map[string]interface{}{ "minio": MinioTemplate, @@ -28,3 +35,54 @@ func ReadNabuConfig(filename string, cfgPath string) (*viper.Viper, error) { err := v.ReadInConfig() return v, err } + +func ReadNabuConfigURL(configURL string) (*viper.Viper, error) { + v := viper.New() + for key, value := range nabuTemplate { + v.SetDefault(key, value) + } + + log.Printf("Reading config from URL: %v\n", configURL) + + resp, err := http.Get(configURL) + if err != nil { + return v, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return v, fmt.Errorf("HTTP request failed with status code %v", resp.StatusCode) + } + + // Read the content of the config file + configData, err := io.ReadAll(resp.Body) + if err != nil { + return v, err + } + + // Convert configData to a string + configString := string(configData) + + // Convert the string to an io.Reader + reader := strings.NewReader(configString) + + //v.SetConfigName(fileNameWithoutExtTrimSuffix(filename)) + //v.AddConfigPath(cfgPath) + v.SetConfigType("yaml") + //v.BindEnv("headless", "GLEANER_HEADLESS_ENDPOINT") + v.BindEnv("minio.address", "MINIO_ADDRESS") + v.BindEnv("minio.port", "MINIO_PORT") + v.BindEnv("minio.ssl", "MINIO_USE_SSL") + v.BindEnv("minio.accesskey", "MINIO_ACCESS_KEY") + v.BindEnv("minio.secretkey", "MINIO_SECRET_KEY") + v.BindEnv("minio.bucket", "MINIO_BUCKET") + v.AutomaticEnv() + + err = v.ReadConfig(reader) + if err != nil { + fmt.Printf("Error reading config from URL: %v\n", err) + return v, err + } + + return v, err +}