You can create a JAR with the following steps:
- Run sbt assembly (a sample build definition is sketched after this list). This produces a JAR in the target/scala-2.11 directory.
- Put this JAR in $SEAHORSE/jars, where $SEAHORSE is the directory with docker-compose.yml or Vagrantfile (depending on whether you run Docker or Vagrant).
- Restart Seahorse (by either stopping and starting docker-compose, or halting and upping vagrant).
- Operations are now visible in the Seahorse Workflow Editor.
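If your operations project does not yet have a build definition, a minimal setup might look like the sketch below. This is only a sketch under assumptions: the sbt-assembly plugin version and the way the Seahorse SDK is brought onto the classpath depend on your Seahorse distribution.

// project/plugins.sbt -- brings in the sbt-assembly plugin (version is an assumption; use one matching your sbt)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

// build.sbt -- Scala 2.11, as implied by the target/scala-2.11 output directory
name := "my-seahorse-operations"
scalaVersion := "2.11.8"
// Add the Seahorse SDK dependency here; its coordinates depend on your Seahorse distribution.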
This is a simple example designed to show the minimum set of steps needed to create an operation. You can find the full source in IdentityOperation.scala.
First, we need to annotate the operation so that Seahorse knows that it should be registered in the operation catalogue.
@Register
We extend DOperation1To1[DataFrame, DataFrame] because our operation takes one DataFrame as input and returns one DataFrame as output.
final class IdentityOperation
extends DOperation1To1[DataFrame, DataFrame]
A version 4 UUID is used to uniquely identify this operation. When the operation is later modified (e.g. its name changes), the id should not be changed, because Seahorse uses it to recognize the operation as the same one as before.
override val id: Id = "e9990168-daf7-44c6-8e0c-fbc50456fbec"
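If you need a fresh identifier for your own operation, any version 4 UUID generator will do; for instance, a throwaway sketch using the JDK:

// Print a random (version 4) UUID once and paste it into your operation's id field.
println(java.util.UUID.randomUUID().toString)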
Next, we define some information for the user.
override val name: String = "Identity"
override val description: String = "Passes DataFrame along."
Now for the "core logic" of our operation. In our case we simply return the
DataFrame
that was passed as an argument.
override protected def execute(input: DataFrame)(context: ExecutionContext): DataFrame = input
Finally, we declare the operation's parameters. Our operation does not have any, so an empty Array is returned.
override def params = Array.empty
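Putting the pieces above together, the whole class is roughly the sketch below. The import paths are assumptions (they depend on the Seahorse SDK version you build against); everything else comes from the snippets in this section.

// NOTE: the exact package names of these imports are assumptions;
// an import for the @Register annotation is also needed and is likewise SDK-specific.
import ai.deepsense.deeplang.{DOperation1To1, ExecutionContext}
import ai.deepsense.deeplang.DOperation.Id
import ai.deepsense.deeplang.doperables.dataframe.DataFrame

@Register
final class IdentityOperation extends DOperation1To1[DataFrame, DataFrame] {

  override val id: Id = "e9990168-daf7-44c6-8e0c-fbc50456fbec"

  override val name: String = "Identity"
  override val description: String = "Passes DataFrame along."

  // No user-definable parameters.
  override def params = Array.empty

  // The "core logic": return the input DataFrame unchanged.
  override protected def execute(input: DataFrame)(context: ExecutionContext): DataFrame = input
}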
This example is slightly more advanced and more practical. We implement RandomSplit, a simpler version of ai.deepsense.deeplang.doperations.Split, which splits a DataFrame in two with a given ratio. You can find the full source in RandomSplit.scala.
We register our class, as before. This operation takes one DataFrame and returns two DataFrames, so the operation type is DOperation1To2. We also extend Params, a mixin that is helpful when declaring an operation's parameters.
extends DOperation1To2[DataFrame, DataFrame, DataFrame] with Params
After declaring id, name and description, we declare the first of the user-definable parameters, splitRatio.
val splitRatio = NumericParam(
name = "split ratio",
description = "Percentage of rows that should end up in the first output DataFrame.",
It should be a number in the [0, 1] interval...
validator = RangeValidator(0.0, 1.0, beginIncluded = true, endIncluded = true))
... and is equal to 0.5 by default. If we did not provide a default, the user would get a warning when trying to use this operation.
setDefault(splitRatio, 0.5)
We can also provide standard getters and setters. This is optional, but can be useful in tests.
def setSplitRatio(value: Double): this.type = set(splitRatio, value)
def getSplitRatio: Double = $(splitRatio)
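For instance, a test could exercise the parameter through these accessors; a minimal sketch (assuming the class is named RandomSplit, as above):

// Hypothetical test-style usage of the setter and getter defined above.
val op = new RandomSplit().setSplitRatio(0.75)
assert(op.getSplitRatio == 0.75)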
Similarly, we give the user the ability to provide a seed for the pseudorandom number generator.
val seed = NumericParam(
name = "seed",
description = "Seed for pseudo random number generator",
validator = RangeValidator(Int.MinValue, Int.MaxValue, step = Some(1.0))
)
setDefault(seed, 0.0)
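The execute method below uses a getSeed accessor that is not shown in this excerpt; a minimal sketch of it follows (note that Spark expects the seed as a Long, while the numeric parameter stores a Double):

// Convert the stored Double parameter value to the Long seed Spark expects.
def getSeed: Long = $(seed).toLong
// Optional setter, mirroring the setSplitRatio pattern above.
def setSeed(value: Long): this.type = set(seed, value.toDouble)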
Now, we declare which parameters this operation has, together with the order in which they will be shown to the user. splitRatio is more important, so it goes first. declareParams will additionally check that we did not miss any parameter.
override val params = declareParams(splitRatio, seed)
Execution is pretty straightforward. The schema of the resulting DataFrames is the same as that of the input DataFrame. We split the DataFrame into two DataFrames using the underlying Spark randomSplit function and wrap the results back in DataFrames.
override protected def execute(
df: DataFrame)(
context: ExecutionContext): (DataFrame, DataFrame) = {
val Array(f1: RDD[Row], f2: RDD[Row]) =
df.sparkDataFrame.rdd.randomSplit(Array(getSplitRatio, 1.0 - getSplitRatio), getSeed)
val schema = df.sparkDataFrame.schema
val dataFrame1 = context.dataFrameBuilder.buildDataFrame(schema, f1)
val dataFrame2 = context.dataFrameBuilder.buildDataFrame(schema, f2)
(dataFrame1, dataFrame2)
}
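For context, the underlying Spark call used above behaves as in this standalone sketch, which is independent of the Seahorse wrappers (class and app names here are arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

object RandomSplitDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("random-split-demo"))
    // Split roughly 70%/30%; Spark normalizes the weights, and the seed makes the split reproducible.
    val Array(left, right) = sc.parallelize(1 to 100).randomSplit(Array(0.7, 0.3), 42L)
    println(s"left: ${left.count()}, right: ${right.count()}")
    sc.stop()
  }
}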
Finally, we add inference, which is mainly used to deduce the output schema. It can also be used to validate that the input schema is correct. In our case, we simply state that the schemas of both resulting DataFrames are the same as the schema of the input DataFrame, and we return no warnings.
override protected def inferKnowledge(knowledge: DKnowledge[DataFrame])(context: InferContext)
: ((DKnowledge[DataFrame], DKnowledge[DataFrame]), InferenceWarnings) = {
((knowledge, knowledge), InferenceWarnings.empty)
}