From 24227b654e3d6275c0b459e56e907ea576d0884e Mon Sep 17 00:00:00 2001 From: Chris Lindholm Date: Thu, 23 Mar 2023 16:22:22 -0600 Subject: [PATCH] Remove unsafeRunSync from GroupOperation --- .../src/main/scala/latis/ops/GroupByBin.scala | 14 ++--- .../main/scala/latis/ops/GroupOperation.scala | 57 +++++++------------ 2 files changed, 27 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/latis/ops/GroupByBin.scala b/core/src/main/scala/latis/ops/GroupByBin.scala index 3e2fc95dc..98ef6a326 100644 --- a/core/src/main/scala/latis/ops/GroupByBin.scala +++ b/core/src/main/scala/latis/ops/GroupByBin.scala @@ -1,7 +1,8 @@ package latis.ops -import scala.collection.mutable -import scala.collection.mutable.ListBuffer +import scala.collection.immutable.SortedMap + +import cats.data.Chain import latis.data._ import latis.model._ @@ -60,12 +61,9 @@ case class GroupByBin( * to start with an entry for each element of the given DomainSet. * Otherwise, there would be no entry for empty bins. */ - override def makeSortedMap(model: DataType): mutable.SortedMap[DomainData, ListBuffer[Sample]] = { - val smap = super.makeSortedMap(model) - domainSet.elements.foreach { dd => - smap += (dd -> ListBuffer[Sample]()) - } - smap + override def makeSortedMap(model: DataType): SortedMap[DomainData, Chain[Sample]] = { + val pairs = domainSet.elements.toList.map(dd => (dd, Chain.empty)) + SortedMap.from(pairs)(ordering(model)) } /** diff --git a/core/src/main/scala/latis/ops/GroupOperation.scala b/core/src/main/scala/latis/ops/GroupOperation.scala index 11f997277..79f2d8b04 100644 --- a/core/src/main/scala/latis/ops/GroupOperation.scala +++ b/core/src/main/scala/latis/ops/GroupOperation.scala @@ -1,10 +1,9 @@ package latis.ops -import scala.collection.mutable -import scala.collection.mutable.ListBuffer +import scala.collection.immutable.SortedMap +import cats.data.Chain import cats.effect.IO -import cats.effect.unsafe.implicits.global import fs2.Pipe import fs2.Stream @@ -14,7 +13,6 @@ import latis.model.Function import latis.util.CartesianDomainOrdering import latis.util.LatisException import latis.util.LatisOrdering -import latis.util.StreamUtils /** * Defines a type of Operation that groups Samples @@ -70,40 +68,27 @@ trait GroupOperation extends StreamOperation { self => * Constructs a SortedMap to use as a temporary data structure * to accumulate samples into groups. */ - def makeSortedMap(model: DataType): mutable.SortedMap[DomainData, ListBuffer[Sample]] = - mutable.SortedMap[DomainData, ListBuffer[Sample]]()(ordering(model)) + def makeSortedMap(model: DataType): SortedMap[DomainData, Chain[Sample]] = + SortedMap.empty[DomainData, Chain[Sample]](ordering(model)) - def pipe(model: DataType): Pipe[IO, Sample, Sample] = { - val sortedMap = makeSortedMap(model) + def pipe(model: DataType): Pipe[IO, Sample, Sample] = in => { + /* + TODO: NNAgg also needs dd + optional arg to aggregation.aggregateFunction? + */ + val aggF = aggregation.aggregateFunction(model) + val grpF = groupByFunction(model) - (stream: Stream[IO, Sample]) => { - stream.map { sample => - // Group the Samples into the sortedMap - groupByFunction(model)(sample) match { //Option[DomainData] - case Some(dd) => sortedMap.get(dd) match { //Option[ListBuffer] - case Some(buffer) => buffer += sample; () - case None => sortedMap += (dd -> ListBuffer(sample)); () - } - case None => //No valid DomainData found so drop Sample - } - }.compile.drain.unsafeRunSync() - /* - TODO: avoid unsafe run - fold the stream? - could GroupOp be a FoldOp? - */ - - val aggF: Iterable[Sample] => Data = - aggregation.aggregateFunction(model) - - // For each buffer, aggregate the Samples to make a new Sample. - StreamUtils.seqToIOStream(sortedMap.toSeq) map { - case (dd, ss) => - Sample(dd, RangeData(aggF(ss))) - /* - TODO: NNAgg also needs dd - optional arg to aggregation.aggregateFunction? - */ + in.fold(makeSortedMap(model)) { case (groups, sample) => + grpF(sample) match { + case Some(dd) => + val ss = groups.getOrElse(dd, Chain.empty) + groups + (dd -> (ss :+ sample)) + case None => groups + } + }.flatMap { groups => + Stream.emits(groups.toSeq).map { + case (dd, ss) => Sample(dd, RangeData(aggF(ss.toList))) } } }