Commit: finally caching of context #15

manonthegithub committed Jun 9, 2022
1 parent 35a0d16 commit 54e8531
Showing 5 changed files with 208 additions and 62 deletions.
39 changes: 13 additions & 26 deletions src/main/scala/org/dbpedia/databus/ApiImpl.scala
@@ -10,7 +10,7 @@ import org.apache.jena.rdf.model.Model
 import org.apache.jena.riot.Lang
 import org.apache.jena.shared.JenaException
 import org.dbpedia.databus.ApiImpl.Config
-import org.dbpedia.databus.RdfConversions.{generateGraphId, graphToBytes, jsonLdContextUriString, mapContentType, readModel}
+import org.dbpedia.databus.RdfConversions.{contextUri, generateGraphId, graphToBytes, mapContentType, readModel}
 import org.dbpedia.databus.swagger.api.DatabusApi
 import org.dbpedia.databus.swagger.model.{OperationFailure, OperationSuccess}
 import sttp.model.Uri
@@ -31,7 +31,7 @@ class ApiImpl(config: Config) extends DatabusApi {
 
 
   override def dataidSubgraph(body: String)(request: HttpServletRequest): Try[String] =
-    readModel(body.getBytes, defaultLang)
+    readModel(body.getBytes, defaultLang, contextUri(body.getBytes, defaultLang))
       .flatMap(m => Tractate.extract(m.getGraph, TractateV1.Version))
       .map(_.stringForSigning)
 
@@ -63,18 +63,11 @@ class ApiImpl(config: Config) extends DatabusApi {
       .map(_.toLowerCase)
       .getOrElse("")
     val lang = mapContentType(ct, defaultLang)
-    readModel(body.getBytes, lang)
+    val ctxUri = contextUri(body.getBytes, lang)
+    readModel(body.getBytes, lang, ctxUri)
       .flatMap(model => {
         saveToVirtuoso(model, graphId)({
-          val ctxUriString = {
-            // TODO maybe extract it somehow from reader (custom reader needed)
-            if (lang.getName == Lang.JSONLD.getName) {
-              jsonLdContextUriString(body)
-            } else {
-              None
-            }
-          }
-          graphToBytes(model.getGraph, defaultLang, ctxUriString)
+          graphToBytes(model.getGraph, defaultLang, ctxUri)
             .flatMap(a => saveFiles(username, Map(
               pa -> a
             )).map(hash => OperationSuccess(graphId, hash)))
@@ -124,19 +117,13 @@ class ApiImpl(config: Config) extends DatabusApi {
     val lang = getLangFromAcceptHeader(request)
     setResponseHeaders(Map("Content-Type" -> lang.getContentType.toHeaderString))(request)
     client.readFile(username, p)
-      .flatMap(body =>
-        readModel(body, defaultLang)
-          .flatMap(m => {
-            val ctxUriString = {
-              // TODO maybe extract it somehow from reader (custom reader needed)
-              if (lang.getName == Lang.JSONLD.getName) {
-                jsonLdContextUriString(new String(body))
-              } else {
-                None
-              }
-            }
-            graphToBytes(m.getGraph, lang, ctxUriString)
-          }))
+      .flatMap(body => {
+        val ctxUri = contextUri(body, defaultLang)
+        readModel(body, defaultLang, ctxUri)
+          .flatMap(m =>
+            graphToBytes(m.getGraph, lang, ctxUri)
+          )
+      })
       .map(new String(_))
   }
 
@@ -155,7 +142,7 @@ class ApiImpl(config: Config) extends DatabusApi {
   }
 
   private[databus] def saveToVirtuoso[T](data: Array[Byte], lang: Lang, graphId: String)(execInTransaction: => Try[T]): Try[T] =
-    readModel(data, lang)
+    readModel(data, lang, contextUri(data, lang))
       .flatMap(saveToVirtuoso(_, graphId)(execInTransaction))
 
   private[databus] def saveToVirtuoso[T](model: Model, graphId: String)(execInTransaction: => Try[T]): Try[T] = {
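Taken together, the ApiImpl changes replace the ad-hoc per-call JSON-LD context handling with a single contextUri extraction whose result is reused on both the read and the write side. A minimal sketch of the resulting call pattern, composed from the signatures visible in this commit (the roundTrip helper itself is hypothetical, for illustration only):

import org.apache.jena.riot.Lang
import org.dbpedia.databus.RdfConversions.{contextUri, graphToBytes, readModel}
import scala.util.Try

// Hypothetical helper: extract the @context URI once, then reuse it
// for both parsing and serialization.
def roundTrip(body: Array[Byte], lang: Lang): Try[Array[Byte]] = {
  val ctxUri = contextUri(body, lang) // Some(uri) only for JSON-LD input
  readModel(body, lang, ctxUri)       // parse with the cached, pre-parsed context
    .flatMap(m => graphToBytes(m.getGraph, lang, ctxUri)) // write the same context back
}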
67 changes: 67 additions & 0 deletions src/main/scala/org/dbpedia/databus/CachingJsonldContext.scala
@@ -0,0 +1,67 @@
package org.dbpedia.databus

import java.util.concurrent.ConcurrentHashMap

import com.github.jsonldjava.core.{Context, JsonLdOptions}
import org.dbpedia.databus.CachingJsonldContext.ApproxSizeStringKeyCache

import scala.collection.JavaConverters._

class CachingJsonldContext(sizeLimit: Int, opts: JsonLdOptions) extends Context(opts) {

  private val cache = new ApproxSizeStringKeyCache[Context](sizeLimit)

  override def parse(ctx: Object): Context =
    ctx match {
      case s: String =>
        cache.get(s)
          .map(c => super.parse(c))
          .getOrElse({
            val re = super.parse(ctx)
            cache.put(s, re)
            re
          })
      case _ => super.parse(ctx)
    }

}

object CachingJsonldContext {

  // not the most efficient impl, but should work for now :)
  class ApproxSizeStringKeyCache[T](sizeLimit: Int) {
    private val cache = new ConcurrentHashMap[StringCacheKey, T](sizeLimit)

    def put(s: String, c: T) = {
      // not trying to keep the size strictly equal to the limit
      cache.put(new StringCacheKey(s), c)
      if (cache.size() > sizeLimit) {
        keysSorted
          .take(cache.size() - sizeLimit)
          .foreach(cache.remove)
      }
    }

    def get(s: String): Option[T] =
      Option(cache.get(new StringCacheKey(s)))

    def keysSorted: Seq[StringCacheKey] =
      cache.keySet()
        .asScala.toSeq.sorted

  }

  class StringCacheKey(val str: String, val order: Long = System.nanoTime()) extends Comparable[StringCacheKey] {
    override def equals(other: Any): Boolean = other match {
      case that: StringCacheKey => that.str == this.str
      case _ => false
    }

    override def hashCode(): Int = str.hashCode

    override def compareTo(o: StringCacheKey): Int = this.order.compareTo(o.order)
  }

}
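A usage sketch of the cache's eviction behavior (the placeholder string values and example URIs are made up; the real cache stores parsed Context objects, and key order relies on System.nanoTime at creation):

import org.dbpedia.databus.CachingJsonldContext.ApproxSizeStringKeyCache

// With a size limit of 2, inserting a third key evicts the oldest entry
// (keys are ordered by creation time, not by access time).
val cache = new ApproxSizeStringKeyCache[String](2)
cache.put("http://example.org/ctx1.jsonld", "parsed-1")
cache.put("http://example.org/ctx2.jsonld", "parsed-2")
cache.put("http://example.org/ctx3.jsonld", "parsed-3")
assert(cache.get("http://example.org/ctx1.jsonld").isEmpty) // evicted as oldest
assert(cache.get("http://example.org/ctx3.jsonld").contains("parsed-3"))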
109 changes: 77 additions & 32 deletions src/main/scala/org/dbpedia/databus/SparqlClient.scala
@@ -3,15 +3,18 @@ package org.dbpedia.databus
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
-import com.github.jsonldjava.core.JsonLdConsts
+import com.github.jsonldjava.core.{JsonLdConsts, JsonLdOptions}
 import com.github.jsonldjava.utils.JsonUtils
 import com.mchange.v2.c3p0.ComboPooledDataSource
 import org.apache.jena.atlas.json.JsonString
 import org.apache.jena.graph.{Graph, Node}
 import org.apache.jena.rdf.model.{Model, ModelFactory}
+import org.apache.jena.riot.lang.JsonLDReader
+import org.apache.jena.riot.system.StreamRDFLib
 import org.apache.jena.riot.writer.JsonLDWriter
-import org.apache.jena.riot.{Lang, RDFDataMgr, RDFFormat, RDFLanguages, RDFWriter}
+import org.apache.jena.riot.{Lang, RDFDataMgr, RDFFormat, RDFLanguages, RDFParserBuilder, RDFWriter, RIOT}
 import org.apache.jena.shacl.{ShaclValidator, Shapes, ValidationReport}
+import org.apache.jena.sparql.util
 import org.dbpedia.databus.ApiImpl.Config
 import org.slf4j.LoggerFactory
 import sttp.client3.{DigestAuthenticationBackend, HttpURLConnectionBackend, basicRequest}
@@ -151,15 +154,46 @@ class FusekiJDBCClient(host: String, port: Int, user: String, pass: String, data
 
 object RdfConversions {
 
+  private lazy val CachingContext = initCachingContext()
+
   private val DefaultShaclLang = Lang.TTL
 
-  def readModel(data: Array[Byte], lang: Lang): Try[Model] = Try {
+  def readModel(data: Array[Byte], lang: Lang, context: Option[String]): Try[Model] = Try {
     val model = ModelFactory.createDefaultModel()
     val dataStream = new ByteArrayInputStream(data)
-    RDFDataMgr.read(model, dataStream, lang)
+    val dest = StreamRDFLib.graph(model.getGraph)
+    val parser = RDFParserBuilder.create()
+      .source(dataStream)
+      .base(null)
+      .lang(lang)
+
+    context.foreach(cs =>
+      parser.context(
+        jenaContext(CachingContext.parse(cs))
+      )
+    )
+
+    parser.parse(dest)
     model
   }
 
+  def graphToBytes(model: Graph, outputLang: Lang, context: Option[String]): Try[Array[Byte]] = Try {
+    val str = new ByteArrayOutputStream()
+    val builder = RDFWriter.create.format(langToFormat(outputLang))
+      .source(model)
+
+    context.foreach(ctx => {
+      val jctx = jenaContext(CachingContext.parse(ctx))
+      builder.context(jctx)
+      builder.set(JsonLDWriter.JSONLD_CONTEXT_SUBSTITUTION, new JsonString(ctx))
+    })
+
+    builder
+      .build()
+      .output(str)
+    str.toByteArray
+  }
+
   def validateWithShacl(model: Model, shacl: Graph): Try[ValidationReport] =
     Try(
       ShaclValidator.get()
@@ -168,29 +202,20 @@
 
   def validateWithShacl(file: Array[Byte], shaclData: Array[Byte], modelLang: Lang): Try[ValidationReport] =
     for {
-      shaclGra <- readModel(shaclData, DefaultShaclLang)
-      model <- readModel(file, modelLang)
+      shaclGra <- readModel(shaclData, DefaultShaclLang, contextUri(shaclData, DefaultShaclLang))
+      ctxUri = contextUri(file, modelLang)
+      model <- readModel(file, modelLang, ctxUri)
       re <- validateWithShacl(model, shaclGra.getGraph)
     } yield re
 
   def validateWithShacl(file: Array[Byte], shaclUri: String, modelLang: Lang): Try[ValidationReport] =
     for {
       shaclGra <- Try(RDFDataMgr.loadGraph(shaclUri))
-      model <- readModel(file, modelLang)
+      ctxUri = contextUri(file, modelLang)
+      model <- readModel(file, modelLang, ctxUri)
       re <- validateWithShacl(model, shaclGra)
     } yield re
 
-  def graphToBytes(model: Graph, outputLang: Lang, context: Option[String]): Try[Array[Byte]] = Try {
-    val str = new ByteArrayOutputStream()
-    val builder = RDFWriter.create.format(langToFormat(outputLang))
-      .source(model)
-    context.foreach(ctx => builder.set(JsonLDWriter.JSONLD_CONTEXT_SUBSTITUTION, new JsonString(ctx)))
-    builder
-      .build()
-      .output(str)
-    str.toByteArray
-  }
-
   def langToFormat(lang: Lang): RDFFormat = lang match {
     case RDFLanguages.TURTLE => RDFFormat.TURTLE_PRETTY
     case RDFLanguages.TTL => RDFFormat.TTL
@@ -203,20 +228,6 @@
     case RDFLanguages.TRIX => RDFFormat.TRIX
   }
 
-  def jsonLdContextUriString(data: String): Option[String] = {
-    val jsonObject = JsonUtils.fromString(new String(data))
-    Option(
-      jsonObject
-        .asInstanceOf[java.util.Map[String, Object]]
-        .get(JsonLdConsts.CONTEXT)
-    )
-      .map(_.toString)
-      .flatMap(ctx => Uri.parse(ctx) match {
-        case Left(_) => None
-        case Right(uri) => Some(uri.toString())
-      })
-  }
-
   def mapFilenameToContentType(fn: String): String =
     fn.split('.').last match {
       case "ttl" => "text/turtle"
@@ -287,6 +298,40 @@
     bld.append(">")
   }
 
+  // TODO implement extraction of context as an object and then setting it directly
+  def contextUri(data: Array[Byte], lang: Lang): Option[String] =
+    if (lang.getName == Lang.JSONLD.getName) jsonLdContextUriString(new String(data)) else None
+
+  private def jsonLdContextUriString(data: String): Option[String] = {
+    val jsonObject = JsonUtils.fromString(new String(data))
+    Option(
+      jsonObject
+        .asInstanceOf[java.util.Map[String, Object]]
+        .get(JsonLdConsts.CONTEXT)
+    )
+      .map(_.toString)
+      .flatMap(ctx => Uri.parse(ctx) match {
+        case Left(_) => None
+        case Right(uri) => Some(uri.toString())
+      })
+  }
+
+  import com.github.jsonldjava.core.Context
+
+  private def initCachingContext() = {
+    val opts = new JsonLdOptions(null)
+    opts.useNamespaces = true
+    new CachingJsonldContext(30, opts)
+  }
+
+  private def jenaContext(jsonLdCtx: Context) = {
+    val context: util.Context = RIOT.getContext.copy()
+    jsonLdCtx.putAll(jsonLdCtx.getPrefixes(true))
+    context.put(JsonLDWriter.JSONLD_CONTEXT, jsonLdCtx)
+    context.put(JsonLDReader.JSONLD_CONTEXT, jsonLdCtx)
+    context
+  }
+
   private def escapeString(s: String) = {
     val sb = new StringBuilder(s.length())
     val slen = s.length()
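The contextUri helper only fires for JSON-LD payloads whose top-level @context is a plain URI string; everything else falls through to None. A small sketch under that assumption (the example URIs and data are made up):

import org.apache.jena.riot.Lang
import org.dbpedia.databus.RdfConversions.contextUri

val jsonld =
  """{"@context": "http://example.org/context.jsonld", "@id": "urn:x:1"}""".getBytes
val turtle = "<urn:x:1> <urn:x:p> <urn:x:o> .".getBytes

// JSON-LD with a string-valued @context yields the URI to cache and reuse.
assert(contextUri(jsonld, Lang.JSONLD).contains("http://example.org/context.jsonld"))
// Non-JSON-LD input never yields a context.
assert(contextUri(turtle, Lang.TTL).isEmpty)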
50 changes: 50 additions & 0 deletions src/test/scala/org/dbpedia/databus/CacheTests.scala
@@ -0,0 +1,50 @@
package org.dbpedia.databus

import java.util.UUID

import org.dbpedia.databus.CachingJsonldContext.ApproxSizeStringKeyCache
import org.scalatest.{FlatSpec, Matchers}

class CacheTests extends FlatSpec with Matchers {

  "CacheKey" should "be sorted by time of creation" in {

    val caches =
      Seq(
        new CachingJsonldContext.StringCacheKey("scsc", 0),
        new CachingJsonldContext.StringCacheKey("scsc", -10),
        new CachingJsonldContext.StringCacheKey("scsc", 100),
        new CachingJsonldContext.StringCacheKey("zzzz", 0),
        new CachingJsonldContext.StringCacheKey("aaaa", 0)
      )

    caches.sorted.map(k => k.order) should contain theSameElementsInOrderAs (Seq(-10, 0, 0, 0, 100))
  }

  "CacheKey" should "be equal with same string" in {
    val re = new CachingJsonldContext.StringCacheKey("scsc", 0) == new CachingJsonldContext.StringCacheKey("scsc", -10)
    re should be(true)

    val re2 = new CachingJsonldContext.StringCacheKey("scsc", 0) == new CachingJsonldContext.StringCacheKey("aaaa", 0)
    re2 should be(false)
  }

  "ApproxSizeCache" should "not overflow the size" in {
    val cache = new ApproxSizeStringKeyCache[Int](10)
    val seq = (1 to 100).map(i => (UUID.randomUUID().toString, i))
    seq.foreach(p => cache.put(p._1, p._2))

    cache.keysSorted.map(_.str) should contain theSameElementsInOrderAs (seq.drop(90).map(_._1))
  }

  "ApproxSizeCache" should "have same size for same string key" in {
    val cache = new ApproxSizeStringKeyCache[Int](10)
    val seq = Seq("a", "a", "a")
    seq.foreach(p => cache.put(p, UUID.randomUUID().hashCode()))

    cache.keysSorted.size should be(1)
  }

}
5 changes: 1 addition & 4 deletions src/test/scala/org/dbpedia/databus/DatabusScalatraTest.scala
@@ -49,10 +49,7 @@ class DatabusScalatraTest extends ScalatraFlatSpec {
     }
   }
 
-  "File read" should "work" in {
-
-    val file = "group.jsonld"
-    val bytes = Files.readAllBytes(Paths.get(getClass.getClassLoader.getResource(file).getFile))
+  "File read" should "return 500" in {
 
     get("/databus/graph/read?repo=kuckuck&path=pa/not_existing.jsonld") {
       status should equal(500)
