Dataset creation ERROR DatasetCoordinatorActor: #152

Open
PallaviPersistent opened this issue Apr 10, 2018 · 29 comments

@PallaviPersistent

OS and Environment
Red Hat Enterprise Linux Server release 7.2 (Maipo)

JVM version
java -version
java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)

Scala version
Scala version 2.11.8

Kafka and Cassandra versions and setup
apache-cassandra-3.11.2

Spark version if used
2.0.0

Deployed mode
Spark Standalone

Actual (wrong) behavior
parDFnew.write.format("filodb.spark").
option("dataset", "parDFNF").
option("row_keys", "appId").
option("partition_keys", "exportMs").
mode(SaveMode.Overwrite).save()

[Stage 2:> (0 + 2) / 2]18/04/10 02:40:47 ERROR DatasetCoordinatorActor: Error in reprojection task (filodb.parDFNF/0)
java.lang.NullPointerException
at org.velvia.filo.vectors.UTF8PtrAppendable.addData(UTF8Vector.scala:270)
at org.velvia.filo.BinaryAppendableVector$class.addVector(BinaryVector.scala:154)
at org.velvia.filo.vectors.ObjectVector.addVector(ObjectVector.scala:16)
at org.velvia.filo.GrowableVector.addData(BinaryVector.scala:251)
at org.velvia.filo.BinaryVectorBuilder.addData(BinaryVector.scala:308)
at org.velvia.filo.VectorBuilderBase$class.add(VectorBuilder.scala:36)
at org.velvia.filo.BinaryVectorBuilder.add(BinaryVector.scala:303)
at org.velvia.filo.RowToVectorBuilder.addRow(RowToVectorBuilder.scala:70)
at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:53)
at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:55)
at filodb.core.store.ChunkSet$.withSkips(ChunkSetInfo.scala:70)
at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:193)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53)
at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28)
at filodb.core.Perftools$.subtrace(Perftools.scala:26)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79)
at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:53)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/04/10 02:40:47 ERROR OneForOneStrategy:
java.lang.NullPointerException
at org.velvia.filo.vectors.UTF8PtrAppendable.addData(UTF8Vector.scala:270)
at org.velvia.filo.BinaryAppendableVector$class.addVector(BinaryVector.scala:154)
at org.velvia.filo.vectors.ObjectVector.addVector(ObjectVector.scala:16)
at org.velvia.filo.GrowableVector.addData(BinaryVector.scala:251)
at org.velvia.filo.BinaryVectorBuilder.addData(BinaryVector.scala:308)
at org.velvia.filo.VectorBuilderBase$class.add(VectorBuilder.scala:36)
at org.velvia.filo.BinaryVectorBuilder.add(BinaryVector.scala:303)
at org.velvia.filo.RowToVectorBuilder.addRow(RowToVectorBuilder.scala:70)
at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:53)
at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:55)
at filodb.core.store.ChunkSet$.withSkips(ChunkSetInfo.scala:70)
at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:193)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53)
at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28)
at filodb.core.Perftools$.subtrace(Perftools.scala:26)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79)
at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:53)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/04/10 02:40:47 WARN NodeCoordinatorActor: Actor Actor[akka://filo-spark/user/coordinator/ds-coord-parDFNF-0#-1168010970] has terminated! Ingestion for (filodb.parDFNF,0) will stop.
18/04/10 02:41:07 WARN RddRowSourceActor: ==> (filodb.parDFNF_0_0_1) No Acks received for last 20 seconds
18/04/10 02:41:07 WARN RddRowSourceActor: ==> (filodb.parDFNF_0_1_0) No Acks received for last 20 seconds

Steps to reproduce
scala> val files=Seq("/root/FiloDB/FiloDB/parquetfile/fragment1522917434336000000.dat","/root/FiloDB/FiloDB/parquetfile/fragment1522917494312000000.dat")
files: Seq[String] = List(/root/FiloDB/FiloDB/parquetfile/fragment1522917434336000000.dat, /root/FiloDB/FiloDB/parquetfile/fragment1522917494312000000.dat)

scala> val parDF=sqlContext.read.parquet(files:_*)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
18/04/10 02:39:20 WARN General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/root/FiloDB/spark-2.0.0-bin-hadoop2.7/jars/datanucleus-core-3.2.10.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/root/FiloDB/spark/jars/datanucleus-core-3.2.10.jar."
18/04/10 02:39:20 WARN General: Plugin (Bundle) "org.datanucleus.store.rdbms" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/root/FiloDB/spark-2.0.0-bin-hadoop2.7/jars/datanucleus-rdbms-3.2.9.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/root/FiloDB/spark/jars/datanucleus-rdbms-3.2.9.jar."
18/04/10 02:39:20 WARN General: Plugin (Bundle) "org.datanucleus.api.jdo" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/root/FiloDB/spark-2.0.0-bin-hadoop2.7/jars/datanucleus-api-jdo-3.2.6.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/root/FiloDB/spark/jars/datanucleus-api-jdo-3.2.6.jar."
parDF: org.apache.spark.sql.DataFrame = [epoch: bigint, rowNum: bigint ... 38 more fields]

scala> val parDFnew=parDF.withColumn("exporterIp", 'exporterIp.cast("String")).withColumn("srcIp", 'srcIp.cast("String")).withColumn("dstIp", 'dstIp.cast("String")).withColumn("nextHopIp", 'nextHopIp.cast("String")).withColumn("bgpNextHopIp", 'bgpNextHopIp.cast("String")).withColumn("appId", 'appId.cast("String")).withColumn("policyQosClassificationHierarchy", 'policyQosClassificationHierarchy.cast("String")).withColumn("protocolId", 'protocolId.cast("Int")).withColumn("srcTos", 'srcTos.cast("Int")).withColumn("dstTos", 'dstTos.cast("Int")).withColumn("srcMask", 'srcMask.cast("Int")).withColumn("dstMask", 'dstMask.cast("Int")).withColumn("direction", 'direction.cast("String")).select('epoch, 'srcIp, 'dstIp, 'exporterIp, 'rowNum, 'exportMs, 'pktSeqNum, 'flowSeqNum, 'protocolId, 'srcTos, 'dstTos, 'srcMask, 'dstMask, 'tcpBits, 'srcPort, 'inIfId, 'inIfEntityId, 'inIfEnabled, 'dstPort, 'outIfId, 'outIfEntityId, 'outIfEnabled, 'direction, 'inOctets, 'outOctets, 'inPackets, 'outPackets, 'nextHopIp, 'bgpSrcAsNum, 'bgpDstAsNum, 'bgpNextHopIp, 'endMs, 'startMs, 'appId, 'appName, 'srcIpGroup, 'dstIpGroup, 'policyQosClassificationHierarchy, 'policyQosQueueId, 'workerId)
parDFnew: org.apache.spark.sql.DataFrame = [epoch: bigint, srcIp: string ... 38 more fields]

parDFnew.write.format("filodb.spark").
option("dataset", "parDFNF").
option("row_keys", "appId").
option("partition_keys", "exportMs").
mode(SaveMode.Overwrite).save()


@shiwanshujalan

When we use bulk-write mode set to true we get a NullPointerException, while turning it off resolves it. For reference, the row_keys column used is not unique, but that is a requirement for us: we need to ingest all the records without overwriting any, which is what happens when bulk write is set to false.
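For reference, the two configurations being contrasted here look roughly like the write below; the option key is the one quoted later in this thread, and the dataset/key names are just the ones from the original report, so treat this as a sketch rather than a prescribed setup:

import org.apache.spark.sql.SaveMode

// Sketch only: df is the DataFrame being ingested; dataset and key names come
// from the original report. With bulk-write-mode "true" we hit the NPE; with
// "false" the write succeeds but rows sharing a row key are overwritten.
df.write.format("filodb.spark").
  option("dataset", "parDFNF").
  option("row_keys", "appId").
  option("partition_keys", "exportMs").
  option("filodb.reprojector.bulk-write-mode", "true").
  mode(SaveMode.Append).save()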

@velvia
Member

velvia commented May 22, 2018 via email

@shiwanshujalan

Here is my use case.

We are trying to insert 18 million records into FiloDB using Parquet files.
Each record has around 39 columns.
Below is the schema definition for the dataset.
scala> parDF.printSchema
root
|-- epoch: long (nullable = true)
|-- rowNum: long (nullable = true)
|-- exportMs: long (nullable = true)
|-- exporterIp: binary (nullable = true)
|-- pktSeqNum: long (nullable = true)
|-- flowSeqNum: integer (nullable = true)
|-- srcIp: binary (nullable = true)
|-- dstIp: binary (nullable = true)
|-- protocolId: short (nullable = true)
|-- srcTos: short (nullable = true)
|-- dstTos: short (nullable = true)
|-- srcMask: byte (nullable = true)
|-- dstMask: byte (nullable = true)
|-- tcpBits: integer (nullable = true)
|-- srcPort: integer (nullable = true)
|-- inIfId: long (nullable = true)
|-- inIfEntityId: long (nullable = true)
|-- inIfEnabled: boolean (nullable = true)
|-- dstPort: integer (nullable = true)
|-- outIfId: long (nullable = true)
|-- outIfEntityId: long (nullable = true)
|-- outIfEnabled: boolean (nullable = true)
|-- direction: byte (nullable = true)
|-- inOctets: long (nullable = true)
|-- outOctets: long (nullable = true)
|-- inPackets: long (nullable = true)
|-- outPackets: long (nullable = true)
|-- nextHopIp: binary (nullable = true)
|-- bgpSrcAsNum: long (nullable = true)
|-- bgpDstAsNum: long (nullable = true)
|-- bgpNextHopIp: binary (nullable = true)
|-- endMs: long (nullable = true)
|-- startMs: long (nullable = true)
|-- appId: binary (nullable = true)
|-- appName: string (nullable = true)
|-- srcIpGroup: string (nullable = true)
|-- dstIpGroup: string (nullable = true)
|-- policyQosClassificationHierarchy: binary (nullable = true)
|-- policyQosQueueId: long (nullable = true)
|-- workerId: integer (nullable = true)

We convert the unsupported data types using the command below (inetToString is a UDF; a sketch follows the output).
scala> val dfResult = parDF.withColumn("exporterIp", inetToString(parDF("exporterIp"))).withColumn("srcIp", inetToString(parDF("srcIp"))).withColumn("dstIp", inetToString(parDF("dstIp"))).withColumn("nextHopIp", inetToString(parDF("nextHopIp"))).withColumn("bgpNextHopIp", inetToString(parDF("bgpNextHopIp"))).withColumn("appId", inetToString(parDF("appId"))).withColumn("policyQosClassificationHierarchy", inetToString(parDF("policyQosClassificationHierarchy"))).withColumn("protocolId", parDF("protocolId").cast(IntegerType)).withColumn("srcTos", parDF("srcTos").cast(IntegerType)).withColumn("dstTos", parDF("dstTos").cast(IntegerType)).withColumn("srcMask", parDF("srcMask").cast(StringType)).withColumn("dstMask", parDF("dstMask").cast(StringType)).withColumn("direction", parDF("direction").cast(StringType)).withColumn("inIfEnabled", parDF("inIfEnabled").cast(StringType)).withColumn("outIfEnabled", parDF("outIfEnabled").cast(StringType))
dfResult: org.apache.spark.sql.DataFrame = [epoch: bigint, rowNum: bigint ... 38 more fields]
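For context, inetToString is not defined anywhere in this thread; a minimal sketch of such a UDF, assuming the binary columns hold raw IPv4/IPv6 address bytes, might look like this:

import java.net.InetAddress
import org.apache.spark.sql.functions.udf

// Hypothetical stand-in for the inetToString UDF used above: converts a binary
// column containing raw IPv4/IPv6 address bytes into its textual form.
val inetToString = udf { bytes: Array[Byte] =>
  if (bytes == null) null else InetAddress.getByAddress(bytes).getHostAddress
}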

After conversion the schema looks like this:
scala> dfResult.printSchema
root
|-- epoch: long (nullable = true)
|-- rowNum: long (nullable = true)
|-- exportMs: long (nullable = true)
|-- exporterIp: string (nullable = true)
|-- pktSeqNum: long (nullable = true)
|-- flowSeqNum: integer (nullable = true)
|-- srcIp: string (nullable = true)
|-- dstIp: string (nullable = true)
|-- protocolId: integer (nullable = true)
|-- srcTos: integer (nullable = true)
|-- dstTos: integer (nullable = true)
|-- srcMask: string (nullable = true)
|-- dstMask: string (nullable = true)
|-- tcpBits: integer (nullable = true)
|-- srcPort: integer (nullable = true)
|-- inIfId: long (nullable = true)
|-- inIfEntityId: long (nullable = true)
|-- inIfEnabled: string (nullable = true)
|-- dstPort: integer (nullable = true)
|-- outIfId: long (nullable = true)
|-- outIfEntityId: long (nullable = true)
|-- outIfEnabled: string (nullable = true)
|-- direction: string (nullable = true)
|-- inOctets: long (nullable = true)
|-- outOctets: long (nullable = true)
|-- inPackets: long (nullable = true)
|-- outPackets: long (nullable = true)
|-- nextHopIp: string (nullable = true)
|-- bgpSrcAsNum: long (nullable = true)
|-- bgpDstAsNum: long (nullable = true)
|-- bgpNextHopIp: string (nullable = true)
|-- endMs: long (nullable = true)
|-- startMs: long (nullable = true)
|-- appId: string (nullable = true)
|-- appName: string (nullable = true)
|-- srcIpGroup: string (nullable = true)
|-- dstIpGroup: string (nullable = true)
|-- policyQosClassificationHierarchy: string (nullable = true)
|-- policyQosQueueId: long (nullable = true)
|-- workerId: integer (nullable = true)

Here is the distinct-value count for each column:

+-----+-------+--------+----------+---------+----------+--------+--------+----------+------+------+-------+-------+-------+-------+------+------------+-----------+-------+-------+-------------+------------+---------+--------+---------+---------+----------+---------+-----------+-----------+------------+------+-------+-----+-------+----------+----------+--------------------------------+----------------+--------+
|epoch| rowNum|exportMs|exporterIp|pktSeqNum|flowSeqNum| srcIp| dstIp|protocolId|srcTos|dstTos|srcMask|dstMask|tcpBits|srcPort|inIfId|inIfEntityId|inIfEnabled|dstPort|outIfId|outIfEntityId|outIfEnabled|direction|inOctets|outOctets|inPackets|outPackets|nextHopIp|bgpSrcAsNum|bgpDstAsNum|bgpNextHopIp| endMs|startMs|appId|appName|srcIpGroup|dstIpGroup|policyQosClassificationHierarchy|policyQosQueueId|workerId|
+-----+-------+--------+----------+---------+----------+--------+--------+----------+------+------+-------+-------+-------+-------+------+------------+-----------+-------+-------+-------------+------------+---------+--------+---------+---------+----------+---------+-----------+-----------+------------+------+-------+-----+-------+----------+----------+--------------------------------+----------------+--------+
| 11|2400000| 481| 1| 400200| 30|15721052|15720630| 4| 1| 0| 1| 1| 1| 32767| 5| 5| 1| 7014| 5| 5| 1| 1| 1000000| 0| 9958| 0| 17962532| 1| 1| 0|521452| 625455| 7015| 24159| 0| 0| 0| 0| 1|
+-----+-------+--------+----------+---------+----------+--------+--------+----------+------+------+-------+-------+-------+-------+------+------------+-----------+-------+-------+-------------+------------+---------+--------+---------+---------+----------+---------+-----------+-----------+------------+------+-------+-----+-------+----------+----------+--------------------------------+----------------+--------+

From the distribution above you can see that only one column ("nextHopIp") has a large number of unique values (close to 18 million).
So in our present use case we are using partition key -> protocolId (total distinct count = 4)
and row_keys -> nextHopIp (total distinct count = 17962532).
Below are the commands executed to insert the data.

val dfResult1 = dfResult.sort($"protocolId")
dfResult1.write.format("filodb.spark").
option("dataset", "flow25").
option("row_keys", "nextHopIp").
option("partition_keys", "protocolId").
mode(SaveMode.Append).save()

The write command fails with a NullPointerException.

Earlier we used partition key -> protocolId (total distinct count = 4)
and row_keys -> exportMs (total distinct count = 481).
That combination did not throw any error, but because the row key has so few distinct values we cannot use it: rows with a duplicate primary key get overwritten when bulk write is set to false.
We have also tried setting bulk write to true, but that does not work either: the total number of records that end up in FiloDB is very small (around 3k).
Below is the write command used.
df.write.format("filodb.spark").
option("dataset", "flow25").
option("row_keys", "workerId,inIfEntityId,outIfEntityId").
option("partition_keys", "workerId,exportMs").
option("filodb.reprojector.bulk-write-mode","true").
mode(SaveMode.Append).save()

@velvia
Member

velvia commented May 23, 2018

@shiwanshujalan the fix is simple, I'll see if I can get something for you to try.
However, just a warning that we are making major changes under the hood, so the version on master will not really be maintained.

@shiwanshujalan

Hi,
It would be much appreciated if we could get a timeline for the fix, as this is a complete blocker for our POC.
Currently we have around 3 POCs centred around FiloDB.
Also, can you tell us when the next version will be released?

Apart from the issue above, I have managed to ingest the 18 million records into FiloDB (in small chunks), but the space occupied is quite high (around 1.5 GB) compared to the Parquet files used (900 MB). Do you have any suggestions on how to reduce the space used in FiloDB?

Once this data is inserted we will be doing ad-hoc aggregation on it, so we would also appreciate help with the data modelling.

Thanks.

@velvia
Member

velvia commented May 24, 2018 via email

@shiwanshujalan

Hi,
Thanks for the valuable suggestion on data compression.
I have changed the compression algorithm in the filodb-defaults.conf file and am currently using DeflateCompression. The chunk size has now come down to around 800 MB from 1.5 GB.
If you can suggest any further compression that would bring it down to somewhere around 500 MB, that would be great.

Also, will the fix you are working on resolve the bulk-write issue?

Let me explain the entire use case.

Currently we ingest around 18 million records in the form of Parquet files, and on top of that we run around 32 aggregations over that data at intervals of 1 minute, 15 minutes, 30 minutes, 1 hour, 6 hours and 24 hours.
The issue we currently face is data corruption because of the Parquet file size.
Hence we are looking for a solution that compresses the data, which serves two purposes:

  1. Better storage
  2. Faster read times.
    This is why we have zeroed in on FiloDB.

So in the typical use case we will have a continuous stream of data coming into the system (roughly 18 million records/min) that we ingest into FiloDB.
Once the data is inserted we will read it back from FiloDB and carry out aggregations on the time-series data, using exportMs as our range column.
The expectation is to complete all 32 aggregations within a minute, so that users have data available through visualization at one-minute granularity.

I am attaching the aggregations file for your reference.

vidyut_agg.txt

In the attached aggregation file the pipe-separated columns are the grouping keys, and we carry out 4 aggregations (sum, min, max, count) over the entire raw data. The 4 aggregations are applied to 4 different columns, so for each entry in the attached aggregation file we apply 16 computations (4 per column).
Currently we have tried performing the aggregations with Spark, reading the data from Parquet files on a 3-node cluster, and we get runtimes of around 75 seconds.
Each node has 32 CPUs and 64 GB RAM.

Our problem is that as the data in the Parquet files grows, performance degrades. Hence we are looking for faster retrieval from a database, which is where FiloDB comes into the picture.

A few questions I want to ask:

  1. Is there any API that FiloDB supports for reading data filtered by time range? From the documentation I can see the command below, which loads the data from FiloDB into a DataFrame.
    val df2=spark.read.format("filodb.spark").option("dataset","flow_protocol_cmprss1_nextHopIp_deflate").load()
    My question is that this command loads the entire dataset into the DataFrame, and as the dataset grows that will degrade performance, so we are trying to find a way to load only data filtered by a time interval from FiloDB and avoid unnecessary I/O (see the sketch after this list).

  2. Can you provide some suggestions on the data-modelling side, based on the attached aggregation sheet?

  3. I ingested around 10 million records into FiloDB in one go and it took around 4 minutes to load the data. Is there any optimization that could improve ingestion, and what are the typical ingestion performance numbers for FiloDB?
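A minimal sketch of the filtered read asked about in question 1, assuming exportMs is the time-range column; whether FiloDB pushes this predicate down to storage, rather than filtering after a full load in Spark, is exactly what the question asks:

import org.apache.spark.sql.functions.col

// Placeholder time-window bounds; real values would come from the query.
val startMs = 1522917400000L
val endMs   = 1522917500000L

val df2 = spark.read.format("filodb.spark").
  option("dataset", "flow_protocol_cmprss1_nextHopIp_deflate").
  load()

// Standard Spark DataFrame filter on the range column.
val windowed = df2.filter(col("exportMs") >= startMs && col("exportMs") < endMs)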

I am also attaching a snapshot of our data for your reference.
data.txt

Thanks.

@velvia
Member

velvia commented May 28, 2018

@PallaviPersistent I just pushed a new version of filo-scala 0.3.7. So try modifying the dependency in build.sbt for filo-scala to 0.3.7 and see if that solves the problem - it should fix the NPE at least.
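For anyone following along, the dependency bump would look roughly like the line below in build.sbt. The organization string is an assumption based on the org.velvia.filo package names in the stack traces, so check the project's own build files for the exact coordinates:

// Assumed coordinates; verify against FiloDB's own build definition.
libraryDependencies += "org.velvia.filo" %% "filo-scala" % "0.3.7"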

@shiwanshujalan

Hi,

Thanks for providing a fix for the NPE so quickly. I have tested the fix against 18 million records; unfortunately it is still failing intermittently with an NPE.
Below are the error logs:
scala> dfResult1.write.format("filodb.spark").option("dataset", "Netflow_protocol_3").option("row_keys", "exportMs,nextHopIp").option("partition_keys", "inIfId").option("filodb.reprojector.bulk-write-mode", "true").mode(SaveMode.Append).save()
[INFO] [05/29/2018 18:10:33.288] [main] [StatsDExtension(akka://kamon)] Starting the Kamon(StatsD) extension
18/05/29 18:10:42 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
[Stage 4:===========> (5 + 12) / 24]18/05/29 18:12:56 ERROR DatasetCoordinatorActor: Error in reprojection task (filodb.Netflow_protocol_3/0)
java.lang.NullPointerException
at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46)
at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314)
at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304)
at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59)
at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86)
at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60)
at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53)
at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28)
at filodb.core.Perftools$.subtrace(Perftools.scala:26)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79)
at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:53)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18/05/29 18:12:56 ERROR OneForOneStrategy:
java.lang.NullPointerException
at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46)
at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314)
at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304)
at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59)
at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86)
at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60)
at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53)
at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28)
at filodb.core.Perftools$.subtrace(Perftools.scala:26)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79)
at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:53)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18/05/29 18:12:56 WARN NodeCoordinatorActor: Actor Actor[akka://filo-spark/user/coordinator/ds-coord-Netflow_protocol_3-0#432767698] has terminated! Ingestion for (filodb.Netflow_protocol_3,0) will stop.
18/05/29 18:12:56 ERROR Executor: Exception in task 3.0 in stage 4.0 (TID 1034)
java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18/05/29 18:12:56 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

18/05/29 18:12:56 ERROR TaskSetManager: Task 3 in stage 4.0 failed 1 times; aborting job
18/05/29 18:12:56 WARN TaskSetManager: Lost task 17.0 in stage 4.0 (TID 1048, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD.count(RDD.scala:1115)
at filodb.spark.FiloContext$.insertIntoFilo$extension(FiloContext.scala:170)
at filodb.spark.FiloContext$.saveAsFilo$extension(FiloContext.scala:119)
at filodb.spark.DefaultSource.createRelation(DefaultSource.scala:63)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
... 48 elided
Caused by: java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Also, when I look at memory consumption it takes a lot of memory for the computation; is that normal behaviour with FiloDB?

I tried the bulk-write option to check whether it appends duplicate records in FiloDB, but it does not seem to work for us.
Can you also provide a solution for this? It would be very useful for us.

Also, it currently takes around 7 minutes to ingest 18 million records into FiloDB; can you provide some suggestions here, as our expectation is around a minute for ingestion?

Appreciate your prompt help on this.

Thanks.

@velvia
Member

velvia commented May 29, 2018 via email

@shiwanshujalan

Hi,
Yes, I have changed the filo-scala version in the build.sbt file to 0.3.7 and then recreated the build using the commands below:
sbt clean
./filo-cli --command init
sbt spark/assembly
We are also observing that whenever we continuously ingest around 2.4 million records/min into FiloDB, we get a NullPointerException in roughly every third run.

With the bulk-write option, when we ingest duplicate data it still overwrites the existing primary-key records, so this option is not working for us.

The time taken to ingest the 18 million records comes to around 7 minutes; that figure is from a run where there was no NPE and all 18 million records were ingested in one shot.

On the memory front, when we ingest data through the Spark shell the first 18 million records are ingested, but if we try to ingest another 18 million records in the same shell (all of them duplicates), we get Java runtime memory errors. When we exit the Spark shell, start it again and then try to load the data, it works fine.

As part of our POC we will be ingesting around 2.4 million records/min into FiloDB continuously for 2 days, but since we keep hitting the NPE we are stuck on ingestion.

Can you advise on the above points and also look into the NPE?

@shiwanshujalan

Hi,
I have rebuilt the Filo jar with version 0.3.7 in build.sbt.

Below is a snapshot of the jars included during the build:

[info] Including: logback-core-1.0.7.jar
[info] Including: jnr-ffi-2.1.6.jar
[info] Including: jffi-1.2.16-native.jar
[info] Including: filo-scala_2.11-0.3.7.jar
[info] Including: boon-0.33.jar
[info] Including: akka-actor_2.11-2.3.15.jar

As you can see, the latest filo-scala jar is included.

But I am still getting the NPE during ingestion of the 18 million records.

I am attaching the full run logs for your reference.

filodb_run.txt

From them I can confirm that the fix is not working as expected. Could you please check from the logs whether something is missing on my end?

Also, could we connect during your working hours so that I can demonstrate the entire run? If something needs to be tried and tested we can do it right away, as that would really expedite our work.

Thanks.

@velvia
Member

velvia commented Jun 4, 2018 via email

@PallaviPersistent
Author

PallaviPersistent commented Jun 5, 2018

We can try that by applying a filter on the records and writing them out, but the issue is intermittent. Is the new version available on git so that we can use it for our testing?
We work in IST hours.

@shiwanshujalan

Hi,

We are in the India time zone. It would be really helpful if we could connect for a session; we could then present the entire scenario, since this issue is intermittent and we cannot share the entire data set.

Thanks.

@velvia
Member

velvia commented Jun 5, 2018 via email

@shiwanshujalan

9:45 am PST should be fine with us. Thanks for making the time. Shall I send you a WebEx link for tomorrow's meeting?

@velvia
Member

velvia commented Jun 6, 2018 via email

@shiwanshujalan

Hi,
Below is the meeting invite:
https://persistent.webex.com/persistent/j.php?MTID=m62e37f692b46aa3c4d713281507ec0fa
Password: Y8iHfTZP
I have scheduled the meeting for 9:30 PM IST.

Let me know in case you have any issues joining the meeting.
Thanks.

@velvia
Member

velvia commented Jun 6, 2018 via email

@shiwanshujalan

Hi, Our 9:30 PM IST is 10 AM PST.

@shiwanshujalan

Hi,

We are on the WebEx. Let me know if you have any trouble joining.

@velvia
Member

velvia commented Jun 6, 2018 via email

@PallaviPersistent
Author

Hi Evan,

Thanks for your time on the last WebEx call. As suggested, we tried to perform the ingestion with the following changes:
-> Removing columns having NULL values: we still get the NPE.
-> Removing string columns and then writing the data: we could write 100 million+ rows successfully (a sketch of that test follows below).

So we suspect the issue to be with the String data type.
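A sketch of that isolation test, reusing the dfResult DataFrame from earlier in this thread; the dataset name below is illustrative, and the keys are restricted to numeric columns since the string columns are dropped:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType

// Write only the non-string columns, to confirm the NPE is tied to string data.
// "flow_nostrings" is a hypothetical dataset name for this test.
val nonStringCols = dfResult.schema.fields.collect {
  case f if f.dataType != StringType => f.name
}
dfResult.select(nonStringCols.map(col): _*).
  write.format("filodb.spark").
  option("dataset", "flow_nostrings").
  option("row_keys", "exportMs").
  option("partition_keys", "protocolId").
  mode(SaveMode.Append).save()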

We have a few columns that are originally String and some that are originally Binary or Byte and have been converted to String. We tried ingesting the columns that are originally String and received the NPE. A snapshot of those column values:
ipv6:67
ipv6:118
ipv6:4703
ipv6:80
ipv6:443
ipv6:21
ipv6:43
ipv6:80
ipv6:1039
ipv6:80
ipv6:3125
ipv6:53
ipv6:5804
ipv6:22
ipv6:22
ipv6:21
ipv6:5884
ipv6:5922
ipv6:1308
ipv6:443
ipv6:68
ipv6:194
ipv6:118
ipv6:118
ipv6:53
ipv6:20
ipv6:67
ipv6:194
ipv6:445
ipv6:443
ipv6:43
ipv6:119
ipv6:118
ipv6:20
ipv6:21
ipv6:4560
ipv6:53
ipv6:80

We also tried to ingest the columns converted to String, and it again resulted in an NPE. A snapshot of those column values:
53.218.50.27
96.35.2.107
186.127.105.182
141.195.32.146
183.85.199.206
84.90.252.182
219.123.171.140
194.65.227.130
115.233.179.117
128.50.50.250

Thanks,
Pallavi

@velvia
Member

velvia commented Jun 19, 2018 via email

@PallaviPersistent
Author

Hi Evan,

Any luck with the unit test on the values we provided?

Regards,
Pallavi

@velvia
Member

velvia commented Jul 12, 2018 via email

@velvia
Member

velvia commented Jul 12, 2018 via email

@PallaviPersistent
Author

PallaviPersistent commented Jul 16, 2018

Hi Evan,

Yes, the issue mostly appears when we try to insert large amounts of data. We also see this error if we try inserting a small amount of data into the same dataset.

Below is the stack trace for the NPE:

[INFO] [05/29/2018 18:10:33.288] [main] [StatsDExtension(akka://kamon)] Starting the Kamon(StatsD) extension
18/05/29 18:10:42 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
[Stage 4:===========> (5 + 12) / 24]18/05/29 18:12:56 ERROR DatasetCoordinatorActor: Error in reprojection task (filodb.Netflow_protocol_3/0)
java.lang.NullPointerException
at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46)
at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314)
at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304)
at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59)
at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86)
at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60)
at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53)
at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28)
at filodb.core.Perftools$.subtrace(Perftools.scala:26)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79)
at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:53)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18/05/29 18:12:56 ERROR OneForOneStrategy:
java.lang.NullPointerException
at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46)
at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314)
at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304)
at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59)
at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86)
at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60)
at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53)
at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28)
at filodb.core.Perftools$.subtrace(Perftools.scala:26)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79)
at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:53)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18/05/29 18:12:56 WARN NodeCoordinatorActor: Actor Actor[akka://filo-spark/user/coordinator/ds-coord-Netflow_protocol_3-0#432767698] has terminated! Ingestion for (filodb.Netflow_protocol_3,0) will stop.
18/05/29 18:12:56 ERROR Executor: Exception in task 3.0 in stage 4.0 (TID 1034)
java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18/05/29 18:12:56 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

18/05/29 18:12:56 ERROR TaskSetManager: Task 3 in stage 4.0 failed 1 times; aborting job
18/05/29 18:12:56 WARN TaskSetManager: Lost task 17.0 in stage 4.0 (TID 1048, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD.count(RDD.scala:1115)
at filodb.spark.FiloContext$.insertIntoFilo$extension(FiloContext.scala:170)
at filodb.spark.FiloContext$.saveAsFilo$extension(FiloContext.scala:119)
at filodb.spark.DefaultSource.createRelation(DefaultSource.scala:63)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
... 48 elided
Caused by: java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

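For what it's worth, the NullPointerException surfacing in org.velvia.filo.vectors.UTF8PtrAppendable / DictUTF8Vector.shouldMakeDict during reprojection looks consistent with null values in a String column reaching the Filo vector builders, which then takes down the dataset coordinator and produces the "Ingestion actors shut down" failure seen above. This is only an assumption on my part, not something confirmed in this thread. Below is a minimal sketch, assuming the nulls are the trigger, that replaces nulls in every string column before writing; the DataFrame name df, the "NA" sentinel, and the omitted row_keys/partition_keys options are placeholders, not values from this issue.

// Hypothetical workaround sketch: fill nulls in string columns before ingesting,
// assuming the NPE is caused by null String cells (unconfirmed assumption).
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.types.StringType

def fillStringNulls(df: DataFrame): DataFrame = {
  // Collect the names of all string-typed columns in the schema.
  val stringCols = df.schema.fields.collect { case f if f.dataType == StringType => f.name }
  // Replace nulls in those columns with a sentinel value.
  df.na.fill("NA", stringCols)
}

val cleaned = fillStringNulls(df) // df: the DataFrame being ingested (hypothetical name)
cleaned.write.format("filodb.spark").
  option("dataset", "Netflow_protocol_3"). // dataset name taken from the log above
  // plus the same row_keys / partition_keys options used in the original write
  mode(SaveMode.Overwrite).save()

If the same NPE still appears after nulls are removed, the cause is likely elsewhere (for example in the column types being ingested), and the error logs from the coordinator actor would be the next place to look.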