Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc: Scala 2.12 final move #870

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ language: scala
dist: trusty
env:
global:
_JAVA_OPTIONS="-Dakka.test.timefactor=3 -XX:MaxMetaspaceSize=256m"
_JAVA_OPTIONS="-Dakka.test.timefactor=3 -XX:MaxMetaspaceSize=512m"
scala:
- 2.11.12
- 2.12.12
jdk:
- openjdk11

Expand All @@ -19,6 +19,8 @@ before_cache:
- find $HOME/.sbt -name "*.lock" | xargs rm
- find $HOME/.ivy2 -name "ivydata-*.properties" | xargs rm

sbt_args: -jvm-opts travis/jvmopts

services:
- cassandra
sudo: required
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ Note: This will have no impact if delete.topic.enable is not set to true.
```

2) `./filodb-dev-stop.sh` and restart filodb instances like above
3) Re-run `./dev-gateway.sh --gen-prom-data`. You can check consumption via running the `TestConsumer`, like this: `java -Xmx4G -Dconfig.file=conf/timeseries-filodb-server.conf -cp standalone/target/scala-2.11/standalone-assembly-0.8-SNAPSHOT.jar filodb.kafka.TestConsumer conf/timeseries-dev-source.conf`. Also, the `memstore_rows_ingested` metric which is logged to `logs/filodb-server-N.log` should become nonzero.
3) Re-run `./dev-gateway.sh --gen-prom-data`. You can check consumption via running the `TestConsumer`, like this: `java -Xmx4G -Dconfig.file=conf/timeseries-filodb-server.conf -cp standalone/target/scala-2.12/standalone-assembly-0.8-SNAPSHOT.jar filodb.kafka.TestConsumer conf/timeseries-dev-source.conf`. Also, the `memstore_rows_ingested` metric which is logged to `logs/filodb-server-N.log` should become nonzero.

To stop the dev server. Note that this will stop all the FiloDB servers if multiple are running.
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import org.scalatest.concurrent.ScalaFutures
import filodb.akkabootstrapper.{AkkaBootstrapper, ClusterMembershipHttpResponse}
import org.scalatest.wordspec.AnyWordSpecLike

import scala.language.postfixOps

trait AkkaBootstrapperMultiNodeConfig extends MultiNodeConfig {

val node1 = role("node1")
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ publishTo := Some(Resolver.file("Unused repo", file("target/unusedrepo")))
// Global setting across all subprojects
ThisBuild / organization := "org.filodb"
ThisBuild / organizationName := "FiloDB"
ThisBuild / scalaVersion := "2.11.12"
ThisBuild / scalaVersion := "2.12.12"
ThisBuild / publishMavenStyle := true
ThisBuild / Test / publishArtifact := false
ThisBuild / IntegrationTest / publishArtifact := false
Expand Down
2 changes: 2 additions & 0 deletions cli/src/test/scala/filodb/cli/FilodbCliSpec.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package filodb.cli

import filodb.coordinator.{ActorName, ClusterRole, RunnableSpec}
import org.scalatest.Ignore

@Ignore
class FilodbCliSpec extends RunnableSpec {
"A Filodb Cli" must {
"initialize" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,14 @@ final class FilodbCluster(val system: ExtendedActorSystem, overrideConfig: Confi

}

private[filodb] trait KamonInit {
Kamon.init()
}

/** Mixin for easy usage of the FiloDBCluster Extension.
* Used by all `ClusterRole` nodes starting an ActorSystem and FiloDB Cluster nodes.
*/
private[filodb] trait FilodbClusterNode extends NodeConfiguration with StrictLogging {

private[filodb] trait FilodbClusterNode extends KamonInit with NodeConfiguration with StrictLogging {
def role: ClusterRole

/** Override to pass in additional module config. */
Expand All @@ -262,8 +265,6 @@ private[filodb] trait FilodbClusterNode extends NodeConfiguration with StrictLog
ActorSystem(role.systemName, allConfig)
}

Kamon.init()

lazy val cluster = FilodbCluster(system)

implicit lazy val ec = cluster.ec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ object Utils extends StrictLogging {
case err: ErrorResponse => throw ChildErrorResponse(coordRef, err)
case a: A @unchecked => logger.trace(s"Received $a from $coordRef"); a
}
future.onFailure {
future.failed.foreach {
case e: Exception => logger.warn(s"Error asking $coordRef message $msg", e)
}
Task.fromFuture(future)
Expand All @@ -133,4 +133,4 @@ object Utils extends StrictLogging {
}
scatterGather[A](coordsAndMsgs, parallelism)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ trait SocketChecker {
}
}

trait FilodbClusterNodeSpec extends AbstractSpec with FilodbClusterNode with ScalaFutures {
trait FilodbClusterNodeSpec extends AbstractSpec with ScalaFutures with FilodbClusterNode {
val port = 22552 + util.Random.nextInt(200)

// Ensure that CoordinatedShutdown does not shutdown the whole test JVM, otherwise Travis CI/CD fails
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/filodb.core/query/KeyFilter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,41 @@ sealed trait Filter {

object Filter {
final case class Equals(value: Any) extends Filter {
val filterFunc: Any => Boolean = (item: Any) => value.equals(item)
override def filterFunc: Any => Boolean = (item: Any) => value.equals(item)
val operatorString: String = "="
def valuesStrings: Set[Any] = Set(value)
}

final case class In(values: Set[Any]) extends Filter {
val filterFunc: (Any) => Boolean = (item: Any) => values.contains(item)
override def filterFunc: (Any) => Boolean = (item: Any) => values.contains(item)
val operatorString: String = "in"
def valuesStrings: Set[Any] = values
}

final case class And(left: Filter, right: Filter) extends Filter {
private val leftFunc = left.filterFunc
private val rightFunc = right.filterFunc
val filterFunc: (Any) => Boolean = (item: Any) => leftFunc(item) && rightFunc(item)
override def filterFunc: (Any) => Boolean = (item: Any) => leftFunc(item) && rightFunc(item)
val operatorString: String = "&&"
def valuesStrings: Set[Any] = left.valuesStrings.union(right.valuesStrings)
}

final case class NotEquals(value: Any) extends Filter {
val filterFunc: (Any) => Boolean = (item: Any) => !value.equals(item)
override def filterFunc: (Any) => Boolean = (item: Any) => !value.equals(item)
val operatorString: String = "!="
def valuesStrings: Set[Any] = Set(value)
}

final case class EqualsRegex(value: Any) extends Filter {
val pattern = Pattern.compile(value.toString, Pattern.DOTALL)
val filterFunc: (Any) => Boolean = (item: Any) => pattern.matcher(item.toString).matches()
override def filterFunc: (Any) => Boolean = (item: Any) => pattern.matcher(item.toString).matches()
val operatorString: String = "=~"
def valuesStrings: Set[Any] = Set(value)
}

final case class NotEqualsRegex(value: Any) extends Filter {
val pattern = Pattern.compile(value.toString, Pattern.DOTALL)
val filterFunc: (Any) => Boolean = (item: Any) => !pattern.matcher(item.toString).matches()
override def filterFunc: (Any) => Boolean = (item: Any) => !pattern.matcher(item.toString).matches()
val operatorString: String = "!~"
def valuesStrings: Set[Any] = Set(value)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,19 @@ class ShardDownsamplerSpec extends AnyFunSpec with Matchers with BeforeAndAfterA
}
}

def toDoubles(s: Seq[Long]): Seq[Double] = s.map(_.toDouble)
// timestamps
val expectedTimestamps = (100000L to 195000L by 5000) ++ Seq(199000L)
downsampledData1.map(_._1) shouldEqual expectedTimestamps
// mins
val expectedMins = Seq(500000d) ++ (505000d to 980000d by 25000)
downsampledData1.map(_._2) shouldEqual expectedMins
val expectedMins = Seq(500000L) ++ (505000L to 980000L by 25000L)
downsampledData1.map(_._2) shouldEqual toDoubles(expectedMins)
// maxes
val expectedMaxes = (100000d to 195000d by 5000).map(_ * 5) ++ Seq(995000d)
downsampledData1.map(_._3) shouldEqual expectedMaxes
val expectedMaxes = (100000L to 195000L by 5000L).map(_ * 5) ++ Seq(995000L)
downsampledData1.map(_._3.toDouble) shouldEqual toDoubles(expectedMaxes)
// sums = (min to max).sum
val expectedSums = expectedMins.zip(expectedMaxes).map { case (min, max) => (min to max by 5000d).sum }
downsampledData1.map(_._4) shouldEqual expectedSums
val expectedSums = expectedMins.zip(expectedMaxes).map { case (min, max) => (min to max by 5000L).sum }
downsampledData1.map(_._4) shouldEqual toDoubles(expectedSums)
// counts
val expectedCounts = Seq(1d) ++ Seq.fill(19)(5d) ++ Seq(4d)
downsampledData1.map(_._5) shouldEqual expectedCounts
Expand Down Expand Up @@ -176,14 +177,16 @@ class ShardDownsamplerSpec extends AnyFunSpec with Matchers with BeforeAndAfterA
val expectedTimestamps2 = (100000L to 195000L by 10000) ++ Seq(199000L)
downsampledData2.map(_._1) shouldEqual expectedTimestamps2
// mins
val expectedMins2 = Seq(500000d) ++ (505000d to 980000d by 50000)
downsampledData2.map(_._2) shouldEqual expectedMins2
// NOTE: Would be better to use BigDecimals, but .sum does not work correctly on
// BigDecimal Range due to Scala bug: https://github.com/scala/scala/pull/7232
val expectedMins2 = Seq(500000L) ++ (505000L to 980000L by 50000L)
downsampledData2.map(_._2) shouldEqual toDoubles(expectedMins2)
// maxes
val expectedMaxes2 = (100000d to 195000d by 10000).map(_ * 5) ++ Seq(995000d)
downsampledData2.map(_._3) shouldEqual expectedMaxes2
val expectedMaxes2 = (100000L to 195000L by 10000L).map(_ * 5) ++ Seq(995000L)
downsampledData2.map(_._3) shouldEqual toDoubles(expectedMaxes2)
// sums = (min to max).sum
val expectedSums2 = expectedMins2.zip(expectedMaxes2).map { case (min, max) => (min to max by 5000d).sum }
downsampledData2.map(_._4) shouldEqual expectedSums2
val expectedSums2 = expectedMins2.zip(expectedMaxes2).map { case (min, max) => (min to max by 5000L).sum }
downsampledData2.map(_._4) shouldEqual toDoubles(expectedSums2)
// counts
val expectedCounts2 = Seq(1d) ++ Seq.fill(9)(10d) ++ Seq(9d)
downsampledData2.map(_._5) shouldEqual expectedCounts2
Expand Down
2 changes: 1 addition & 1 deletion dev-gateway.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
args=${@:-"conf/timeseries-dev-source.conf"}
java -Dconfig.file=conf/timeseries-filodb-server.conf \
-Dkamon.prometheus.embedded-server.port=9097 \
-cp gateway/target/scala-2.11/gateway-* filodb.gateway.GatewayServer $args &
-cp gateway/target/scala-2.12/gateway-* filodb.gateway.GatewayServer $args &
2 changes: 1 addition & 1 deletion doc/ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ You can also look at [SourceSinkSuite.scala](../kafka/src/it/scala/filodb/kafka/
### Testing the Consumer

* `sbt standalone/assembly`
* `java -cp standalone/target/scala-2.11/standalone-assembly-0.7.0.jar filodb.kafka.TestConsumer my-kafka-sourceconfig.conf`
* `java -cp standalone/target/scala-2.12/standalone-assembly-0.7.0.jar filodb.kafka.TestConsumer my-kafka-sourceconfig.conf`

See the TestConsumer for more info.

Expand Down
2 changes: 1 addition & 1 deletion filo-cli
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env bash

# set -x
SCALA_VERSION="2.11"
SCALA_VERSION="2.12"
FILO_VERSION=$(cat version.sbt | sed -e 's/.*"\(.*\)"/\1/g')
CLI_FILE=`pwd`"/cli/target/scala-$SCALA_VERSION/filo-cli-$FILO_VERSION"

Expand Down
4 changes: 2 additions & 2 deletions filodb-dev-start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ done

cd "$(dirname "$0")"

if [ ! -f standalone/target/scala-2.11/standalone-assembly-*-SNAPSHOT.jar ]; then
if [ ! -f standalone/target/scala-2.12/standalone-assembly-*-SNAPSHOT.jar ]; then
echo "Standalone assembly not found. Building..."
sbt standalone/assembly
fi

echo "Starting FiloDB standalone server..."
java -Xmx4G $PORTS_ARG -Dconfig.file=$CONFIG -DlogSuffix=$LOG_SUFFIX -cp standalone/target/scala-2.11/standalone-assembly-*-SNAPSHOT.jar filodb.standalone.FiloServer &
java -Xmx4G $PORTS_ARG -Dconfig.file=$CONFIG -DlogSuffix=$LOG_SUFFIX -cp standalone/target/scala-2.12/standalone-assembly-*-SNAPSHOT.jar filodb.standalone.FiloServer &
8 changes: 4 additions & 4 deletions http/src/main/scala/filodb/http/PrometheusApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class PrometheusApiRoute(nodeCoord: ActorRef, settings: HttpSettings)(implicit a
// [Range Queries](https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries)
path( "api" / "v1" / "query_range") {
get {
parameter('query.as[String], 'start.as[Double], 'end.as[Double], 'histogramMap.as[Boolean].?,
'step.as[Int], 'explainOnly.as[Boolean].?, 'verbose.as[Boolean].?, 'spread.as[Int].?)
parameter(('query.as[String], 'start.as[Double], 'end.as[Double], 'histogramMap.as[Boolean].?,
'step.as[Int], 'explainOnly.as[Boolean].?, 'verbose.as[Boolean].?, 'spread.as[Int].?))
{ (query, start, end, histMap, step, explainOnly, verbose, spread) =>
val logicalPlan = Parser.queryRangeToLogicalPlan(query, TimeStepParams(start.toLong, step, end.toLong))

Expand All @@ -58,8 +58,8 @@ class PrometheusApiRoute(nodeCoord: ActorRef, settings: HttpSettings)(implicit a
// [Instant Queries](https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries)
path( "api" / "v1" / "query") {
get {
parameter('query.as[String], 'time.as[Double], 'explainOnly.as[Boolean].?, 'verbose.as[Boolean].?,
'spread.as[Int].?, 'histogramMap.as[Boolean].?, 'step.as[Double].?)
parameter(('query.as[String], 'time.as[Double], 'explainOnly.as[Boolean].?, 'verbose.as[Boolean].?,
'spread.as[Int].?, 'histogramMap.as[Boolean].?, 'step.as[Double].?))
{ (query, time, explainOnly, verbose, spread, histMap, step) =>
val stepLong = step.map(_.toLong).getOrElse(0L)
val logicalPlan = Parser.queryToLogicalPlan(query, time.toLong, stepLong)
Expand Down
4 changes: 2 additions & 2 deletions jmh/src/main/scala/filodb.jmh/QueryAndIngestBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ class QueryAndIngestBenchmark extends StrictLogging {
def parallelQueries(): Unit = {
val futures = (0 until numQueries).map { n =>
val f = asyncAsk(coordinator, queryCommands(n % queryCommands.length))
f.onSuccess {
f.foreach {
case q: QueryResult2 =>
case e: QError => throw new RuntimeException(s"Query error $e")
}
f
}
Await.result(Future.sequence(futures), 60.seconds)
}
}
}
6 changes: 3 additions & 3 deletions jmh/src/main/scala/filodb.jmh/QueryInMemoryBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class QueryInMemoryBenchmark extends StrictLogging {
val qCmd = queryCommands(n % queryCommands.length)
val time = System.currentTimeMillis
val f = asyncAsk(coordinator, qCmd.copy(qContext = qCmd.qContext.copy(queryId = n.toString, submitTime = time)))
f.onSuccess {
f.foreach {
case q: QueryResult2 => queriesSucceeded += 1
case e: QError => queriesFailed += 1
}
Expand All @@ -182,7 +182,7 @@ class QueryInMemoryBenchmark extends StrictLogging {
val qCmd = queryCommands2(n % queryCommands2.length)
val time = System.currentTimeMillis
val f = asyncAsk(coordinator, qCmd.copy(qContext = qCmd.qContext.copy(queryId = n.toString, submitTime = time)))
f.onSuccess {
f.foreach {
case q: QueryResult2 => queriesSucceeded += 1
case e: QError => queriesFailed += 1
}
Expand Down Expand Up @@ -247,4 +247,4 @@ class QueryInMemoryBenchmark extends StrictLogging {
.countL.runAsync
Await.result(f, 60.seconds)
}
}
}
2 changes: 1 addition & 1 deletion jmh/src/main/scala/filodb.jmh/QueryOnDemandBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class QueryOnDemandBenchmark extends StrictLogging {
def parallelQueries(): Unit = {
val futures = (0 until numQueries).map { n =>
val f = asyncAsk(coordinator, queryCommands(n % queryCommands.length))
f.onSuccess {
f.foreach {
case q: QueryResult2 =>
case e: QError => throw new RuntimeException(s"Query error $e")
}
Expand Down
11 changes: 10 additions & 1 deletion memory/src/main/scala/filodb.memory/format/UnsafeUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@ import org.agrona.DirectBuffer
import spire.implicits.cforRange
// scalastyle:off number.of.methods
object UnsafeUtils {
val unsafe = scala.concurrent.util.Unsafe.instance
// scalastyle:off
// Copy and translation from: https://github.com/scala/scala/blob/2.13.x/src/library/scala/runtime/Statics.java#L1760
final val unsafe: sun.misc.Unsafe = classOf[sun.misc.Unsafe].getDeclaredFields
.find(_.getType == classOf[sun.misc.Unsafe])
.map { field => {
field.setAccessible(true)
field.get(null).asInstanceOf[sun.misc.Unsafe]
}
} getOrElse (throw new IllegalStateException("Can't find instance of sun.misc.Unsafe"))
// scalastyle:on

// scalastyle:off
val ZeroPointer: Any = null
Expand Down
2 changes: 1 addition & 1 deletion project/FiloBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ object Submodules {
.settings(
commonSettings,
name := "spark-jobs",
fork in Test := true,
fork in Test := false,
baseDirectory in Test := file("."), // since we have a config using FiloDB project root as relative path
assemblySettings,
scalacOptions += "-language:postfixOps",
Expand Down
2 changes: 1 addition & 1 deletion project/FiloSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ object FiloSettings {
config="-Dconfig.file=$FILO_CONFIG_FILE"
fi
: ${FILOLOG:="."}
exec $CMD -Xmx4g -Xms1g -DLOG_DIR=$FILOLOG $config $allprops -jar "$0" "$@" ;
exec $CMD -Xmx2g -Xms1g -DLOG_DIR=$FILOLOG $config $allprops -jar "$0" "$@" ;
""".split("\n")

lazy val kafkaSettings = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ trait Expressions extends Aggregates with Functions {
if (hasScalarResult(lhs) && hasScalarResult(rhs)) {
val rangeParams = RangeParams(timeParams.start, timeParams.step, timeParams.end)

(lhs, rhs) match {
((lhs, rhs): @unchecked) match {
// 3 + 4
case (lh: ScalarExpression, rh: ScalarExpression) =>
ScalarBinaryOperation(operator.getPlanOperator, Left(lh.toScalar), Left(rh.toScalar), rangeParams)
Expand Down
1 change: 0 additions & 1 deletion query/src/main/scala/filodb/query/PromCirceSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.circe.{Decoder, Encoder, HCursor, Json}
import io.circe.syntax._

object PromCirceSupport {
import cats.syntax.either._
// necessary to encode sample in promql response as an array with long and double value as string
// Specific encoders for *Sampl types
implicit val encodeSampl: Encoder[DataSampl] = Encoder.instance {
Expand Down
2 changes: 1 addition & 1 deletion query/src/main/scala/filodb/query/ResultTypes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ trait QueryCommand extends NodeCommand with java.io.Serializable {
def dataset: DatasetRef
}

trait QueryResponse extends NodeResponse with java.io.Serializable {
sealed trait QueryResponse extends NodeResponse with java.io.Serializable {
def id: String
}

Expand Down
Loading