Skip to content

Commit

Permalink
Protobuf Decode Cypher procedure
Browse files Browse the repository at this point in the history
# Description
Adds a procedure for decoding a protobuf value from within Cypher, given
the byte string, a schema, and a type name. Also fixes a few bugs in the
extant Protobuf tests and adds throwable documentation to the protobuf
utilities.

In order to avoid performing a file read on every invocation of
`parseProtobuf` (name chosen to match `parseJson`), the `ProtobufParser`
has been wrapped with a cache, owned by the AppState. This cache will
retain up to 10 schema-type pairs, cycling through them as necessary.
This slightly aids the protobuf construction problem by making it more
explicit, putting the blocking work on the blocking dispatcher.

## Type of change
Delete options that are not relevant, add if necessary
- [x] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [x] This change requires a documentation update

# How Has This Been Tested?
Describe the tests performed, including instructions to reproduce, and
relevant details for your test configuration
- [x] New unit tests added and pass

# Checklist:
- [x] I have performed a self-review of my code
- [x] I have verified my code doesn't add an implementation for
something that already exists
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my change is effective and works
- [x] New and existing unit tests pass locally with my changes

GitOrigin-RevId: 1100243f70feb9dba91ac7bdd50ed0b2a6f6eda5
  • Loading branch information
emanb29 authored and thatbot-copy[bot] committed May 2, 2024
1 parent a0685d6 commit be1b85b
Show file tree
Hide file tree
Showing 15 changed files with 299 additions and 73 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ lazy val `quine`: Project = project
.settings(commonSettings)
.dependsOn(
`quine-core` % "compile->compile;test->test",
`quine-cypher`,
`quine-cypher` % "compile->compile;test->test",
`quine-endpoints`.jvm % "compile->compile;test->test",
`quine-gremlin`,
`quine-cassandra-persistor`,
Expand All @@ -252,6 +252,8 @@ lazy val `quine`: Project = project
"com.github.scopt" %% "scopt" % scoptV,
"com.google.api.grpc" % "proto-google-common-protos" % protobufCommonV,
"com.google.guava" % "guava" % guavaV,
"com.github.ben-manes.caffeine" % "caffeine" % caffeineV,
"com.github.blemale" %% "scaffeine" % scaffeineV,
"com.google.protobuf" % "protobuf-java" % protobufV,
//"commons-io" % "commons-io" % commonsIoV % Test,
"io.circe" %% "circe-config" % "0.10.1",
Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ object Dependencies {
val awsSdkv1V = "1.12.670"
val betterMonadicForV = "0.3.1"
val bootstrapV = "5.3.3"
val caffeineV = "3.1.8"
val cassandraClientV = "4.18.0"
val catsV = "2.10.0"
val catsEffectV = "3.5.4"
Expand Down Expand Up @@ -54,6 +55,7 @@ object Dependencies {
val reactPlotlyV = "2.5.1"
val reactV = "17.0.2"
val rocksdbV = "8.11.3"
val scaffeineV = "5.2.1"
val scalaCheckV = "1.17.0"
val scalaJava8CompatV = "1.0.2"
val scalaJavaTimeV = "2.5.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1448,8 +1448,9 @@ object CypherCreateSetLabels extends UserDefinedProcedure {
}

/** Lookup a standing query by user-facing name, yielding its [[StandingQueryResults]] as they are produced
* Registered by registerUserDefinedProcedure at runtime by product entrypoint and in docs' GenerateCypherTables
* NB despite the name including `wiretap`, this is implemented as an pekko-streams map
* Registered by registerUserDefinedProcedure at runtime by appstate and in docs' GenerateCypherTables
* NB despite the name including `wiretap`, this is implemented as an pekko-streams map, so it will
* backpressure
*/
class CypherStandingWiretap(lookupByName: (String, NamespaceId) => Option[StandingQueryId])
extends UserDefinedProcedure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.nio.file.{Files, Path, Paths, StandardOpenOption}

import org.pegdown.PegDownProcessor

import com.thatdot.quine.app.ingest.serialization.{CypherParseProtobuf, ProtobufParser}
import com.thatdot.quine.compiler.cypher.CypherStandingWiretap
import com.thatdot.quine.graph.cypher.{BuiltinFunc, Func, Proc, UserDefinedFunction, UserDefinedProcedure}

Expand Down Expand Up @@ -133,6 +134,7 @@ object GenerateCypherTables extends App {
builtinFuncsPath -> builtinFunctionTable(Func.builtinFunctions.sortBy(_.name)),
userDefinedFuncsPath -> userDefinedFunctionTable(Func.userDefinedFunctions.values.toList.sortBy(_.name)),
userDefinedProcsPath -> userDefinedProcedureTable(
new CypherParseProtobuf(ProtobufParser.BlockingWithoutCaching) ::
(new CypherStandingWiretap((_, _) => None) ::
Proc.userDefinedProcedures.values.toList).sortBy(_.name)
)
Expand Down
5 changes: 0 additions & 5 deletions quine/src/main/scala/com/thatdot/quine/app/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import pureconfig.error.ConfigReaderException

import com.thatdot.quine.app.config.{PersistenceAgentType, PersistenceBuilder, QuineConfig, WebServerConfig}
import com.thatdot.quine.app.routes.QuineAppRoutes
import com.thatdot.quine.compiler.cypher.{CypherStandingWiretap, registerUserDefinedProcedure}
import com.thatdot.quine.graph._

object Main extends App with LazyLogging {
Expand Down Expand Up @@ -168,10 +167,6 @@ object Main extends App with LazyLogging {
val loadDataFut: Future[Unit] = quineApp.loadAppData(timeout, config.shouldResumeIngest)
Await.result(loadDataFut, timeout.duration * 2)

registerUserDefinedProcedure(
new CypherStandingWiretap((queryName, namespace) => quineApp.getStandingQueryId(queryName, namespace))
)

statusLines.info("Graph is ready")

// The web service is started unless it was disabled.
Expand Down
14 changes: 13 additions & 1 deletion quine/src/main/scala/com/thatdot/quine/app/QuineApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import com.quine.cypher.phases.{BooleanExpressionRewriter, PredicateLogicRewrite
import com.typesafe.scalalogging.LazyLogging

import com.thatdot.quine.app.ingest.IngestSrcDef
import com.thatdot.quine.app.ingest.serialization.{CypherParseProtobuf, ProtobufParser}
import com.thatdot.quine.app.routes._
import com.thatdot.quine.compiler.cypher
import com.thatdot.quine.compiler.cypher.{CypherStandingWiretap, registerUserDefinedProcedure}
import com.thatdot.quine.graph.InvalidQueryPattern._
import com.thatdot.quine.graph.MasterStream.SqResultsSrcType
import com.thatdot.quine.graph.StandingQueryPattern.{
Expand Down Expand Up @@ -403,6 +405,7 @@ final class QuineApp(graph: GraphService)
}(ec)
}

private[this] val protobufParserCache = new ProtobufParser.LoadingCache(graph.dispatchers)
def addIngestStream(
name: String,
settings: IngestStreamConfiguration,
Expand Down Expand Up @@ -433,7 +436,7 @@ final class QuineApp(graph: GraphService)
intoNamespace,
settings,
valveSwitchMode
)(graph)
)(graph, protobufParserCache)
.leftMap(errs => IngestStreamConfiguration.InvalidStreamConfiguration(errs))
.map { ingestSrcDef =>

Expand Down Expand Up @@ -577,6 +580,15 @@ final class QuineApp(graph: GraphService)
val quickQueriesFut = getOrDefaultGlobalMetaData(QuickQueriesKey, UiNodeQuickQuery.defaults)
val nodeAppearancesFut = getOrDefaultGlobalMetaData(NodeAppearancesKey, UiNodeAppearance.defaults)

// Register all user-defined procedures that require app/graph information (the rest will be loaded
// when the first query is compiled by the [[resolveCalls]] step of the Cypher compilation pipeline)
registerUserDefinedProcedure(
new CypherParseProtobuf(protobufParserCache)
)
registerUserDefinedProcedure(
new CypherStandingWiretap((queryName, namespace) => getStandingQueryId(queryName, namespace))
)

val standingQueryOutputsFut = Future
.sequence(
getNamespaces.map(ns =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,14 @@ abstract class RawValuesIngestSrcDef(

object IngestSrcDef extends LazyLogging {

private def importFormatFor(label: StreamedRecordFormat): ImportFormat =
private def importFormatFor(
label: StreamedRecordFormat
)(implicit protobufParserCache: ProtobufParser.Cache): ImportFormat =
label match {
case StreamedRecordFormat.CypherJson(query, parameter) =>
new CypherJsonInputFormat(query, parameter)
case StreamedRecordFormat.CypherProtobuf(query, parameter, schemaUrl, typeName) =>
new ProtobufInputFormat(query, parameter, filenameOrUrl(schemaUrl), typeName)
new ProtobufInputFormat(query, parameter, protobufParserCache.get(filenameOrUrl(schemaUrl), typeName))
case StreamedRecordFormat.CypherRaw(query, parameter) =>
new CypherRawInputFormat(query, parameter)
case StreamedRecordFormat.Drop => new TestOnlyDrop()
Expand Down Expand Up @@ -260,7 +262,8 @@ object IngestSrcDef extends LazyLogging {
settings: IngestStreamConfiguration,
initialSwitchMode: SwitchMode
)(implicit
graph: CypherOpsGraph
graph: CypherOpsGraph,
protobufParserCache: ProtobufParser.Cache
): ValidatedNel[String, IngestSrcDef] = settings match {
case KafkaIngest(
format,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.thatdot.quine.app.ingest.serialization

import java.net.URL

import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.Timeout

import com.typesafe.scalalogging.LazyLogging

import com.thatdot.quine.graph.cypher
import com.thatdot.quine.graph.cypher.{
Expr,
Parameters,
ProcedureExecutionLocation,
QueryContext,
Type,
UserDefinedProcedure,
UserDefinedProcedureSignature,
Value
}
import com.thatdot.quine.model.QuineId
import com.thatdot.quine.util.StringInput.filenameOrUrl

/** Parse a protobuf message into a Cypher map according to a schema provided by a schema cache.
* Because loading the schema is asynchronous, this must be a procedure rather than a function.
*/
class CypherParseProtobuf(private val cache: ProtobufParser.Cache) extends UserDefinedProcedure with LazyLogging {
def name: String = "parseProtobuf"

def canContainUpdates: Boolean = false

def isIdempotent: Boolean = true

def canContainAllNodeScan: Boolean = false

def call(context: QueryContext, arguments: Seq[Value], location: ProcedureExecutionLocation)(implicit
parameters: Parameters,
timeout: Timeout
): Source[Vector[Value], _] = {
val (bytes, schemaUrl, typeName): (Array[Byte], URL, String) = arguments match {
case Seq(Expr.Bytes(bytes, bytesRepresentId), Expr.Str(schemaUrl), Expr.Str(typeName)) =>
if (bytesRepresentId)
logger.info(
s"""Received an ID (${QuineId(bytes).pretty(location.idProvider)}) as a source of
|bytes to parse a protobuf value of type: $typeName.""".stripMargin.replace('\n', ' ')
)
(bytes, filenameOrUrl(schemaUrl), typeName)
case _ =>
throw wrongSignature(arguments)
}
Source.future(cache.getFuture(schemaUrl, typeName)).map { parser =>
Vector(parser.parseBytes(bytes))
}
}

def signature: UserDefinedProcedureSignature = UserDefinedProcedureSignature(
arguments = Seq("bytes" -> Type.Bytes, "schemaUrl" -> Type.Str, "typeName" -> Type.Str),
outputs = Seq("value" -> cypher.Type.Map),
description = "Parses a protobuf message into a Cypher map value"
)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.thatdot.quine.app.ingest.serialization

import java.net.URL

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -131,9 +129,8 @@ class CypherRawInputFormat(query: String, parameter: String) extends CypherImpor

}

class ProtobufInputFormat(query: String, parameter: String, schemaUrl: URL, typeName: String)
class ProtobufInputFormat(query: String, parameter: String, parser: ProtobufParser)
extends CypherImportFormat(query, parameter) {
private val parser = new ProtobufParser(schemaUrl, typeName)

override protected def importBytes(data: Array[Byte]): Try[cypher.Value] = Try(parser.parseBytes(data))
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,35 @@ package com.thatdot.quine.app.ingest.serialization
import java.net.URL

import scala.collection.mutable
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
import scala.jdk.CollectionConverters._

import com.google.protobuf.Descriptors.EnumValueDescriptor
import com.github.blemale.scaffeine.{AsyncLoadingCache, Scaffeine}
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
import com.google.protobuf.Descriptors.{DescriptorValidationException, EnumValueDescriptor}
import com.google.protobuf.LegacyDescriptorsUtil.LegacyOneofDescriptor
import com.google.protobuf.{ByteString, Descriptors, DynamicMessage}

import com.thatdot.quine.app.serialization.ProtobufSchema
import com.thatdot.quine.graph.cypher
import com.thatdot.quine.graph.cypher.Expr
import com.thatdot.quine.util.QuineDispatchers

class ProtobufParser(schemaUrl: URL, typeName: String) extends ProtobufSchema(schemaUrl, typeName) {
/** Parses Protobuf messages to cypher values according to a schema.
* @throws java.io.IOException If opening the schema file fails
* @throws DescriptorValidationException If the schema file is invalid
* @throws IllegalArgumentException If the schema file does not contain the specified type
*/
class ProtobufParser private (schemaUrl: URL, typeName: String) extends ProtobufSchema(schemaUrl, typeName) {

def parseBytes(bytes: Array[Byte]): cypher.Value =
def parseBytes(bytes: Array[Byte]): Expr.Map =
protobufMessageToCypher(
DynamicMessage.parseFrom(messageType, bytes)
)

private def protobufMessageToCypher(message: DynamicMessage): cypher.Value = Expr.Map {
private def protobufMessageToCypher(message: DynamicMessage): Expr.Map = Expr.Map {
val descriptor = message.getDescriptorForType
val oneOfs = descriptor.getOneofs.asScala.view
// optionals are modeled as (synthetic) oneOfs of a single field.
Expand Down Expand Up @@ -100,3 +109,40 @@ class ProtobufParser(schemaUrl: URL, typeName: String) extends ProtobufSchema(sc
case MESSAGE => protobufMessageToCypher(value.asInstanceOf[DynamicMessage])
}
}
object ProtobufParser {
trait Cache {
def get(schemaUrl: URL, typeName: String): ProtobufParser
def getFuture(schemaUrl: URL, typeName: String): Future[ProtobufParser]
}
object BlockingWithoutCaching extends Cache {
def get(schemaUrl: URL, typeName: String): ProtobufParser = new ProtobufParser(schemaUrl, typeName)
def getFuture(schemaUrl: URL, typeName: String): Future[ProtobufParser] =
Future.successful(new ProtobufParser(schemaUrl, typeName))
}
class LoadingCache(val dispatchers: QuineDispatchers) extends Cache {
import LoadingCache.CacheKey
private val parserCache: AsyncLoadingCache[CacheKey, ProtobufParser] =
Scaffeine()
.maximumSize(10)
.buildAsyncFuture { case CacheKey(schemaUrl, typeName) =>
Future(new ProtobufParser(schemaUrl, typeName))(dispatchers.blockingDispatcherEC)
}

/** Get a parser for the given schema and type name. This method will block until the parser is available.
* see TODO on [[ProtobufSchema]] about blocking at construction time
*/
@throws[java.io.IOException]("If opening the schema file fails")
@throws[DescriptorValidationException]("If the schema file is invalid")
@throws[IllegalArgumentException]("If the schema file does not contain the specified type")
def get(schemaUrl: URL, typeName: String): ProtobufParser =
Await.result(parserCache.get(CacheKey(schemaUrl, typeName)), 2.seconds)

/** Like [[get]], but doesn't block the calling thread, and failures are returned as a Future.failed
*/
def getFuture(schemaUrl: URL, typeName: String): Future[ProtobufParser] =
parserCache.get(CacheKey(schemaUrl, typeName))
}
object LoadingCache {
private case class CacheKey(schemaUrl: URL, typeName: String)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import scala.util.Using
import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet}
import com.google.protobuf.Descriptors.{Descriptor, FileDescriptor}

/** Provides common utilities for its inheritors to parse protobuf descriptors.
* @see [[com.thatdot.quine.app.ingest.serialization.ProtobufParser]]
* @see [[QuineValueToProtobuf]]
* @throws java.io.IOException If opening the schema file fails
* @throws DescriptorValidationException If the schema file is invalid
* @throws IllegalArgumentException If the schema file does not contain the specified type
*/
abstract class ProtobufSchema(schemaUrl: URL, typeName: String) {
// TODO: All of this IO and validating that the type name exists in the file should probably
// be done in a different mechanism, not blocking the thread at class construction time
Expand All @@ -26,12 +33,13 @@ abstract class ProtobufSchema(schemaUrl: URL, typeName: String) {

private def getMessageTypeNames(file: FileDescriptorProto) = file.getMessageTypeList.asScala.map(_.getName)

protected val messageType: Descriptor = files.filter(file => getMessageTypeNames(file) contains typeName) match {
// Check that our named type was found in one and only one file:
case Seq(file) => resolveFileDescriptor(file).findMessageTypeByName(typeName)
case _ =>
throw new IllegalArgumentException(
s"No message of type '$typeName' found among " + files.flatMap(getMessageTypeNames).mkString("[", ", ", "]")
)
}
final protected val messageType: Descriptor =
files.filter(file => getMessageTypeNames(file) contains typeName) match {
// Check that our named type was found in one and only one file:
case Seq(file) => resolveFileDescriptor(file).findMessageTypeByName(typeName)
case _ =>
throw new IllegalArgumentException(
s"No message of type '$typeName' found among " + files.flatMap(getMessageTypeNames).mkString("[", ", ", "]")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,21 @@ sealed abstract class ConversionFailure
final case class TypeMismatch(provided: QuineType, expected: JavaType) extends ConversionFailure {
def message: String = s"Can't coerce $provided into $expected"
}
final case class UnexpectedNull(fieldName: String) extends ConversionFailure {
def message: String = s"Unexpected null for field '$fieldName'"
}
case object NotAList extends ConversionFailure
final case class InvalidEnumValue(provided: String, expected: Seq[EnumValueDescriptor]) extends ConversionFailure
final case class FieldError(fieldName: String, conversionFailure: ConversionFailure) extends ConversionFailure {
//override def message: String = s"Error converting field '$fieldName': $conversionFailure"
}
final case class ErrorCollection(errors: NonEmptyChain[ConversionFailure]) extends ConversionFailure

/** Converts QuineValues to Protobuf messages according to a schema.
* @throws java.io.IOException If opening the schema file fails
* @throws DescriptorValidationException If the schema file is invalid
* @throws IllegalArgumentException If the schema file does not contain the specified type
*/
class QuineValueToProtobuf(schemaUrl: URL, typeName: String) extends ProtobufSchema(schemaUrl, typeName) {

/** Mainly for testing
Expand Down Expand Up @@ -64,6 +72,7 @@ class QuineValueToProtobuf(schemaUrl: URL, typeName: String) extends ProtobufSch
.setSeconds(datetime.getSecond)
.setNanos(datetime.getNano)

@throws[IllegalArgumentException]("If the value provided is Null")
def quineValueToProtobuf(field: FieldDescriptor, qv: QuineValue): Either[ConversionFailure, AnyRef] = qv match {
case QuineValue.Str(string) =>
field.getJavaType match {
Expand Down Expand Up @@ -97,8 +106,7 @@ class QuineValueToProtobuf(schemaUrl: URL, typeName: String) extends ProtobufSch
java.lang.Boolean.FALSE,
TypeMismatch(QuineType.Boolean, field.getJavaType)
)
// Just so the linting doesn't complain about missing cases:
case QuineValue.Null => sys.error("This case should not happen because we filter out nulls.")
case QuineValue.Null => Left(UnexpectedNull(field.getName))
case QuineValue.Bytes(bytes) =>
Either.cond(
field.getJavaType == JavaType.BYTE_STRING,
Expand Down
Loading

0 comments on commit be1b85b

Please sign in to comment.