Skip to content

Commit

Permalink
Add MainSummaryView option to write schema to a file
Browse files Browse the repository at this point in the history
  • Loading branch information
relud committed May 9, 2018
1 parent c004838 commit 06980a9
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/main/scala/com/mozilla/telemetry/utils/package.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.mozilla.telemetry

import java.net.URI
import java.util.zip.CRC32

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

package object utils{
import java.rmi.dgc.VMID
Expand Down Expand Up @@ -140,4 +143,12 @@ package object utils{
crc.update(inString.getBytes)
(crc.getValue % numBlocks).toInt
}

/** Write `body` as text to `path` on whichever Hadoop-supported filesystem
  * the path's URI scheme selects (e.g. s3://, hdfs://, file://).
  *
  * Overwrites any existing file at `path`.
  *
  * @param path fully-qualified URI of the destination file
  * @param body text to write (encoded with the platform default charset —
  *             NOTE(review): consider an explicit UTF-8 charset; confirm callers)
  */
def writeTextFile(path: String, body: String): Unit = {
  val file = FileSystem
    .get(new URI(path), new Configuration())
    .create(new Path(path))
  // Close the stream even when the write throws, so we don't leak a
  // filesystem handle (the original closed only on the success path).
  try {
    file.write(body.getBytes)
  } finally {
    file.close()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.mozilla.telemetry.utils.{Addon, Attribution, Events,
import com.mozilla.telemetry.utils.{BooleanUserPref, IntegerUserPref, StringUserPref, UserPref}
import org.json4s.{DefaultFormats, JValue}
import com.mozilla.telemetry.metrics._
import com.mozilla.telemetry.utils.writeTextFile

import scala.util.{Success, Try}

Expand Down Expand Up @@ -132,6 +133,7 @@ object MainSummaryView {
val maxRecordsPerFile = opt[Int]("max-records-per-file", descr = "Max number of rows to write to output files before splitting", required = false, default=Some(500000))
val readMode = choice(Seq("fixed", "aligned"), name="read-mode", descr="Read fixed-sized partitions or a multiple of defaultParallelism partitions", default=Some("fixed"))
val inputPartitionMultiplier = opt[Int]("input-partition-multiplier", descr="Partition multiplier for aligned read-mode", default=Some(4))
val schemaReportLocation = opt[String]("schema-report-location", descr="Write schema.treeString to this file")
verify()
}

Expand Down Expand Up @@ -248,6 +250,10 @@ object MainSummaryView {
// existing data.
partitioned.write.partitionBy("sample_id").mode("overwrite").option("maxRecordsPerFile", maxRecordsPerFile).parquet(s3path)

// Optionally dump the dataset schema to a file for downstream consumers.
// Use foreach so an absent --schema-report-location is simply a no-op:
// the original `match` had only a Some(...) case and therefore threw
// scala.MatchError on every run that did not pass the new flag.
conf.schemaReportLocation.get.foreach { path =>
  writeTextFile(path, partitioned.schema.treeString)
}

// Then remove the _SUCCESS file so we don't break Spark partition discovery.
S3Store.deleteKey(conf.outputBucket(), s"$s3prefix/_SUCCESS")
}
Expand Down

0 comments on commit 06980a9

Please sign in to comment.