diff --git a/src/main/scala/com/mozilla/telemetry/utils/package.scala b/src/main/scala/com/mozilla/telemetry/utils/package.scala
index d1e4855c..ab53632e 100644
--- a/src/main/scala/com/mozilla/telemetry/utils/package.scala
+++ b/src/main/scala/com/mozilla/telemetry/utils/package.scala
@@ -1,9 +1,12 @@
 package com.mozilla.telemetry
 
+import java.net.URI
 import java.util.zip.CRC32
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 package object utils{
   import java.rmi.dgc.VMID
@@ -140,4 +143,14 @@ package object utils{
     crc.update(inString.getBytes)
     (crc.getValue % numBlocks).toInt
   }
+
+  // Write `body` to `path`, resolving the target FileSystem (local, HDFS,
+  // S3, ...) from the path's URI scheme.
+  def writeTextFile(path: String, body: String): Unit = {
+    val file = FileSystem
+      .get(new URI(path), new Configuration())
+      .create(new Path(path))
+    file.write(body.getBytes)
+    file.close()
+  }
 }
diff --git a/src/main/scala/com/mozilla/telemetry/views/MainSummaryView.scala b/src/main/scala/com/mozilla/telemetry/views/MainSummaryView.scala
index 364522ad..d329f11e 100644
--- a/src/main/scala/com/mozilla/telemetry/views/MainSummaryView.scala
+++ b/src/main/scala/com/mozilla/telemetry/views/MainSummaryView.scala
@@ -15,6 +15,7 @@ import com.mozilla.telemetry.utils.{Addon, Attribution, Events,
 import com.mozilla.telemetry.utils.{BooleanUserPref, IntegerUserPref, StringUserPref, UserPref}
 import org.json4s.{DefaultFormats, JValue}
 import com.mozilla.telemetry.metrics._
+import com.mozilla.telemetry.utils.writeTextFile
 
 import scala.util.{Success, Try}
 
@@ -132,6 +133,7 @@ object MainSummaryView {
     val maxRecordsPerFile = opt[Int]("max-records-per-file", descr = "Max number of rows to write to output files before splitting", required = false, default=Some(500000))
     val readMode = choice(Seq("fixed", "aligned"), name="read-mode", descr="Read fixed-sized partitions or a multiple of defaultParallelism partitions", default=Some("fixed"))
     val inputPartitionMultiplier = opt[Int]("input-partition-multiplier", descr="Partition multiplier for aligned read-mode", default=Some(4))
+    val schemaReportLocation = opt[String]("schema-report-location", descr="Write schema.treeString to this file")
 
     verify()
   }
@@ -248,6 +250,11 @@ object MainSummaryView {
       // existing data.
       partitioned.write.partitionBy("sample_id").mode("overwrite").option("maxRecordsPerFile", maxRecordsPerFile).parquet(s3path)
 
+      // Only write the schema report when --schema-report-location was supplied.
+      conf.schemaReportLocation.get.foreach { path =>
+        writeTextFile(path, partitioned.schema.treeString)
+      }
+
       // Then remove the _SUCCESS file so we don't break Spark partition discovery.
       S3Store.deleteKey(conf.outputBucket(), s"$s3prefix/_SUCCESS")
     }
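
For reviewers, a minimal usage sketch of the new helper. The `SchemaReportSketch` object and the local path below are illustrative, not part of this patch: `writeTextFile` resolves the target `FileSystem` from the path's URI scheme, so the same call should work for `file://`, `hdfs://`, or `s3://` locations (given the relevant Hadoop filesystem on the classpath), and the new flag simply routes `partitioned.schema.treeString` through it, e.g. `--schema-report-location s3://<bucket>/main_summary/schema.txt`.

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import com.mozilla.telemetry.utils.writeTextFile

object SchemaReportSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative local target; in the job this would be the value of
    // --schema-report-location and the body would be schema.treeString.
    val path = "file:///tmp/main_summary_schema.txt"
    writeTextFile(path, "root\n |-- document_id: string (nullable = true)\n")

    // Read the file back through the same URI-scheme resolution to
    // confirm the round trip.
    val fs = FileSystem.get(new URI(path), new Configuration())
    val in = fs.open(new Path(path))
    try {
      println(scala.io.Source.fromInputStream(in).mkString)
    } finally {
      in.close()
    }
  }
}
```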