Skip to content

Commit

Permalink
Remove unsafeRunSync from GroupOperation
Browse files Browse the repository at this point in the history
  • Loading branch information
lindholc committed Mar 24, 2023
1 parent cd7ba11 commit 24227b6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 44 deletions.
14 changes: 6 additions & 8 deletions core/src/main/scala/latis/ops/GroupByBin.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package latis.ops

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.SortedMap

import cats.data.Chain

import latis.data._
import latis.model._
Expand Down Expand Up @@ -60,12 +61,9 @@ case class GroupByBin(
* to start with an entry for each element of the given DomainSet.
* Otherwise, there would be no entry for empty bins.
*/
override def makeSortedMap(model: DataType): mutable.SortedMap[DomainData, ListBuffer[Sample]] = {
val smap = super.makeSortedMap(model)
domainSet.elements.foreach { dd =>
smap += (dd -> ListBuffer[Sample]())
}
smap
override def makeSortedMap(model: DataType): SortedMap[DomainData, Chain[Sample]] = {
val pairs = domainSet.elements.toList.map(dd => (dd, Chain.empty))
SortedMap.from(pairs)(ordering(model))
}

/**
Expand Down
57 changes: 21 additions & 36 deletions core/src/main/scala/latis/ops/GroupOperation.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package latis.ops

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.SortedMap

import cats.data.Chain
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Pipe
import fs2.Stream

Expand All @@ -14,7 +13,6 @@ import latis.model.Function
import latis.util.CartesianDomainOrdering
import latis.util.LatisException
import latis.util.LatisOrdering
import latis.util.StreamUtils

/**
* Defines a type of Operation that groups Samples
Expand Down Expand Up @@ -70,40 +68,27 @@ trait GroupOperation extends StreamOperation { self =>
* Constructs a SortedMap to use as a temporary data structure
* to accumulate samples into groups.
*/
def makeSortedMap(model: DataType): mutable.SortedMap[DomainData, ListBuffer[Sample]] =
mutable.SortedMap[DomainData, ListBuffer[Sample]]()(ordering(model))
def makeSortedMap(model: DataType): SortedMap[DomainData, Chain[Sample]] =
SortedMap.empty[DomainData, Chain[Sample]](ordering(model))

def pipe(model: DataType): Pipe[IO, Sample, Sample] = {
val sortedMap = makeSortedMap(model)
def pipe(model: DataType): Pipe[IO, Sample, Sample] = in => {
/*
TODO: NNAgg also needs dd
optional arg to aggregation.aggregateFunction?
*/
val aggF = aggregation.aggregateFunction(model)
val grpF = groupByFunction(model)

(stream: Stream[IO, Sample]) => {
stream.map { sample =>
// Group the Samples into the sortedMap
groupByFunction(model)(sample) match { //Option[DomainData]
case Some(dd) => sortedMap.get(dd) match { //Option[ListBuffer]
case Some(buffer) => buffer += sample; ()
case None => sortedMap += (dd -> ListBuffer(sample)); ()
}
case None => //No valid DomainData found so drop Sample
}
}.compile.drain.unsafeRunSync()
/*
TODO: avoid unsafe run
fold the stream?
could GroupOp be a FoldOp?
*/

val aggF: Iterable[Sample] => Data =
aggregation.aggregateFunction(model)

// For each buffer, aggregate the Samples to make a new Sample.
StreamUtils.seqToIOStream(sortedMap.toSeq) map {
case (dd, ss) =>
Sample(dd, RangeData(aggF(ss)))
/*
TODO: NNAgg also needs dd
optional arg to aggregation.aggregateFunction?
*/
in.fold(makeSortedMap(model)) { case (groups, sample) =>
grpF(sample) match {
case Some(dd) =>
val ss = groups.getOrElse(dd, Chain.empty)
groups + (dd -> (ss :+ sample))
case None => groups
}
}.flatMap { groups =>
Stream.emits(groups.toSeq).map {
case (dd, ss) => Sample(dd, RangeData(aggF(ss.toList)))
}
}
}
Expand Down

0 comments on commit 24227b6

Please sign in to comment.