
Commit

Merge branch 'develop'
kupferk committed Jan 15, 2020
2 parents 4af808d + 6771c96 commit 0f9c501
Showing 22 changed files with 235 additions and 83 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -1,8 +1,16 @@
-# Version 0.12.0 - 2020-01-07
+# Version 0.12.1 -
+
+* Improve support for Swagger Schema
+* Fix infinite loop in recursiveSql
+
+
+# Version 0.12.0 - 2020-01-09

* Add new RecursiveSqlMapping
* Refactor 'describe' method of mappings
* Fix TemplateRelation to return correct partitions and fields
* Add 'filter' attribute to many mappings


# Version 0.11.6 - 2019-12-17

2 changes: 1 addition & 1 deletion docker/pom.xml
@@ -10,7 +10,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
-<version>0.12.0</version>
+<version>0.12.1</version>
<relativePath>..</relativePath>
</parent>

2 changes: 1 addition & 1 deletion flowman-core/pom.xml
@@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
-<version>0.12.0</version>
+<version>0.12.1</version>
<relativePath>..</relativePath>
</parent>

RecursiveSqlMapping.scala
@@ -75,10 +75,11 @@ extends BaseMapping {
val statement = this.statement

@tailrec
-def fix(in:DataFrame) : DataFrame = {
+def fix(in:DataFrame, inCount:Long) : DataFrame = {
val result = nextDf(statement, in)
-if (result.count() != in.count())
-fix(result)
+val resultCount = result.count()
+if (resultCount != inCount)
+fix(result, resultCount)
else
result
}
@@ -87,7 +88,7 @@ extends BaseMapping {
input.foreach(kv => kv._2.createOrReplaceTempView(kv._1.name))
// Execute query
val first = firstDf(executor.spark, statement)
-val result = fix(first)
+val result = fix(first, first.count())
// Call SessionCatalog.dropTempView to avoid unpersisting the possibly cached dataset.
input.foreach(kv => executor.spark.sessionState.catalog.dropTempView(kv._1.name))

@@ -99,7 +100,7 @@ extends BaseMapping {
def findUnion(plan:LogicalPlan) : LogicalPlan = {
plan match {
case union:Union => union
-case node:UnaryNode =>findUnion(node)
+case node:UnaryNode =>findUnion(node.child)
}
}

@@ -112,7 +113,7 @@ extends BaseMapping {
private def nextDf(statement:String, prev:DataFrame) : DataFrame = {
val spark = prev.sparkSession
prev.createOrReplaceTempView("__this__")
-val result = spark.sql(statement).localCheckpoint()
+val result = spark.sql(statement).localCheckpoint(false)
spark.sessionState.catalog.dropTempView("__this__")
result
}
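Two of the hunks above relate to the changelog entry "Fix infinite loop in recursiveSql": findUnion previously recursed on node itself rather than node.child, so the walk down the logical plan could never make progress, and fix() now threads the previously computed row count through the recursion, so each iteration runs exactly one count() action and compares it against a stable value instead of re-counting the prior DataFrame. The switch to localCheckpoint(false) requests Spark's lazy local checkpoint, which truncates the ever-growing plan lineage without forcing an immediate materialization. A minimal sketch of the count-caching fixed-point pattern in isolation (fixedPoint and step are illustrative names, not Flowman API):

import scala.annotation.tailrec

import org.apache.spark.sql.DataFrame

// Count-caching fixed-point iteration (sketch). `step` stands for one
// re-execution of the recursive SQL statement against the previous result.
@tailrec
def fixedPoint(step: DataFrame => DataFrame, current: DataFrame, currentCount: Long): DataFrame = {
  val next = step(current)
  val nextCount = next.count()           // the only Spark action per iteration
  if (nextCount == currentCount) next    // no new rows: fixed point reached
  else fixedPoint(step, next, nextCount)
}

Seeded like the code above, this corresponds to fixedPoint(df => nextDf(statement, df), first, first.count()).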
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory

import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.hadoop.File
-import com.dimajix.flowman.spec.Instance
import com.dimajix.flowman.spec.schema.ExternalSchema.CachedSchema
import com.dimajix.flowman.types.ArrayType
import com.dimajix.flowman.types.BooleanType
SwaggerSchemaUtils.scala
@@ -16,6 +16,7 @@

package com.dimajix.flowman.types

+import scala.annotation.tailrec
import scala.collection.JavaConverters._

import com.fasterxml.jackson.databind.JsonNode
@@ -114,7 +115,7 @@ object SwaggerSchemaUtils {

// Fix nested "allOf" nodes, which have to be in "definitions->[Entity]->[Definition]"
val definitions = rootNode.path("definitions")
-val entities = definitions.elements().asScala.flatMap(_.elements().asScala).toSeq
+val entities = definitions.elements().asScala.toSeq
entities.foreach(replaceAllOf)
entities.foreach(fixRequired)

@@ -130,21 +131,28 @@
*/
private def replaceAllOf(jsonNode: JsonNode) : Unit = {
jsonNode match {
-case obj:ObjectNode =>
-if (obj.get("allOf") != null) {
-val children = obj.get("allOf").elements().asScala.toSeq
-val required = children.flatMap(c => Option(c.get("required")).toSeq.flatMap(_.elements().asScala))
-val properties = children.flatMap(c => Option(c.get("properties")).toSeq.flatMap(_.fields().asScala))
-val desc = children.flatMap(c => Option(c.get("description"))).headOption
-obj.without("allOf")
-obj.set("type", TextNode.valueOf("object"))
-obj.withArray("required").addAll(required.asJava)
-properties.foreach(x => obj.`with`("properties").set(x.getKey, x.getValue):AnyRef)
-desc.foreach(d => obj.set("description", d))
-}
-case _:JsonNode =>
+case obj: ObjectNode if obj.get("allOf") != null =>
+val children = obj.get("allOf").elements().asScala.toSeq
+children.foreach(replaceAllOf)
+
+val properties = children.flatMap(c => Option(c.get("properties")).toSeq
+.flatMap(_.fields().asScala))
+val required = children.flatMap(c => Option(c.get("required")).toSeq.flatMap(_.elements().asScala))
+val desc = children.flatMap(c => Option(c.get("description"))).headOption
+obj.without("allOf")
+obj.set("type", TextNode.valueOf("object"))
+obj.withArray("required").addAll(required.asJava)
+properties.foreach(x => obj.`with`("properties").set(x.getKey, x.getValue): AnyRef)
+desc.foreach(d => obj.set("description", d))
+
+case obj: ObjectNode if obj.get("items") != null =>
+replaceAllOf(obj.get("items"))
+
+case obj: ObjectNode if obj.get("properties") != null =>
+obj.get("properties").elements().asScala.foreach(replaceAllOf)
+
+case _: JsonNode =>
}
-jsonNode.elements().asScala.foreach(replaceAllOf)
}

private def fixRequired(jsonNode: JsonNode) : Unit = {
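The rewrite turns replaceAllOf into a structural recursion: each allOf child is flattened first and then merged, and the traversal now descends explicitly into array items and object properties, so allOf blocks nested inside properties or array elements are reached as well (this is what the new tests below exercise). The companion change to entities matches this scheme — each definition is visited once at the top level, and recursion handles the nesting that the old two-level flatMap tried to anticipate. A compact sketch of the traversal shape (walk and mergeAllOf are illustrative names, not part of Flowman):

import scala.collection.JavaConverters._

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode

// Traversal skeleton (sketch): merge "allOf" bottom-up and recurse through
// "items" (arrays) and "properties" (objects) to reach nested occurrences.
def walk(node: JsonNode)(mergeAllOf: ObjectNode => Unit): Unit = node match {
  case obj: ObjectNode if obj.get("allOf") != null =>
    obj.get("allOf").elements().asScala.foreach(child => walk(child)(mergeAllOf))
    mergeAllOf(obj) // children are already flat when the merge runs
  case obj: ObjectNode if obj.get("items") != null =>
    walk(obj.get("items"))(mergeAllOf)
  case obj: ObjectNode if obj.get("properties") != null =>
    obj.get("properties").elements().asScala.foreach(child => walk(child)(mergeAllOf))
  case _ => // scalar and other nodes: nothing to flatten
}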
RecursiveSqlMappingTest.scala
@@ -99,4 +99,52 @@ class RecursiveSqlMappingTest extends FlatSpec with Matchers with LocalSparkSession
))
))
}

+it should "support UNION DISTINCT" in {
+val spark = this.spark
+import spark.implicits._
+
+val session = Session.builder().withSparkSession(spark).build()
+val context = session.context
+val executor = session.executor
+
+val mapping = RecursiveSqlMapping(
+Mapping.Properties(context),
+"""
+|SELECT
+| 0 AS n,
+| 1 AS fact
+|
+|UNION DISTINCT
+|
+|SELECT
+| n+1 AS n,
+| (n+1)*fact AS fact
+|FROM __this__
+|WHERE n < 6
+|""".stripMargin,
+null,
+null
+)
+
+val resultDf = mapping.execute(executor, Map())("main")
+val resultRecords = resultDf.orderBy("n").as[(Int,Int)].collect()
+resultRecords should be (Array(
+(0,1),
+(1,1),
+(2,2),
+(3,6),
+(4,24),
+(5,120),
+(6,720)
+))
+
+val resultSchema = mapping.describe(executor, Map())
+resultSchema should be (Map(
+"main" -> StructType(Seq(
+Field("n", IntegerType, false),
+Field("fact", IntegerType, false)
+))
+))
+}
}
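The new test pins down termination end to end: every round re-evaluates the statement with the previous result exposed as __this__, and UNION DISTINCT deduplicates, so the result stops growing once the WHERE bound is exhausted. A rough round-by-round trace (illustrative, assuming the seed SELECT is what the initial evaluation produces):

// Accumulated (n, fact) rows per round:
//   seed:    (0,1)
//   round 1: (0,1) (1,1)
//   round 2: (0,1) (1,1) (2,2)
//   ...
//   round 6: (0,1) (1,1) (2,2) (3,6) (4,24) (5,120) (6,720)
//   round 7: the same 7 rows -- the count is unchanged, so the recursion in fix() stops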
SwaggerSchemaUtilsTest.scala
@@ -112,7 +112,69 @@ class SwaggerSchemaUtilsTest extends FlatSpec with Matchers {
))
}

-it should "support allOf" in {
+it should "support simple arrays" in {
+val spec =
+"""
+|swagger: "2.0"
+|definitions:
+| Address:
+| type: object
+| properties:
+| AddressLine:
+| type: "array"
+| items:
+| type: string
+|""".stripMargin
+
+val fields = SwaggerSchemaUtils.fromSwagger(spec, Some("Address"), false)
+fields should be (Seq(
+Field("AddressLine", ArrayType(StringType))
+))
+}
+
+it should "support struct arrays" in {
+val spec =
+"""
+|swagger: "2.0"
+|definitions:
+| Address:
+| description: "Service Provider's address"
+| additionalProperties: false
+| type: object
+| properties:
+| num:
+| type: "integer"
+| maximum: 255
+| minimum: 0
+| AddressLine:
+| type: "array"
+| items:
+| description: "One of multiple address lines"
+| type: "object"
+| required:
+| - "Latin"
+| properties:
+| Latin:
+| type: "string"
+| example: "John Doe/MR"
+| LocalSpelling:
+| type: "string"
+| example: "李四/ MR"
+|""".stripMargin
+
+val fields = SwaggerSchemaUtils.fromSwagger(spec, Some("Address"), false)
+fields should be (Seq(
+Field("num", IntegerType),
+Field("AddressLine", ArrayType(
+StructType(Seq(
+Field("Latin", StringType, false),
+Field("LocalSpelling", StringType)
+))
+))
+))
+}
+
+it should "support allOf for properties" in {
val spec =
"""
|swagger: "2.0"
@@ -153,24 +215,66 @@ class SwaggerSchemaUtilsTest extends FlatSpec with Matchers {
|""".stripMargin

val fields = SwaggerSchemaUtils.fromSwagger(spec, Some("Pet"), false)
-fields.size should be (3)
-
-fields(0).nullable should be (false)
-fields(0).name should be ("name")
-fields(0).description should be (Some("The Pets name"))
-fields(0).ftype should be (StringType)
+fields should be (Seq(
+Field("name", StringType, false, description = Some("The Pets name")),
+Field("tag", StringType),
+Field("id", LongType, false, description = Some("The Pets ID"), format=Some("int64"))
+))
+}

-fields(1).nullable should be (true)
-fields(1).name should be ("tag")
-fields(1).ftype should be (StringType)
+it should "support allOf for nested properties" in {
+val spec =
+"""
+|swagger: "2.0"
+|info:
+| version: 1.0.0
+| title: Swagger Petstore
+| description: A sample API that uses a petstore as an example to demonstrate features in the swagger-2.0 specification
+| termsOfService: http://swagger.io/terms/
+| contact:
+| name: Swagger API Team
+| email: [email protected]
+| url: http://swagger.io
+| license:
+| name: Apache 2.0
+|url: https://www.apache.org/licenses/LICENSE-2.0.html
+|definitions:
+| Pet:
+| properties:
+| info:
+| allOf:
+| -
+| type: object
+| required:
+| - name
+| properties:
+| name:
+| type: string
+| description: The Pets name
+| tag:
+| type: string
+| -
+| type: object
+| required:
+| - id
+| properties:
+| id:
+| type: integer
+| format: int64
+| description: The Pets ID
+|""".stripMargin

-fields(2).nullable should be (false)
-fields(2).name should be ("id")
-fields(2).description should be (Some("The Pets ID"))
-fields(2).ftype should be (LongType)
+val fields = SwaggerSchemaUtils.fromSwagger(spec, Some("Pet"), false)
+fields should be (Seq(
+Field("info", StructType(Seq(
+Field("name", StringType, false, description = Some("The Pets name")),
+Field("tag", StringType),
+Field("id", LongType, false, description = Some("The Pets ID"), format=Some("int64"))
+)))
+))
+}

-it should "support nested allOf" in {
+it should "support allOf in arrays" in {
val spec =
"""
|swagger: "2.0"
@@ -212,32 +316,16 @@ class SwaggerSchemaUtilsTest extends FlatSpec with Matchers {
|""".stripMargin

val fields = SwaggerSchemaUtils.fromSwagger(spec, Some("Address"), false)
-fields.size should be (2)
-
-fields(0).nullable should be (true)
-fields(0).name should be ("num")
-fields(0).ftype should be (IntegerType)
-
-fields(1).nullable should be (true)
-fields(1).name should be ("AddressLine")
-fields(1).ftype shouldBe an[ArrayType]
-
-val array = fields(1).ftype.asInstanceOf[ArrayType]
-array.containsNull should be (true)
-array.elementType shouldBe a[StructType]
-val struct = array.elementType.asInstanceOf[StructType]
-struct.fields.size should be (3)
-struct.fields(0).nullable should be (false)
-struct.fields(0).name should be ("Latin")
-struct.fields(0).ftype should be (StringType)
-
-struct.fields(1).nullable should be (true)
-struct.fields(1).name should be ("LocalSpelling")
-struct.fields(1).ftype should be (StringType)
-
-struct.fields(2).nullable should be (false)
-struct.fields(2).name should be ("num")
-struct.fields(2).ftype should be (IntegerType)
+fields should be (Seq(
+Field("num", IntegerType),
+Field("AddressLine", ArrayType(
+StructType(Seq(
+Field("Latin", StringType, false),
+Field("LocalSpelling", StringType),
+Field("num", IntegerType, false)
+))
+))
+))
}

it should "support untyped enums" in {
