Skip to content

Commit

Permalink
Merge pull request #42 from HubertTatar/master
Browse files Browse the repository at this point in the history
Remove org.anarres.lzo:lzo-commons as it's GPL-3, drop Compression.LZOP
  • Loading branch information
HubertTatar authored Aug 22, 2023
2 parents b159faf + 111f135 commit 1852081
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 16 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ lazy val `stream-loader-core` = project
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.apache.kafka" % "kafka-clients" % "3.5.1",
"org.log4s" %% "log4s" % "1.10.0",
"org.anarres.lzo" % "lzo-commons" % "1.0.6",
"org.apache.commons" % "commons-compress" % "1.21",
"org.xerial.snappy" % "snappy-java" % "1.1.10.3",
"org.lz4" % "lz4-java" % "1.8.0",
"com.github.luben" % "zstd-jni" % "1.5.5-5",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ object Compression {
override def fileExtension: Option[String] = Some("bz2")
}

case object LZOP extends Compression {
override def fileExtension: Option[String] = Some("lzo")
}

case object SNAPPY extends Compression {
override def fileExtension: Option[String] = Some("snappy")
}
Expand All @@ -55,7 +51,6 @@ object Compression {
case "zstd" => Some(Compression.ZSTD)
case "gzip" => Some(Compression.GZIP)
case "bzip" => Some(Compression.BZIP)
case "lzop" => Some(Compression.LZOP)
case "snappy" => Some(Compression.SNAPPY)
case "lz4" => Some(Compression.LZ4)
case _ => None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import com.adform.streamloader.sink.batch.RecordStreamWriter
import com.adform.streamloader.util.Logging
import com.github.luben.zstd.ZstdOutputStream
import net.jpountz.lz4.LZ4BlockOutputStream
import org.anarres.lzo.{LzoCompressor1x_999, LzopOutputStream}
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream
import org.xerial.snappy.SnappyHadoopCompatibleOutputStream

Expand Down Expand Up @@ -43,7 +42,6 @@ class StreamFileBuilder[-R](
case Compression.ZSTD => new BufferedOutputStream(new ZstdOutputStream(fileStream), bufferSizeBytes)
case Compression.GZIP => new GZIPOutputStream(fileStream, bufferSizeBytes)
case Compression.BZIP => new BZip2CompressorOutputStream(fileStream)
case Compression.LZOP => new LzopOutputStream(fileStream, new LzoCompressor1x_999(9), bufferSizeBytes)
case Compression.SNAPPY => new SnappyHadoopCompatibleOutputStream(fileStream, bufferSizeBytes)
case Compression.LZ4 => new LZ4BlockOutputStream(fileStream, bufferSizeBytes)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TimePartitioningFilePathFormatterTest extends AnyFunSpec with Matchers wit

it("should format filenames for files with a single record range") {
val formatter =
new TimePartitioningFilePathFormatter[LocalDate](Some("'dt='yyyyMMdd"), Compression.LZOP.fileExtension)
new TimePartitioningFilePathFormatter[LocalDate](Some("'dt='yyyyMMdd"), Compression.LZ4.fileExtension)
val ranges =
StreamRange(
"test-topic",
Expand All @@ -62,14 +62,14 @@ class TimePartitioningFilePathFormatterTest extends AnyFunSpec with Matchers wit
val formatted = formatter.formatPath(LocalDate.parse("2019-04-10"), Seq(ranges))

formatted should startWith("dt=20190410")
formatted should endWith(".lzo")
formatted should endWith(".lz4")

noException should be thrownBy UUID.fromString(formatted.substring(12, formatted.length - 4))
}

it("should format filenames for files with multiple record ranges") {
val formatter =
new TimePartitioningFilePathFormatter[LocalDate](Some("'dt='yyyyMMdd"), Compression.LZOP.fileExtension)
new TimePartitioningFilePathFormatter[LocalDate](Some("'dt='yyyyMMdd"), Compression.LZ4.fileExtension)
val ranges = Seq(
StreamRange(
"test-topic",
Expand All @@ -88,19 +88,19 @@ class TimePartitioningFilePathFormatterTest extends AnyFunSpec with Matchers wit
val formatted = formatter.formatPath(LocalDate.parse("2019-04-10"), ranges)

formatted should startWith("dt=20190410")
formatted should endWith(".lzo")
formatted should endWith(".lz4")

noException should be thrownBy UUID.fromString(formatted.substring(12, formatted.length - 4))
}

it("should format filenames correctly for arbitrary ranges") {
val formatter =
new TimePartitioningFilePathFormatter[LocalDate](Some("'dt='yyyyMMdd"), Compression.LZOP.fileExtension)
new TimePartitioningFilePathFormatter[LocalDate](Some("'dt='yyyyMMdd"), Compression.LZ4.fileExtension)
forAll { batches: Seq[StreamRange] =>
val formatted = formatter.formatPath(LocalDate.parse("2019-04-10"), batches)

formatted should startWith("dt=")
formatted should endWith(".lzo")
formatted should endWith(".lz4")

noException should be thrownBy formatted.substring(3, 11).toLong
noException should be thrownBy UUID.fromString(formatted.substring(12, formatted.length - 4))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ case class ParquetConfig(
case Compression.NONE => CompressionCodecName.UNCOMPRESSED
case Compression.ZSTD => CompressionCodecName.ZSTD
case Compression.GZIP => CompressionCodecName.GZIP
case Compression.LZOP => CompressionCodecName.LZO
case Compression.SNAPPY => CompressionCodecName.SNAPPY
case Compression.LZ4 => CompressionCodecName.LZ4
case _ => throw new UnsupportedOperationException(s"Compression '$compression' is unsupported in parquet")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ trait VerticaFileBuilder[-R] extends FileBuilder[R] {
case Compression.ZSTD => "ZSTD"
case Compression.GZIP => "GZIP"
case Compression.BZIP => "BZIP"
case Compression.LZOP => "LZO"
case _ => throw new UnsupportedOperationException(s"Compression $compression is not supported in Vertica")
}

Expand Down

0 comments on commit 1852081

Please sign in to comment.