[INLONG-8838][Sort] IcebergSource support metadata (#8895)
vernedeng authored Sep 18, 2023
1 parent 2f8a676 · commit dc12a23
Showing 26 changed files with 2,245 additions and 72 deletions.
58 changes: 2 additions & 56 deletions inlong-manager/manager-service/pom.xml
@@ -252,63 +252,9 @@
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-registry</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-llap-tez</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-vector-code-gen</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.ivy</groupId>
<artifactId>ivy</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>ST4</artifactId>
</exclusion>
<exclusion>
<groupId>stax</groupId>
<artifactId>stax-api</artifactId>
</exclusion>
</exclusions>
<artifactId>hive-standalone-metastore</artifactId>
<version>${hive3x.version}</version>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
@@ -23,21 +23,29 @@
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -81,7 +89,7 @@ public HudiCatalogClient(String uri, String warehouse) throws MetaException {
public void open() {
if (this.client == null) {
try {
this.client = Hive.get(hiveConf).getMSC();
this.client = new HiveMetaStoreClient(hiveConf);
} catch (Exception e) {
throw new RuntimeException("Failed to create hive metastore client", e);
}
@@ -188,7 +196,7 @@ public void createTable(
HudiTableInfo tableInfo,
boolean useRealTimeInputFormat)
throws TException, IOException {
Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(dbName, tableName);
Table hiveTable = this.getEmptyTable(dbName, tableName);
hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));

@@ -250,4 +258,43 @@ public void close() {
}
}

public Table getEmptyTable(String databaseName, String tableName) {
StorageDescriptor sd = new StorageDescriptor();
{
sd.setSerdeInfo(new SerDeInfo());
sd.setNumBuckets(-1);
sd.setBucketCols(new ArrayList<String>());
sd.setCols(new ArrayList<FieldSchema>());
sd.setParameters(new HashMap<String, String>());
sd.setSortCols(new ArrayList<Order>());
sd.getSerdeInfo().setParameters(new HashMap<String, String>());
// We have to use MetadataTypedColumnsetSerDe because LazySimpleSerDe does
// not support a table with no columns.
sd.getSerdeInfo().setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
sd.setInputFormat(SequenceFileInputFormat.class.getName());
SkewedInfo skewInfo = new SkewedInfo();
skewInfo.setSkewedColNames(new ArrayList<String>());
skewInfo.setSkewedColValues(new ArrayList<List<String>>());
skewInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>());
sd.setSkewedInfo(skewInfo);
}

org.apache.hadoop.hive.metastore.api.Table t = new org.apache.hadoop.hive.metastore.api.Table();
{
t.setSd(sd);
t.setPartitionKeys(new ArrayList<FieldSchema>());
t.setParameters(new HashMap<String, String>());
t.setTableType(TableType.MANAGED_TABLE.toString());
t.setDbName(databaseName);
t.setTableName(tableName);
// set create time
t.setCreateTime((int) (System.currentTimeMillis() / 1000));
}
// Explicitly set the bucketing version
t.getParameters().put(hive_metastoreConstants.TABLE_BUCKETING_VERSION, "2");
return t;
}

}
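
The new getEmptyTable() helper builds its template directly against the metastore API (org.apache.hadoop.hive.metastore.api.Table), matching the pom change above that swaps hive-exec for hive-standalone-metastore, so the client no longer needs org.apache.hadoop.hive.ql.metadata.Table. A minimal usage sketch, assuming a reachable metastore and that the demo sits in the same package as HudiCatalogClient; the URI, warehouse path, and database/table names are hypothetical placeholders:

    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.api.Table;

    public class HudiCatalogClientDemo {

        public static void main(String[] args) throws MetaException {
            // Hypothetical endpoint and warehouse path; replace with real values.
            HudiCatalogClient catalog =
                    new HudiCatalogClient("thrift://localhost:9083", "/user/hive/warehouse");
            catalog.open(); // lazily creates the HiveMetaStoreClient from hiveConf
            Table template = catalog.getEmptyTable("demo_db", "demo_table");
            // The template is a MANAGED_TABLE backed by MetadataTypedColumnsetSerDe and
            // SequenceFileInputFormat, with the bucketing version explicitly set to "2".
            System.out.println(template.getTableName());
            catalog.close();
        }
    }
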
@@ -49,7 +49,6 @@
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.stream.InlongStreamService;

import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -140,9 +139,7 @@ public void testSourceEmptyParams() {
@Order(2)
@Transactional
public void testSourceCorrectParamsOfNoTagSortCluster() {
SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, "");
JSONObject jo = new JSONObject(response);
System.out.println(jo);
SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, "");
Assertions.assertEquals(0, response.getCode());
Assertions.assertNotNull(response.getData());
Assertions.assertNotNull(response.getMd5());
@@ -193,8 +190,6 @@ public void testClusterEmptyParams() {
@Transactional
public void testClusterCorrectParams() {
SortClusterResponse response = sortService.getClusterConfig(TEST_CLUSTER_1, "");
JSONObject jo = new JSONObject(response);
System.out.println(jo);
Assertions.assertEquals(0, response.getCode());
Assertions.assertNotNull(response.getData());
Assertions.assertNotNull(response.getMd5());
@@ -232,8 +227,6 @@ public void testClusterErrorClusterName() {
@Transactional
public void testSourceCorrectParamsOfTaggedSortCluster() {
SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER_2, TEST_TASK_2, "");
JSONObject jo = new JSONObject(response);
System.out.println(jo);
Assertions.assertEquals(0, response.getCode());
Assertions.assertNotNull(response.getMd5());
Assertions.assertNotNull(response.getMsg());
@@ -248,8 +241,6 @@ public void testSourceCorrectParamsOfTaggedSortCluster() {
Assertions.assertEquals(1, zone.getTopics().size());

response = sortService.getSourceConfig(TEST_CLUSTER_3, TEST_TASK_3, "");
jo = new JSONObject(response);
System.out.println(jo);
Assertions.assertEquals(0, response.getCode());
Assertions.assertNotNull(response.getMd5());
Assertions.assertNotNull(response.getMsg());
@@ -167,6 +167,8 @@ public final class Constants {

public static final String META_INCREMENTAL = "incremental_inlong";

public static final String META_INLONG_DATA_TIME = "inlong_data_time";

public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric.labels")
.stringType()
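
The new META_INLONG_DATA_TIME key ("inlong_data_time") names the metadata field that this commit teaches the Iceberg source to expose. As a hedged illustration only, such a key would typically surface in Flink SQL as a virtual metadata column; the schema, connector identifier, and WITH options below are assumptions for the sketch, not part of this commit:

    public class MetadataColumnDdlDemo {

        // Hypothetical Flink SQL DDL, embedded as a Java string.
        static final String CREATE_ICEBERG_SOURCE =
                "CREATE TABLE iceberg_src (\n"
                        + "  id INT,\n"
                        + "  name STRING,\n"
                        + "  data_time TIMESTAMP(3) METADATA FROM 'inlong_data_time' VIRTUAL\n"
                        + ") WITH (\n"
                        + "  'connector' = 'iceberg-inlong',\n" // assumed identifier
                        + "  'catalog-type' = 'hive',\n"
                        + "  'uri' = 'thrift://localhost:9083',\n"
                        + "  'warehouse' = '/user/hive/warehouse'\n"
                        + ")";
    }
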
@@ -38,6 +38,7 @@
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataArraySize;
import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;

/**
@@ -242,6 +243,11 @@ public void outputMetricsWithEstimate(Object data) {
outputMetrics(1, getDataSize(data));
}

public void outputMetricsWithEstimate(Object[] records) {
long size = getDataArraySize(records);
outputMetrics(records.length, size);
}

public void outputMetricsWithEstimate(Object data, long fetchDelay, long emitDelay) {
outputMetrics(1, getDataSize(data));
this.fetchDelay = fetchDelay;
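
The new Object[] overload reports a whole fetched batch in one call: it bumps the record counters by records.length and estimates the byte count with getDataArraySize instead of re-estimating per element. A minimal sketch, assuming an already-registered SourceMetricData instance (its construction is outside this hunk, and the package is assumed to be org.apache.inlong.sort.base.metric):

    import org.apache.inlong.sort.base.metric.SourceMetricData; // package assumed

    public class BatchMetricsDemo {

        // One call per fetched batch: counts batch.length records and sums the
        // estimated sizes via CalculateObjectSizeUtils.getDataArraySize.
        static void reportBatch(SourceMetricData sourceMetricData, Object[] batch) {
            sourceMetricData.outputMetricsWithEstimate(batch);
        }
    }
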
@@ -20,6 +20,7 @@
import org.apache.flink.table.data.binary.BinaryRowData;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
* calculate tool for object
@@ -44,4 +45,10 @@ public static long getDataSize(Object object) {
return size;
}

public static long getDataArraySize(Object[] objects) {
return Arrays.stream(objects)
.mapToLong(CalculateObjectSizeUtils::getDataSize)
.sum();
}

}
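
getDataArraySize() folds getDataSize() over the array, so batch estimates stay consistent with the existing single-record path. A runnable sketch using plain Strings (the exact per-element estimate is whatever getDataSize returns for the object):

    import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;

    public class SizeEstimateDemo {

        public static void main(String[] args) {
            Object[] rows = {"a|1", "bb|22", "ccc|333"};
            // Sums getDataSize(element) across the array; an empty array yields 0.
            long estimatedBytes = CalculateObjectSizeUtils.getDataArraySize(rows);
            System.out.println("estimated bytes = " + estimatedBytes);
        }
    }
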
@@ -39,6 +39,18 @@
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-base</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<version>${libfb303.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
@@ -62,6 +74,7 @@
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive3x.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -79,6 +92,17 @@
<artifactId>iceberg-hive-metastore</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version.v1.15}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
@@ -87,7 +111,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${plugin.shade.version}</version>

<executions>
<execution>
<id>shade-flink</id>
@@ -108,7 +131,8 @@
<include>org.apache.hive:*</include>
<!-- Include fixed version 18.0-13.0 of flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
<include>com.google.protobuf:*</include>
<include>org.apache.flink:flink-connector-files</include>
<include>org.apache.flink:flink-connector-base</include>
<include>org.apache.thrift:*</include>
<include>com.facebook.*:*</include>
</includes>
@@ -123,6 +147,12 @@
<include>META-INF/services/org.apache.flink.table.factories.TableFactory</include>
</includes>
</filter>
<filter>
<artifact>org.apache.hive:hive-exec</artifact>
<excludes>
<exclude>com/google/protobuf/**</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
@@ -17,6 +17,8 @@

package org.apache.inlong.sort.iceberg;

import org.apache.inlong.sort.iceberg.source.IcebergTableSource;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@@ -38,7 +40,6 @@
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.flink.IcebergTableSink;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergTableSource;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
