diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 5858425f66462..b408fcefcfb26 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3494,7 +3494,7 @@ class AstBuilder extends DataTypeAstBuilder /** * Create an [[UnresolvedTableOrView]] from a multi-part identifier. */ - private def createUnresolvedTableOrView( + protected def createUnresolvedTableOrView( ctx: IdentifierReferenceContext, commandName: String, allowTempView: Boolean = true): LogicalPlan = withOrigin(ctx) { @@ -5198,47 +5198,6 @@ class AstBuilder extends DataTypeAstBuilder visitLocationSpec(ctx.locationSpec)) } - /** - * Create a [[DescribeColumn]] or [[DescribeRelation]] commands. - */ - override def visitDescribeRelation(ctx: DescribeRelationContext): LogicalPlan = withOrigin(ctx) { - val isExtended = ctx.EXTENDED != null || ctx.FORMATTED != null - val asJson = ctx.JSON != null - if (asJson && !isExtended) { - val tableName = ctx.identifierReference.getText.split("\\.").lastOption.getOrElse("table") - throw QueryCompilationErrors.describeJsonNotExtendedError(tableName) - } - val relation = createUnresolvedTableOrView(ctx.identifierReference, "DESCRIBE TABLE") - if (ctx.describeColName != null) { - if (ctx.partitionSpec != null) { - throw QueryParsingErrors.descColumnForPartitionUnsupportedError(ctx) - } else if (asJson) { - throw QueryCompilationErrors.describeColJsonUnsupportedError() - } else { - DescribeColumn( - relation, - UnresolvedAttribute(ctx.describeColName.nameParts.asScala.map(_.getText).toSeq), - isExtended) - } - } else { - val partitionSpec = if (ctx.partitionSpec != null) { - // According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`. - visitPartitionSpec(ctx.partitionSpec).map { - case (key, Some(value)) => key -> value - case (key, _) => - throw QueryParsingErrors.emptyPartitionKeyError(key, ctx.partitionSpec) - } - } else { - Map.empty[String, String] - } - if (asJson) { - DescribeRelationJson(relation, partitionSpec, isExtended) - } else { - DescribeRelation(relation, partitionSpec, isExtended) - } - } - } - /** * Create an [[AnalyzeTable]], or an [[AnalyzeColumn]]. * Example SQL for analyzing a table or a set of partitions : diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index b486a1fd0a72a..58c62a90225aa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -691,19 +691,6 @@ object DescribeRelation { def getOutputAttrs: Seq[Attribute] = DescribeCommandSchema.describeTableAttributes() } -/** - * The logical plan of the DESCRIBE relation_name AS JSON command. - */ -case class DescribeRelationJson( - relation: LogicalPlan, - partitionSpec: TablePartitionSpec, - isExtended: Boolean) extends UnaryCommand { - override val output: Seq[Attribute] = DescribeCommandSchema.describeJsonTableAttributes() - override def child: LogicalPlan = relation - override protected def withNewChildInternal(newChild: LogicalPlan): DescribeRelationJson = - copy(relation = newChild) -} - /** * The logical plan of the DESCRIBE relation_name col_name command. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala index 55f59f7a22574..325862127d366 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala @@ -40,7 +40,6 @@ class AnalysisExceptionPositionSuite extends AnalysisTest { } test("SPARK-34057: UnresolvedTableOrView should retain sql text position") { - verifyTableOrViewPosition("DESCRIBE TABLE unknown", "unknown") verifyTableOrPermanentViewPosition("ANALYZE TABLE unknown COMPUTE STATISTICS", "unknown") verifyTableOrViewPosition("ANALYZE TABLE unknown COMPUTE STATISTICS FOR COLUMNS col", "unknown") verifyTableOrViewPosition("ANALYZE TABLE unknown COMPUTE STATISTICS FOR ALL COLUMNS", "unknown") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 3b58518b98da9..b73ea2f80452b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -152,10 +152,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case RenameTable(ResolvedV1TableOrViewIdentifier(oldIdent), newName, isView) => AlterTableRenameCommand(oldIdent, newName.asTableIdentifier, isView) - case DescribeRelationJson( - ResolvedV1TableOrViewIdentifier(ident), partitionSpec, isExtended) => - DescribeTableJsonCommand(ident, partitionSpec, isExtended) - // Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet. case DescribeRelation( ResolvedV1TableOrViewIdentifier(ident), partitionSpec, isExtended, output) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 744ab03d5d037..2b7be9b34b9aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -27,7 +27,7 @@ import org.antlr.v4.runtime.tree.TerminalNode import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, PlanWithUnresolvedIdentifier, SchemaEvolution, SchemaTypeEvolution, UnresolvedFunctionName, UnresolvedIdentifier, UnresolvedNamespace} +import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, PlanWithUnresolvedIdentifier, SchemaEvolution, SchemaTypeEvolution, UnresolvedAttribute, UnresolvedFunctionName, UnresolvedIdentifier, UnresolvedNamespace} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} import org.apache.spark.sql.catalyst.parser._ @@ -1153,4 +1153,46 @@ class SparkSqlAstBuilder extends AstBuilder { withIdentClause(ctx.identifierReference(), UnresolvedNamespace(_)), cleanedProperties) } + + /** + * Create a [[DescribeColumn]] or [[DescribeRelation]] or [[DescribeRelationAsJsonCommand]] + * command. + */ + override def visitDescribeRelation(ctx: DescribeRelationContext): LogicalPlan = withOrigin(ctx) { + val isExtended = ctx.EXTENDED != null || ctx.FORMATTED != null + val asJson = ctx.JSON != null + if (asJson && !isExtended) { + val tableName = ctx.identifierReference.getText.split("\\.").lastOption.getOrElse("table") + throw QueryCompilationErrors.describeJsonNotExtendedError(tableName) + } + val relation = createUnresolvedTableOrView(ctx.identifierReference, "DESCRIBE TABLE") + if (ctx.describeColName != null) { + if (ctx.partitionSpec != null) { + throw QueryParsingErrors.descColumnForPartitionUnsupportedError(ctx) + } else if (asJson) { + throw QueryCompilationErrors.describeColJsonUnsupportedError() + } else { + DescribeColumn( + relation, + UnresolvedAttribute(ctx.describeColName.nameParts.asScala.map(_.getText).toSeq), + isExtended) + } + } else { + val partitionSpec = if (ctx.partitionSpec != null) { + // According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`. + visitPartitionSpec(ctx.partitionSpec).map { + case (key, Some(value)) => key -> value + case (key, _) => + throw QueryParsingErrors.emptyPartitionKeyError(key, ctx.partitionSpec) + } + } else { + Map.empty[String, String] + } + if (asJson) { + DescribeRelationJsonCommand(relation, partitionSpec, isExtended) + } else { + DescribeRelation(relation, partitionSpec, isExtended) + } + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DescribeRelationJsonCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DescribeRelationJsonCommand.scala new file mode 100644 index 0000000000000..6abe34f0ea156 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DescribeRelationJsonCommand.scala @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.json4s._ +import org.json4s.JsonAST.JObject +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis.{ResolvedPersistentView, ResolvedTable, ResolvedTempView} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.util.quoteIfNeeded +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.connector.catalog.V1Table +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.PartitioningUtils + +/** + * The command for `DESCRIBE ... AS JSON`. + */ +case class DescribeRelationJsonCommand( + child: LogicalPlan, + partitionSpec: TablePartitionSpec, + isExtended: Boolean, + override val output: Seq[Attribute] = Seq( + AttributeReference( + "json_metadata", + StringType, + nullable = false, + new MetadataBuilder().putString("comment", "JSON metadata of the table").build())() + )) extends UnaryRunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val jsonMap = mutable.LinkedHashMap[String, JValue]() + child match { + case v: ResolvedTempView => + if (partitionSpec.nonEmpty) { + throw QueryCompilationErrors.descPartitionNotAllowedOnTempView(v.identifier.name()) + } + describeIdentifier(Seq("system", "session", v.identifier.name()), jsonMap) + describeColsJson(v.metadata.schema, jsonMap) + describeFormattedTableInfoJson(v.metadata, jsonMap) + + case v: ResolvedPersistentView => + if (partitionSpec.nonEmpty) { + throw QueryCompilationErrors.descPartitionNotAllowedOnView(v.identifier.name()) + } + describeIdentifier(v.identifier.toQualifiedNameParts(v.catalog), jsonMap) + describeColsJson(v.metadata.schema, jsonMap) + describeFormattedTableInfoJson(v.metadata, jsonMap) + + case ResolvedTable(catalog, identifier, V1Table(metadata), _) => + describeIdentifier(identifier.toQualifiedNameParts(catalog), jsonMap) + val schema = if (metadata.schema.isEmpty) { + // In older versions of Spark, + // the table schema can be empty and should be inferred at runtime. + sparkSession.table(metadata.identifier).schema + } else { + metadata.schema + } + describeColsJson(schema, jsonMap) + describeClusteringInfoJson(metadata, jsonMap) + if (partitionSpec.nonEmpty) { + // Outputs the partition-specific info for the DDL command: + // "DESCRIBE [EXTENDED|FORMATTED] table_name PARTITION (partitionVal*)" + describePartitionInfoJson( + sparkSession, sparkSession.sessionState.catalog, metadata, jsonMap) + } else { + describeFormattedTableInfoJson(metadata, jsonMap) + } + + case _ => throw QueryCompilationErrors.describeAsJsonNotSupportedForV2TablesError() + } + + Seq(Row(compact(render(JObject(jsonMap.toList))))) + } + + private def addKeyValueToMap( + key: String, + value: JValue, + jsonMap: mutable.LinkedHashMap[String, JValue]): Unit = { + // Rename some JSON keys that are pre-named in describe table implementation + val renames = Map( + "inputformat" -> "input_format", + "outputformat" -> "output_format" + ) + + val normalizedKey = key.toLowerCase().replace(" ", "_") + val renamedKey = renames.getOrElse(normalizedKey, normalizedKey) + + if (!jsonMap.contains(renamedKey) && !excludedKeys.contains(renamedKey)) { + jsonMap += renamedKey -> value + } + } + + private def describeIdentifier( + ident: Seq[String], + jsonMap: mutable.LinkedHashMap[String, JValue]): Unit = { + addKeyValueToMap("table_name", JString(ident.last), jsonMap) + addKeyValueToMap("catalog_name", JString(ident.head), jsonMap) + val namespace = ident.init.tail + addKeyValueToMap("namespace", JArray(namespace.map(JString).toList), jsonMap) + if (namespace.nonEmpty) { + addKeyValueToMap("schema_name", JString(namespace.last), jsonMap) + } + } + + /** + * Util to recursively form JSON string representation of data type, used for DESCRIBE AS JSON. + * Differs from `json` in DataType.scala by providing additional fields for some types. + */ + private def jsonType(dataType: DataType): JValue = { + dataType match { + case arrayType: ArrayType => + JObject( + "name" -> JString("array"), + "element_type" -> jsonType(arrayType.elementType), + "element_nullable" -> JBool(arrayType.containsNull) + ) + + case mapType: MapType => + JObject( + "name" -> JString("map"), + "key_type" -> jsonType(mapType.keyType), + "value_type" -> jsonType(mapType.valueType), + "value_nullable" -> JBool(mapType.valueContainsNull) + ) + + case structType: StructType => + val fieldsJson = structType.fields.map { field => + val baseJson = List( + "name" -> JString(field.name), + "type" -> jsonType(field.dataType), + "nullable" -> JBool(field.nullable) + ) + val commentJson = field.getComment().map(comment => "comment" -> JString(comment)).toList + val defaultJson = + field.getCurrentDefaultValue().map(default => "default" -> JString(default)).toList + + JObject(baseJson ++ commentJson ++ defaultJson: _*) + }.toList + + JObject( + "name" -> JString("struct"), + "fields" -> JArray(fieldsJson) + ) + + case decimalType: DecimalType => + JObject( + "name" -> JString("decimal"), + "precision" -> JInt(decimalType.precision), + "scale" -> JInt(decimalType.scale) + ) + + case varcharType: VarcharType => + JObject( + "name" -> JString("varchar"), + "length" -> JInt(varcharType.length) + ) + + case charType: CharType => + JObject( + "name" -> JString("char"), + "length" -> JInt(charType.length) + ) + + // Only override TimestampType; TimestampType_NTZ type is already timestamp_ntz + case _: TimestampType => + JObject("name" -> JString("timestamp_ltz")) + + case yearMonthIntervalType: YearMonthIntervalType => + def getFieldName(field: Byte): String = YearMonthIntervalType.fieldToString(field) + + JObject( + "name" -> JString("interval"), + "start_unit" -> JString(getFieldName(yearMonthIntervalType.startField)), + "end_unit" -> JString(getFieldName(yearMonthIntervalType.endField)) + ) + + case dayTimeIntervalType: DayTimeIntervalType => + def getFieldName(field: Byte): String = DayTimeIntervalType.fieldToString(field) + + JObject( + "name" -> JString("interval"), + "start_unit" -> JString(getFieldName(dayTimeIntervalType.startField)), + "end_unit" -> JString(getFieldName(dayTimeIntervalType.endField)) + ) + + case _ => + JObject("name" -> JString(dataType.simpleString)) + } + } + + private def describeColsJson( + schema: StructType, + jsonMap: mutable.LinkedHashMap[String, JValue]): Unit = { + val columnsJson = jsonType(StructType(schema.fields)) + .asInstanceOf[JObject].find(_.isInstanceOf[JArray]).get + addKeyValueToMap("columns", columnsJson, jsonMap) + } + + private def describeClusteringInfoJson( + table: CatalogTable, jsonMap: mutable.LinkedHashMap[String, JValue]): Unit = { + table.clusterBySpec.foreach { clusterBySpec => + val clusteringColumnsJson: JValue = JArray( + clusterBySpec.columnNames.map { fieldNames => + val nestedFieldOpt = table.schema.findNestedField(fieldNames.fieldNames.toIndexedSeq) + assert(nestedFieldOpt.isDefined, + "The clustering column " + + s"${fieldNames.fieldNames.map(quoteIfNeeded).mkString(".")} " + + s"was not found in the table schema ${table.schema.catalogString}." + ) + val (path, field) = nestedFieldOpt.get + JObject( + "name" -> JString((path :+ field.name).map(quoteIfNeeded).mkString(".")), + "type" -> jsonType(field.dataType), + "comment" -> field.getComment().map(JString).getOrElse(JNull) + ) + }.toList + ) + addKeyValueToMap("clustering_information", clusteringColumnsJson, jsonMap) + } + } + + private def describeFormattedTableInfoJson( + table: CatalogTable, jsonMap: mutable.LinkedHashMap[String, JValue]): Unit = { + table.bucketSpec match { + case Some(spec) => + spec.toJsonLinkedHashMap.foreach { case (key, value) => + addKeyValueToMap(key, value, jsonMap) + } + case _ => + } + table.storage.toJsonLinkedHashMap.foreach { case (key, value) => + addKeyValueToMap(key, value, jsonMap) + } + + val filteredTableInfo = table.toJsonLinkedHashMap + + filteredTableInfo.map { case (key, value) => + addKeyValueToMap(key, value, jsonMap) + } + } + + private def describePartitionInfoJson( + spark: SparkSession, + catalog: SessionCatalog, + metadata: CatalogTable, + jsonMap: mutable.LinkedHashMap[String, JValue]): Unit = { + if (metadata.tableType == CatalogTableType.VIEW) { + throw QueryCompilationErrors.descPartitionNotAllowedOnView(metadata.identifier.identifier) + } + + DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION") + val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec( + partitionSpec, + metadata.partitionSchema, + metadata.identifier.quotedString, + spark.sessionState.conf.resolver) + val partition = catalog.getPartition(metadata.identifier, normalizedPartSpec) + + // First add partition details to jsonMap. + // `addKeyValueToMap` only adds unique keys, so this ensures the + // more detailed partition information is added + // in the case of duplicated key names (e.g. storage_information). + partition.toJsonLinkedHashMap.foreach { case (key, value) => + addKeyValueToMap(key, value, jsonMap) + } + + metadata.toJsonLinkedHashMap.foreach { case (key, value) => + addKeyValueToMap(key, value, jsonMap) + } + + metadata.bucketSpec match { + case Some(spec) => + spec.toJsonLinkedHashMap.foreach { case (key, value) => + addKeyValueToMap(key, value, jsonMap) + } + case _ => + } + metadata.storage.toJsonLinkedHashMap.foreach { case (key, value) => + addKeyValueToMap(key, value, jsonMap) + } + } + + // Already added to jsonMap in DescribeTableJsonCommand + private val excludedKeys = Set("catalog", "schema", "database", "table") + + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = { + copy(child = newChild) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 73aaed0627946..a58e8fac6e36d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -19,16 +19,12 @@ package org.apache.spark.sql.execution.command import java.net.{URI, URISyntaxException} -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal import org.apache.hadoop.fs.{FileContext, FsConstants, Path} import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, FsAction, FsPermission} -import org.json4s._ -import org.json4s.JsonAST.JObject -import org.json4s.jackson.JsonMethods._ import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier} @@ -750,261 +746,6 @@ case class DescribeTableCommand( } } -/** - * Command that looks like - * {{{ - * DESCRIBE [EXTENDED|FORMATTED] table_name partitionSpec? [AS JSON]; - * }}} - */ -case class DescribeTableJsonCommand( - table: TableIdentifier, - partitionSpec: TablePartitionSpec, - isExtended: Boolean) extends LeafRunnableCommand { - override val output = DescribeCommandSchema.describeJsonTableAttributes() - // Already added to jsonMap in DescribeTableJsonCommand - private val excludedKeys = Set("catalog", "schema", "database", "table") - - override def run(sparkSession: SparkSession): Seq[Row] = { - val jsonMap = mutable.LinkedHashMap[String, JValue]() - val catalog = sparkSession.sessionState.catalog - - if (catalog.isTempView(table)) { - if (partitionSpec.nonEmpty) { - throw QueryCompilationErrors.descPartitionNotAllowedOnTempView(table.identifier) - } - val schema = catalog.getTempViewOrPermanentTableMetadata(table).schema - describeColsJson(schema, jsonMap, header = false) - } else { - val metadata = catalog.getTableRawMetadata(table) - val schema = if (metadata.schema.isEmpty) { - // In older versions of Spark, - // the table schema can be empty and should be inferred at runtime. - sparkSession.table(metadata.identifier).schema - } else { - metadata.schema - } - - addKeyValueToMap("table_name", JString(metadata.identifier.table), jsonMap) - table.catalog.foreach(catalog => addKeyValueToMap("catalog_name", JString(catalog), jsonMap)) - table.database.foreach { db => - addKeyValueToMap("namespace", JArray(List(JString(db))), jsonMap) - addKeyValueToMap("schema_name", JString(db), jsonMap) - } - - describeColsJson(schema, jsonMap, header = false) - describeClusteringInfoJson(metadata, jsonMap) - - if (partitionSpec.nonEmpty) { - // Outputs the partition-specific info for the DDL command: - // "DESCRIBE [EXTENDED|FORMATTED] table_name PARTITION (partitionVal*)" - describePartitionInfoJson(sparkSession, catalog, metadata, jsonMap) - } else { - describeFormattedTableInfoJson(metadata, jsonMap) - } - } - - Seq(Row(compact(render(JObject(jsonMap.toList))))) - } - - private def addKeyValueToMap( - key: String, - value: JValue, - jsonMap: mutable.LinkedHashMap[String, JValue]): Unit = { - // Rename some JSON keys that are pre-named in describe table implementation - val renames = Map( - "inputformat" -> "input_format", - "outputformat" -> "output_format" - ) - - val normalizedKey = key.toLowerCase().replace(" ", "_") - val renamedKey = renames.getOrElse(normalizedKey, normalizedKey) - - if (!jsonMap.contains(renamedKey) && !excludedKeys.contains(renamedKey)) { - jsonMap += renamedKey -> value - } - } - - /** - * Util to recursively form JSON string representation of data type, used for DESCRIBE AS JSON. - * Differs from `json` in DataType.scala by providing additional fields for some types. - */ - private def jsonType( - dataType: DataType): JValue = { - dataType match { - case arrayType: ArrayType => - JObject( - "name" -> JString("array"), - "element_type" -> jsonType(arrayType.elementType), - "element_nullable" -> JBool(arrayType.containsNull) - ) - - case mapType: MapType => - JObject( - "name" -> JString("map"), - "key_type" -> jsonType(mapType.keyType), - "value_type" -> jsonType(mapType.valueType), - "value_nullable" -> JBool(mapType.valueContainsNull) - ) - - case structType: StructType => - val fieldsJson = structType.fields.map { field => - val baseJson = List( - "name" -> JString(field.name), - "type" -> jsonType(field.dataType), - "nullable" -> JBool(field.nullable) - ) - val commentJson = field.getComment().map(comment => "comment" -> JString(comment)).toList - val defaultJson = - field.getCurrentDefaultValue().map(default => "default" -> JString(default)).toList - - JObject(baseJson ++ commentJson ++ defaultJson: _*) - }.toList - - JObject( - "name" -> JString("struct"), - "fields" -> JArray(fieldsJson) - ) - - case decimalType: DecimalType => - JObject( - "name" -> JString("decimal"), - "precision" -> JInt(decimalType.precision), - "scale" -> JInt(decimalType.scale) - ) - - case varcharType: VarcharType => - JObject( - "name" -> JString("varchar"), - "length" -> JInt(varcharType.length) - ) - - case charType: CharType => - JObject( - "name" -> JString("char"), - "length" -> JInt(charType.length) - ) - - // Only override TimestampType; TimestampType_NTZ type is already timestamp_ntz - case _: TimestampType => - JObject("name" -> JString("timestamp_ltz")) - - case yearMonthIntervalType: YearMonthIntervalType => - def getFieldName(field: Byte): String = YearMonthIntervalType.fieldToString(field) - - JObject( - "name" -> JString("interval"), - "start_unit" -> JString(getFieldName(yearMonthIntervalType.startField)), - "end_unit" -> JString(getFieldName(yearMonthIntervalType.endField)) - ) - - case dayTimeIntervalType: DayTimeIntervalType => - def getFieldName(field: Byte): String = DayTimeIntervalType.fieldToString(field) - - JObject( - "name" -> JString("interval"), - "start_unit" -> JString(getFieldName(dayTimeIntervalType.startField)), - "end_unit" -> JString(getFieldName(dayTimeIntervalType.endField)) - ) - - case _ => - JObject("name" -> JString(dataType.simpleString)) - } - } - - private def describeColsJson( - schema: StructType, - jsonMap: mutable.LinkedHashMap[String, JValue], - header: Boolean): Unit = { - val columnsJson = jsonType(StructType(schema.fields)) - .asInstanceOf[JObject].find(_.isInstanceOf[JArray]).get - addKeyValueToMap("columns", columnsJson, jsonMap) - } - - private def describeClusteringInfoJson( - table: CatalogTable, jsonMap: mutable.LinkedHashMap[String, JValue]): Unit = { - table.clusterBySpec.foreach { clusterBySpec => - val clusteringColumnsJson: JValue = JArray( - clusterBySpec.columnNames.map { fieldNames => - val nestedFieldOpt = table.schema.findNestedField(fieldNames.fieldNames.toIndexedSeq) - assert(nestedFieldOpt.isDefined, - "The clustering column " + - s"${fieldNames.fieldNames.map(quoteIfNeeded).mkString(".")} " + - s"was not found in the table schema ${table.schema.catalogString}." - ) - val (path, field) = nestedFieldOpt.get - JObject( - "name" -> JString((path :+ field.name).map(quoteIfNeeded).mkString(".")), - "type" -> jsonType(field.dataType), - "comment" -> field.getComment().map(JString).getOrElse(JNull) - ) - }.toList - ) - addKeyValueToMap("clustering_information", clusteringColumnsJson, jsonMap) - } - } - - private def describeFormattedTableInfoJson( - table: CatalogTable, jsonMap: mutable.LinkedHashMap[String, JValue]): Unit = { - table.bucketSpec match { - case Some(spec) => - spec.toJsonLinkedHashMap.foreach { case (key, value) => - addKeyValueToMap(key, value, jsonMap) - } - case _ => - } - table.storage.toJsonLinkedHashMap.foreach { case (key, value) => - addKeyValueToMap(key, value, jsonMap) - } - - val filteredTableInfo = table.toJsonLinkedHashMap - - filteredTableInfo.map { case (key, value) => - addKeyValueToMap(key, value, jsonMap) - } - } - - private def describePartitionInfoJson( - spark: SparkSession, - catalog: SessionCatalog, - metadata: CatalogTable, - jsonMap: mutable.LinkedHashMap[String, JValue]): Unit = { - if (metadata.tableType == CatalogTableType.VIEW) { - throw QueryCompilationErrors.descPartitionNotAllowedOnView(table.identifier) - } - - DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION") - val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec( - partitionSpec, - metadata.partitionSchema, - table.quotedString, - spark.sessionState.conf.resolver) - val partition = catalog.getPartition(table, normalizedPartSpec) - - // First add partition details to jsonMap. - // `addKeyValueToMap` only adds unique keys, so this ensures the - // more detailed partition information is added - // in the case of duplicated key names (e.g. storage_information). - partition.toJsonLinkedHashMap.map { case (key, value) => - addKeyValueToMap(key, value, jsonMap) - } - - metadata.toJsonLinkedHashMap.map { case (key, value) => - addKeyValueToMap(key, value, jsonMap) - } - - metadata.bucketSpec match { - case Some(spec) => - spec.toJsonLinkedHashMap.map { case (key, value) => - addKeyValueToMap(key, value, jsonMap) - } - case _ => - } - metadata.storage.toJsonLinkedHashMap.map { case (key, value) => - addKeyValueToMap(key, value, jsonMap) - } - } -} - /** * Command that looks like * {{{ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index f654c846c8a57..6428583c9e1ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -793,7 +793,6 @@ object ViewHelper extends SQLConfHelper with Logging { originalText: String, tempFunctions: Seq[String]): CatalogTable = { - val catalog = session.sessionState.catalog val tempViews = collectTemporaryViews(analyzedPlan) val tempVariables = collectTemporaryVariables(analyzedPlan) // TBLPROPERTIES is not allowed for temporary view, so we don't use it for @@ -808,6 +807,7 @@ object ViewHelper extends SQLConfHelper with Logging { storage = CatalogStorageFormat.empty, schema = viewSchema, viewText = Some(originalText), + createVersion = org.apache.spark.SPARK_VERSION, properties = newProperties) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index f7a3be9254758..499721fbae4e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -344,9 +344,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case DescribeNamespace(ResolvedNamespace(catalog, ns, _), extended, output) => DescribeNamespaceExec(output, catalog.asNamespaceCatalog, ns, extended) :: Nil - case DescribeRelationJson(_, _, _) => - throw QueryCompilationErrors.describeAsJsonNotSupportedForV2TablesError() - case DescribeRelation(r: ResolvedTable, partitionSpec, isExtended, output) => if (partitionSpec.nonEmpty) { throw QueryCompilationErrors.describeDoesNotSupportPartitionForV2TablesError() diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/describe.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/describe.sql.out index f52f69a5ff808..307a0a3e25fbd 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/describe.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/describe.sql.out @@ -59,7 +59,8 @@ DescribeTableCommand `spark_catalog`.`default`.`t`, false, [col_name#x, data_typ -- !query DESCRIBE EXTENDED t AS JSON -- !query analysis -DescribeTableJsonCommand `spark_catalog`.`default`.`t`, true +DescribeRelationJsonCommand true, [json_metadata#x] ++- ResolvedTable V2SessionCatalog(spark_catalog), default.t, V1Table(default.t), [a#x, b#x, c#x, d#x] -- !query @@ -142,7 +143,8 @@ DescribeTableCommand `spark_catalog`.`default`.`t`, [c=Us, d=1], false, [col_nam -- !query DESC EXTENDED t PARTITION (c='Us', d=1) AS JSON -- !query analysis -DescribeTableJsonCommand `spark_catalog`.`default`.`t`, [c=Us, d=1], true +DescribeRelationJsonCommand [c=Us, d=1], true, [json_metadata#x] ++- ResolvedTable V2SessionCatalog(spark_catalog), default.t, V1Table(default.t), [a#x, b#x, c#x, d#x] -- !query @@ -328,7 +330,7 @@ ExplainCommand 'DescribeRelation [c=Us, d=2], false, [col_name#x, data_type#x, c -- !query EXPLAIN DESCRIBE EXTENDED t PARTITION (c='Us', d=2) AS JSON -- !query analysis -ExplainCommand 'DescribeRelationJson [c=Us, d=2], true, SimpleMode +ExplainCommand 'DescribeRelationJsonCommand [c=Us, d=2], true, [json_metadata#x], SimpleMode -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/describe.sql.out b/sql/core/src/test/resources/sql-tests/results/describe.sql.out index 870ad02e71414..70870131163e5 100644 --- a/sql/core/src/test/resources/sql-tests/results/describe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/describe.sql.out @@ -693,8 +693,9 @@ EXPLAIN DESCRIBE EXTENDED t PARTITION (c='Us', d=2) AS JSON struct -- !query output == Physical Plan == -Execute DescribeTableJsonCommand - +- DescribeTableJsonCommand `spark_catalog`.`default`.`t`, [c=Us, d=2], true +Execute DescribeRelationJsonCommand + +- DescribeRelationJsonCommand [c=Us, d=2], true, [json_metadata#x] + +- ResolvedTable V2SessionCatalog(spark_catalog), default.t, V1Table(default.t), [a#x, b#x, c#x, d#x] -- !query diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableParserSuite.scala index d81f007e2a4d3..f8174d24c9499 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableParserSuite.scala @@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute, UnresolvedTableOrView} -import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan import org.apache.spark.sql.catalyst.plans.logical.{DescribeColumn, DescribeRelation} +import org.apache.spark.sql.test.SharedSparkSession + +class DescribeTableParserSuite extends SharedSparkSession with AnalysisTest { + private def parsePlan(statement: String) = spark.sessionState.sqlParser.parsePlan(statement) -class DescribeTableParserSuite extends AnalysisTest { test("SPARK-17328: Fix NPE with EXPLAIN DESCRIBE TABLE") { comparePlans(parsePlan("describe t"), DescribeRelation( @@ -92,4 +94,17 @@ class DescribeTableParserSuite extends AnalysisTest { start = 0, stop = 47)) } + + test("retain sql text position") { + val tbl = "unknown" + val sqlStatement = s"DESCRIBE TABLE $tbl" + val startPos = sqlStatement.indexOf(tbl) + assert(startPos != -1) + assertAnalysisErrorCondition( + parsePlan(sqlStatement), + "TABLE_OR_VIEW_NOT_FOUND", + Map("relationName" -> s"`$tbl`"), + Array(ExpectedContext(tbl, startPos, startPos + tbl.length - 1)) + ) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 541fec1cb3740..2cc203129817b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -31,8 +31,8 @@ import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, An import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog, TempVariableManager} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke -import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DescribeRelationJson, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, OverwriteByExpression, OverwritePartitionsDynamic, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, OverwriteByExpression, OverwritePartitionsDynamic, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId import org.apache.spark.sql.connector.FakeV2Provider @@ -45,11 +45,12 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} import org.apache.spark.sql.sources.SimpleScanSource +import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{BooleanType, CharType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, VarcharType} import org.apache.spark.unsafe.types.UTF8String -class PlanResolutionSuite extends AnalysisTest { - import CatalystSqlParser._ +class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { + private def parsePlan(statement: String) = spark.sessionState.sqlParser.parsePlan(statement) private val v1Format = classOf[SimpleScanSource].getName private val v2Format = classOf[FakeV2Provider].getName @@ -240,7 +241,7 @@ class PlanResolutionSuite extends AnalysisTest { } // We don't check analysis here by default, as we expect the plan to be unresolved // such as `CreateTable`. - val analyzed = analyzer.execute(CatalystSqlParser.parsePlan(query)) + val analyzed = analyzer.execute(parsePlan(query)) if (checkAnalysis) { analyzer.checkAnalysis(analyzed) } @@ -961,43 +962,6 @@ class PlanResolutionSuite extends AnalysisTest { assert(parsed4.isInstanceOf[DescribeTableCommand]) } - test("DESCRIBE AS JSON relation") { - Seq("v1Table" -> true, "v2Table" -> false, "testcat.tab" -> false).foreach { - case (tblName, useV1Command) => - val sql = s"DESC TABLE EXTENDED $tblName AS JSON" - val parsed = parseAndResolve(sql) - if (useV1Command) { - val expected2 = DescribeTableJsonCommand( - TableIdentifier(tblName, Some("default"), Some(SESSION_CATALOG_NAME)), - Map.empty, true) - - comparePlans(parsed, expected2) - } else { - parsed match { - case DescribeRelationJson(_: ResolvedTable, _, isExtended) => - assert(isExtended) - case _ => fail("Expect DescribeTable, but got:\n" + parsed.treeString) - } - } - - val sql2 = s"DESC TABLE EXTENDED $tblName PARTITION(a=1) AS JSON" - val parsed2 = parseAndResolve(sql2) - if (useV1Command) { - val expected2 = DescribeTableJsonCommand( - TableIdentifier(tblName, Some("default"), Some(SESSION_CATALOG_NAME)), - Map("a" -> "1"), true) - comparePlans(parsed2, expected2) - } else { - parsed2 match { - case DescribeRelationJson(_: ResolvedTable, partitionSpec, isExtended) => - assert(isExtended) - assert(partitionSpec == Map("a" -> "1")) - case _ => fail("Expect DescribeTable, but got:\n" + parsed2.treeString) - } - } - } - } - test("DELETE FROM") { Seq("v2Table", "testcat.tab").foreach { tblName => val sql1 = s"DELETE FROM $tblName" @@ -2904,9 +2868,8 @@ class PlanResolutionSuite extends AnalysisTest { exception = intercept[ParseException] { parsePlan(query) }, - condition = "_LEGACY_ERROR_TEMP_0035", - parameters = Map( - "message" -> "CREATE TEMPORARY TABLE ..., use CREATE TEMPORARY VIEW instead"), + condition = "_LEGACY_ERROR_TEMP_0046", + parameters = Map(), context = ExpectedContext(fragment = query, start = 0, stop = 48)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala index cae56754ba465..3602853e53aa8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala @@ -210,108 +210,6 @@ trait DescribeTableSuiteBase extends command.DescribeTableSuiteBase Row("histogram", "NULL"))) } } -} - -/** - * The class contains tests for the `DESCRIBE TABLE` command to check V1 In-Memory - * table catalog. - */ -class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase { - override def commandVersion: String = super[DescribeTableSuiteBase].commandVersion - - test("DESCRIBE TABLE EXTENDED of a partitioned table") { - withNamespaceAndTable("ns", "table") { tbl => - spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" + - " PARTITIONED BY (id)" + - " TBLPROPERTIES ('bar'='baz')" + - " COMMENT 'this is a test table'" + - " DEFAULT COLLATION unicode" + - " LOCATION 'file:/tmp/testcat/table_name'") - val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED $tbl") - assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq( - ("col_name", StringType), - ("data_type", StringType), - ("comment", StringType))) - QueryTest.checkAnswer( - descriptionDf.filter("!(col_name in ('Created Time', 'Created By'))"), - Seq( - Row("data", "string", null), - Row("id", "bigint", null), - Row("# Partition Information", "", ""), - Row("# col_name", "data_type", "comment"), - Row("id", "bigint", null), - Row("", "", ""), - Row("# Detailed Table Information", "", ""), - Row("Catalog", SESSION_CATALOG_NAME, ""), - Row("Database", "ns", ""), - Row("Table", "table", ""), - Row("Last Access", "UNKNOWN", ""), - Row("Type", "EXTERNAL", ""), - Row("Provider", getProvider(), ""), - Row("Comment", "this is a test table", ""), - Row("Collation", "UNICODE", ""), - Row("Table Properties", "[bar=baz]", ""), - Row("Location", "file:/tmp/testcat/table_name", ""), - Row("Partition Provider", "Catalog", ""))) - } - } - - test("DESCRIBE TABLE EXTENDED of a table with a default column value") { - withTable("t") { - spark.sql(s"CREATE TABLE t (id bigint default 42) $defaultUsing") - val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED t") - assert(descriptionDf.schema.map { field => - (field.name, field.dataType) - } === Seq( - ("col_name", StringType), - ("data_type", StringType), - ("comment", StringType))) - QueryTest.checkAnswer( - descriptionDf.filter( - "!(col_name in ('Created Time', 'Created By', 'Database', 'Location', " + - "'Provider', 'Type'))"), - Seq( - Row("id", "bigint", null), - Row("", "", ""), - Row("# Detailed Table Information", "", ""), - Row("Catalog", SESSION_CATALOG_NAME, ""), - Row("Table", "t", ""), - Row("Last Access", "UNKNOWN", ""), - Row("", "", ""), - Row("# Column Default Values", "", ""), - Row("id", "bigint", "42") - )) - } - } - - test("DESCRIBE AS JSON throws when not EXTENDED") { - withNamespaceAndTable("ns", "table") { t => - val tableCreationStr = - s""" - |CREATE TABLE $t ( - | employee_id INT, - | employee_name STRING, - | department STRING, - | hire_date DATE - |) USING parquet - |OPTIONS ('compression' = 'snappy', 'max_records' = '1000') - |PARTITIONED BY (department, hire_date) - |CLUSTERED BY (employee_id) SORTED BY (employee_name ASC) INTO 4 BUCKETS - |COMMENT 'Employee data table for testing partitions and buckets' - |TBLPROPERTIES ('version' = '1.0') - |""".stripMargin - spark.sql(tableCreationStr) - - val error = intercept[AnalysisException] { - spark.sql(s"DESCRIBE $t AS JSON") - } - - checkError( - exception = error, - condition = "DESCRIBE_JSON_NOT_EXTENDED", - parameters = Map("tableName" -> "table")) - } - } test("DESCRIBE AS JSON partitions, clusters, buckets") { withNamespaceAndTable("ns", "table") { t => @@ -356,10 +254,11 @@ class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase { table_properties = Some(Map( "version" -> "1.0" )), - location = Some(""), - serde_library = Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"), - inputformat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), - outputformat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), + serde_library = if (getProvider() == "hive") { + Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") + } else { + None + }, storage_properties = Some(Map( "compression" -> "snappy", "max_records" -> "1000" @@ -368,17 +267,9 @@ class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase { partition_columns = Some(List("department", "hire_date")) ) - if (getProvider() == "hive") { - assert(expectedOutput == parsedOutput.copy(owner = None, - created_time = None, - location = Some(""))) - } else { - assert(expectedOutput.copy(inputformat = None, outputformat = None, serde_library = None) - == parsedOutput.copy(owner = None, created_time = None, location = Some(""))) - } - parsedOutput.created_time.foreach { createdTime => - assert(iso8601Regex.matches(createdTime)) - } + assert(parsedOutput.location.isDefined) + assert(iso8601Regex.matches(parsedOutput.created_time.get)) + assert(expectedOutput == parsedOutput.copy(location = None, created_time = None)) } } @@ -426,33 +317,20 @@ class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase { table_properties = Some(Map( "t" -> "test" )), - serde_library = Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"), - inputformat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), - outputformat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), - storage_properties = Some(Map( - "serialization.format" -> "1" - )), + serde_library = if (getProvider() == "hive") { + Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") + } else { + None + }, partition_provider = Some("Catalog"), partition_columns = Some(List("region", "category")), partition_values = Some(Map("region" -> "USA", "category" -> "tech")) ) - val filteredParsedStorageProperties = - parsedOutput.storage_properties.map(_.filterNot { case (key, _) => key == "path" }) - - if (getProvider() == "hive") { - assert(expectedOutput == - parsedOutput.copy(location = None, created_time = None, owner = None, - storage_properties = filteredParsedStorageProperties)) - } else { - assert(expectedOutput.copy( - inputformat = None, outputformat = None, serde_library = None, storage_properties = None) - == parsedOutput.copy(location = None, created_time = None, owner = None, - storage_properties = filteredParsedStorageProperties)) - } - parsedOutput.created_time.foreach { createdTime => - assert(iso8601Regex.matches(createdTime)) - } + assert(parsedOutput.location.isDefined) + assert(iso8601Regex.matches(parsedOutput.created_time.get)) + assert(expectedOutput == parsedOutput.copy( + location = None, created_time = None, storage_properties = None)) } } @@ -494,113 +372,67 @@ class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase { bucket_columns = Some(Nil), sort_columns = Some(Nil), comment = Some("table_comment"), - serde_library = Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"), - inputformat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), - outputformat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), + serde_library = if (getProvider() == "hive") { + Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") + } else { + None + }, table_properties = None ) - if (getProvider() == "hive") { - assert( - expectedOutput == - parsedOutput.copy(location = None, created_time = None, owner = None) - ) - } else { - assert( - expectedOutput.copy(inputformat = None, outputformat = None, serde_library = None) == - parsedOutput.copy(location = None, created_time = None, owner = None) - ) - } - parsedOutput.created_time.foreach { createdTime => - assert(iso8601Regex.matches(createdTime)) - } - } - } - - test("DESCRIBE AS JSON temp view") { - withNamespaceAndTable("ns", "table") { t => - withTempView("temp_view") { - val tableCreationStr = - s""" - |CREATE TABLE $t (id INT, name STRING, created_at TIMESTAMP) - | USING parquet - | OPTIONS ('compression' 'snappy') - | CLUSTERED BY (id, name) SORTED BY (created_at) INTO 4 BUCKETS - | COMMENT 'test temp view' - | TBLPROPERTIES ('parquet.encryption' = 'true') - |""".stripMargin - spark.sql(tableCreationStr) - spark.sql(s"CREATE TEMPORARY VIEW temp_view AS SELECT * FROM $t") - val descriptionDf = spark.sql(s"DESCRIBE EXTENDED temp_view AS JSON") - val firstRow = descriptionDf.select("json_metadata").head() - val jsonValue = firstRow.getString(0) - val parsedOutput = parse(jsonValue).extract[DescribeTableJson] - - val expectedOutput = DescribeTableJson( - columns = Some(List( - TableColumn("id", Type("int")), - TableColumn("name", Type("string")), - TableColumn("created_at", Type("timestamp_ltz")) - )) - ) - - assert(expectedOutput == parsedOutput) - } + assert(parsedOutput.location.isDefined) + assert(iso8601Regex.matches(parsedOutput.created_time.get)) + assert(expectedOutput == parsedOutput.copy(location = None, created_time = None)) } } - test("DESCRIBE AS JSON persistent view") { - withNamespaceAndTable("ns", "table") { t => - withView("view") { - val tableCreationStr = - s""" - |CREATE TABLE $t (id INT, name STRING, created_at TIMESTAMP) - | USING parquet - | OPTIONS ('compression' 'snappy') - | CLUSTERED BY (id, name) SORTED BY (created_at) INTO 4 BUCKETS - | COMMENT 'test temp view' - | TBLPROPERTIES ('parquet.encryption' = 'true') - |""".stripMargin - spark.sql(tableCreationStr) - spark.sql(s"CREATE VIEW view AS SELECT * FROM $t") - val descriptionDf = spark.sql(s"DESCRIBE EXTENDED view AS JSON") - val firstRow = descriptionDf.select("json_metadata").head() - val jsonValue = firstRow.getString(0) - val parsedOutput = parse(jsonValue).extract[DescribeTableJson] - - val expectedOutput = DescribeTableJson( - table_name = Some("view"), - catalog_name = Some("spark_catalog"), - namespace = Some(List("default")), - schema_name = Some("default"), - columns = Some(List( - TableColumn("id", Type("int")), - TableColumn("name", Type("string")), - TableColumn("created_at", Type("timestamp_ltz")) - )), - serde_library = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"), - inputformat = Some("org.apache.hadoop.mapred.SequenceFileInputFormat"), - outputformat = Some("org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat"), - storage_properties = Some(Map("serialization.format" -> "1")), - last_access = Some("UNKNOWN"), - created_by = Some(s"Spark $SPARK_VERSION"), - `type` = Some("VIEW"), - view_text = Some("SELECT * FROM spark_catalog.ns.table"), - view_original_text = Some("SELECT * FROM spark_catalog.ns.table"), - view_schema_mode = Some("COMPENSATION"), - view_catalog_and_namespace = Some("spark_catalog.default"), - view_query_output_columns = Some(List("id", "name", "created_at")) - ) + test("DESCRIBE AS JSON view") { + Seq(true, false).foreach { isTemp => + withNamespaceAndTable("ns", "table") { t => + withView("view") { + val tableCreationStr = + s""" + |CREATE TABLE $t (id INT, name STRING, created_at TIMESTAMP) + | USING parquet + | OPTIONS ('compression' 'snappy') + | CLUSTERED BY (id, name) SORTED BY (created_at) INTO 4 BUCKETS + | COMMENT 'test temp view' + | TBLPROPERTIES ('parquet.encryption' = 'true') + |""".stripMargin + spark.sql(tableCreationStr) + val viewType = if (isTemp) "TEMP VIEW" else "VIEW" + spark.sql(s"CREATE $viewType view AS SELECT * FROM $t") + val descriptionDf = spark.sql(s"DESCRIBE EXTENDED view AS JSON") + val firstRow = descriptionDf.select("json_metadata").head() + val jsonValue = firstRow.getString(0) + val parsedOutput = parse(jsonValue).extract[DescribeTableJson] + + val expectedOutput = DescribeTableJson( + table_name = Some("view"), + catalog_name = if (isTemp) Some("system") else Some("spark_catalog"), + namespace = if (isTemp) Some(List("session")) else Some(List("default")), + schema_name = if (isTemp) Some("session") else Some("default"), + columns = Some(List( + TableColumn("id", Type("int")), + TableColumn("name", Type("string")), + TableColumn("created_at", Type("timestamp_ltz")) + )), + last_access = Some("UNKNOWN"), + created_by = Some(s"Spark $SPARK_VERSION"), + `type` = Some("VIEW"), + view_text = Some("SELECT * FROM spark_catalog.ns.table"), + view_original_text = if (isTemp) None else Some("SELECT * FROM spark_catalog.ns.table"), + // TODO: this is unexpected and temp view should also use COMPENSATION mode. + view_schema_mode = if (isTemp) Some("BINDING") else Some("COMPENSATION"), + view_catalog_and_namespace = Some("spark_catalog.default"), + view_query_output_columns = Some(List("id", "name", "created_at")) + ) - if (getProvider() == "hive") { - assert(expectedOutput == - parsedOutput.copy(table_properties = None, created_time = None, owner = None)) - } else { - assert(expectedOutput.copy(inputformat = None, - outputformat = None, serde_library = None, storage_properties = None) - == parsedOutput.copy(table_properties = None, created_time = None, owner = None)) - } - parsedOutput.created_time.foreach { createdTime => - assert(iso8601Regex.matches(createdTime)) + assert(iso8601Regex.matches(parsedOutput.created_time.get)) + assert(expectedOutput == parsedOutput.copy( + created_time = None, + table_properties = None, + storage_properties = None, + serde_library = None)) } } } @@ -755,9 +587,11 @@ class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase { default = None ) )), - serde_library = Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"), - inputformat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), - outputformat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), + serde_library = if (getProvider() == "hive") { + Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + } else { + None + }, storage_properties = Some(Map( "option1" -> "value1", "option2" -> "value2" @@ -775,16 +609,82 @@ class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase { partition_columns = Some(List("id")) ) - if (getProvider() == "hive") { - assert(expectedOutput == - parsedOutput.copy(location = None, created_time = None, owner = None)) - } else { - assert(expectedOutput.copy(inputformat = None, outputformat = None, serde_library = None) - == parsedOutput.copy(location = None, created_time = None, owner = None)) - } - parsedOutput.created_time.foreach { createdTime => - assert(iso8601Regex.matches(createdTime)) - } + assert(parsedOutput.location.isDefined) + assert(iso8601Regex.matches(parsedOutput.created_time.get)) + assert(expectedOutput == parsedOutput.copy(location = None, created_time = None)) + } + } +} + +/** + * The class contains tests for the `DESCRIBE TABLE` command to check V1 In-Memory + * table catalog. + */ +class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase { + override def commandVersion: String = super[DescribeTableSuiteBase].commandVersion + + test("DESCRIBE TABLE EXTENDED of a partitioned table") { + withNamespaceAndTable("ns", "table") { tbl => + spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" + + " PARTITIONED BY (id)" + + " TBLPROPERTIES ('bar'='baz')" + + " COMMENT 'this is a test table'" + + " DEFAULT COLLATION unicode" + + " LOCATION 'file:/tmp/testcat/table_name'") + val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED $tbl") + assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq( + ("col_name", StringType), + ("data_type", StringType), + ("comment", StringType))) + QueryTest.checkAnswer( + descriptionDf.filter("!(col_name in ('Created Time', 'Created By'))"), + Seq( + Row("data", "string", null), + Row("id", "bigint", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("id", "bigint", null), + Row("", "", ""), + Row("# Detailed Table Information", "", ""), + Row("Catalog", SESSION_CATALOG_NAME, ""), + Row("Database", "ns", ""), + Row("Table", "table", ""), + Row("Last Access", "UNKNOWN", ""), + Row("Type", "EXTERNAL", ""), + Row("Provider", getProvider(), ""), + Row("Comment", "this is a test table", ""), + Row("Collation", "UNICODE", ""), + Row("Table Properties", "[bar=baz]", ""), + Row("Location", "file:/tmp/testcat/table_name", ""), + Row("Partition Provider", "Catalog", ""))) + } + } + + test("DESCRIBE TABLE EXTENDED of a table with a default column value") { + withTable("t") { + spark.sql(s"CREATE TABLE t (id bigint default 42) $defaultUsing") + val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED t") + assert(descriptionDf.schema.map { field => + (field.name, field.dataType) + } === Seq( + ("col_name", StringType), + ("data_type", StringType), + ("comment", StringType))) + QueryTest.checkAnswer( + descriptionDf.filter( + "!(col_name in ('Created Time', 'Created By', 'Database', 'Location', " + + "'Provider', 'Type'))"), + Seq( + Row("id", "bigint", null), + Row("", "", ""), + Row("# Detailed Table Information", "", ""), + Row("Catalog", SESSION_CATALOG_NAME, ""), + Row("Table", "t", ""), + Row("Last Access", "UNKNOWN", ""), + Row("", "", ""), + Row("# Column Default Values", "", ""), + Row("id", "bigint", "42") + )) } } } @@ -796,7 +696,6 @@ case class DescribeTableJson( namespace: Option[List[String]] = Some(Nil), schema_name: Option[String] = None, columns: Option[List[TableColumn]] = Some(Nil), - owner: Option[String] = None, created_time: Option[String] = None, last_access: Option[String] = None, created_by: Option[String] = None, @@ -808,8 +707,6 @@ case class DescribeTableJson( table_properties: Option[Map[String, String]] = None, location: Option[String] = None, serde_library: Option[String] = None, - inputformat: Option[String] = None, - outputformat: Option[String] = None, storage_properties: Option[Map[String, String]] = None, partition_provider: Option[String] = None, partition_columns: Option[List[String]] = Some(Nil),