Skip to content

Commit

Permalink
[Feature-3973][*] Release 1.2.0-rc4
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed Nov 28, 2024
1 parent 5c775d7 commit dbff9c8
Show file tree
Hide file tree
Showing 11 changed files with 345 additions and 42 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ dinky.db
hs_err_pid*
.idea/*
!.idea/icon.svg
!.idea/vcs.xml
build
target/*
*.iml
Expand Down Expand Up @@ -63,4 +62,6 @@ tmp/*
extends/*
/.run/

.idea
.idea
.idea/vcs.xml
dinky-web/package-lock.json
15 changes: 2 additions & 13 deletions dinky-admin/src/main/java/org/dinky/init/FlinkHistoryServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

package org.dinky.init;

import org.dinky.data.model.ResourcesModelEnum;
import org.dinky.data.model.S3Configuration;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.data.properties.OssProperties;
import org.dinky.resource.BaseResourceManager;
import org.dinky.service.JobInstanceService;
import org.dinky.service.SysConfigService;

Expand Down Expand Up @@ -63,16 +61,7 @@ public FlinkHistoryServer(JobInstanceService jobInstanceService, SysConfigServic
this.historyRunnable = () -> {
Map<String, String> flinkHistoryServerConfiguration =
SystemConfiguration.getInstances().getFlinkHistoryServerConfiguration();
if (systemConfiguration.getResourcesEnable().getValue()) {
if (systemConfiguration.getResourcesModel().getValue().equals(ResourcesModelEnum.OSS)) {
OssProperties ossProperties = systemConfiguration.getOssProperties();
flinkHistoryServerConfiguration.put(S3Configuration.ENDPOINT, ossProperties.getEndpoint());
flinkHistoryServerConfiguration.put(S3Configuration.ACCESS_KEY, ossProperties.getAccessKey());
flinkHistoryServerConfiguration.put(S3Configuration.SECRET_KEY, ossProperties.getSecretKey());
flinkHistoryServerConfiguration.put(
S3Configuration.PATH_STYLE_ACCESS, String.valueOf(ossProperties.getPathStyleAccess()));
}
}
flinkHistoryServerConfiguration.putAll(BaseResourceManager.convertFlinkResourceConfig());

HistoryServerUtil.run(
(jobId) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import javax.xml.bind.DatatypeConverter;
Expand Down Expand Up @@ -117,7 +116,7 @@ public static LogicalType getLogicalType(Column column) {

public static Object convertToRow(Object value, LogicalType logicalType, ZoneId timeZone) {
if (Asserts.isNull(value)) {
return Optional.empty();
return null;
}
switch (logicalType.getTypeRoot()) {
case BOOLEAN:
Expand Down Expand Up @@ -163,7 +162,7 @@ public static Object convertToRow(Object value, LogicalType logicalType, ZoneId

public static Object convertToRowData(Object value, LogicalType logicalType, ZoneId timeZone) {
if (Asserts.isNull(value)) {
return Optional.empty();
return null;
}
switch (logicalType.getTypeRoot()) {
case BOOLEAN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
package org.dinky.resource;

import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.ResourcesModelEnum;
import org.dinky.data.model.ResourcesVO;
import org.dinky.data.model.S3Configuration;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.data.properties.OssProperties;
import org.dinky.oss.OssTemplate;
import org.dinky.resource.impl.HdfsResourceManager;
import org.dinky.resource.impl.LocalResourceManager;
Expand All @@ -34,7 +37,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
Expand Down Expand Up @@ -106,6 +111,37 @@ static void initResourceManager() {
}
}

static Map<String, String> convertFlinkResourceConfig() {
Map<String, String> flinkConfig = new HashMap<>();
if (!instances.getResourcesEnable().getValue()) {
return flinkConfig;
}
if (instances.getResourcesModel().getValue().equals(ResourcesModelEnum.OSS)) {
OssProperties ossProperties = instances.getOssProperties();
flinkConfig.put(S3Configuration.ENDPOINT, ossProperties.getEndpoint());
flinkConfig.put(S3Configuration.ACCESS_KEY, ossProperties.getAccessKey());
flinkConfig.put(S3Configuration.SECRET_KEY, ossProperties.getSecretKey());
flinkConfig.put(S3Configuration.PATH_STYLE_ACCESS, String.valueOf(ossProperties.getPathStyleAccess()));
} else if (instances.getResourcesModel().getValue().equals(ResourcesModelEnum.HDFS)) {
final Configuration configuration = new Configuration();
Charset charset = Charset.defaultCharset();
String coreSite = instances.getResourcesHdfsCoreSite().getValue();
Opt.ofBlankAble(coreSite).ifPresent(x -> configuration.addResource(IoUtil.toStream(x, charset)));
String hdfsSite = instances.getResourcesHdfsHdfsSite().getValue();
Opt.ofBlankAble(hdfsSite).ifPresent(x -> configuration.addResource(IoUtil.toStream(x, charset)));
configuration.reloadConfiguration();
if (StrUtil.isEmpty(coreSite)) {
configuration.set(
"fs.defaultFS", instances.getResourcesHdfsDefaultFS().getValue());
}
Map<String, String> hadoopConfig = configuration.getValByRegex(".*");
hadoopConfig.forEach((key, value) -> {
flinkConfig.put("flink.hadoop." + key, value);
});
}
return flinkConfig;
}

default String getFilePath(String path) {
return FileUtil.normalize(FileUtil.file(getBasePath(), path).toString());
}
Expand Down
6 changes: 6 additions & 0 deletions dinky-core/src/main/java/org/dinky/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.dinky.interceptor.FlinkInterceptor;
import org.dinky.interceptor.FlinkInterceptorResult;
import org.dinky.job.JobStatementPlan;
import org.dinky.resource.BaseResourceManager;
import org.dinky.trans.Operations;
import org.dinky.utils.KerberosUtil;

Expand Down Expand Up @@ -169,6 +170,7 @@ private void initClassloader(DinkyClassLoader classLoader) {

protected void init(DinkyClassLoader classLoader) {
initClassloader(classLoader);
initFileSystem();
this.dinkyClassLoader = classLoader;
Thread.currentThread().setContextClassLoader(classLoader);
if (executorConfig.isValidParallelism()) {
Expand All @@ -195,6 +197,10 @@ protected void init(DinkyClassLoader classLoader) {
isMockTest = false;
}

private void initFileSystem() {
BaseResourceManager.initResourceManager();
}

abstract CustomTableEnvironment createCustomTableEnvironment(ClassLoader classLoader);

public JobStatementPlan parseStatementIntoJobStatementPlan(String[] statements) {
Expand Down
25 changes: 15 additions & 10 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,29 @@ public Explainer initialize(JobConfig config, String statement) {
}

public JobStatementPlan parseStatements(String[] statements) {
JobStatementPlan jobStatementPlanWithUDFAndMock = new JobStatementPlan();
JobStatementPlan jobStatementPlanWithMock = new JobStatementPlan();
generateUDFStatement(jobStatementPlanWithMock);

JobStatementPlan jobStatementPlan = executor.parseStatementIntoJobStatementPlan(statements);
jobStatementPlanWithMock.getJobStatementList().addAll(jobStatementPlan.getJobStatementList());
if (!jobManager.isPlanMode() && jobManager.getConfig().isMockSinkFunction()) {
executor.setMockTest(true);
MockStatementExplainer.build(executor.getCustomTableEnvironment())
.jobStatementPlanMock(jobStatementPlanWithMock);
}
return jobStatementPlanWithMock;
}

private void generateUDFStatement(JobStatementPlan jobStatementPlan) {
List<String> udfStatements = new ArrayList<>();
Optional.ofNullable(jobManager.getConfig().getUdfRefer())
.ifPresent(t -> t.forEach((key, value) -> {
String sql = String.format("create temporary function %s as '%s'", value, key);
udfStatements.add(sql);
}));
for (String udfStatement : udfStatements) {
jobStatementPlanWithUDFAndMock.addJobStatement(udfStatement, JobStatementType.DDL, SqlType.CREATE);
}
JobStatementPlan jobStatementPlan = executor.parseStatementIntoJobStatementPlan(statements);
jobStatementPlanWithUDFAndMock.getJobStatementList().addAll(jobStatementPlan.getJobStatementList());
if (!jobManager.isPlanMode() && jobManager.getConfig().isMockSinkFunction()) {
executor.setMockTest(true);
MockStatementExplainer.build(executor.getCustomTableEnvironment())
.jobStatementPlanMock(jobStatementPlanWithUDFAndMock);
jobStatementPlan.addJobStatement(udfStatement, JobStatementType.DDL, SqlType.CREATE);
}
return jobStatementPlanWithUDFAndMock;
}

public List<UDF> parseUDFFromStatements(String[] statements) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.dinky.job.Job;
import org.dinky.job.JobConfig;
import org.dinky.job.JobManager;
import org.dinky.trans.parse.ExecuteJarParseStrategy;
import org.dinky.utils.FlinkStreamEnvironmentUtil;
import org.dinky.utils.LogUtil;
import org.dinky.utils.SqlUtil;
Expand Down Expand Up @@ -66,6 +67,11 @@ public JobPipelineRunner(JobManager jobManager) {

@Override
public void run(JobStatement jobStatement) throws Exception {
if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
JobJarRunner jobJarRunner = new JobJarRunner(jobManager);
jobJarRunner.run(jobStatement);
return;
}
statements.add(jobStatement);
tableResult = jobManager.getExecutor().executeSql(jobStatement.getStatement());
if (statements.size() == 1) {
Expand All @@ -83,6 +89,10 @@ public void run(JobStatement jobStatement) throws Exception {

@Override
public SqlExplainResult explain(JobStatement jobStatement) {
if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
JobJarRunner jobJarRunner = new JobJarRunner(jobManager);
return jobJarRunner.explain(jobStatement);
}
SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder();
statements.add(jobStatement);
// pipeline job execute to generate stream graph.
Expand Down Expand Up @@ -131,6 +141,10 @@ public SqlExplainResult explain(JobStatement jobStatement) {

@Override
public StreamGraph getStreamGraph(JobStatement jobStatement) {
if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
JobJarRunner jobJarRunner = new JobJarRunner(jobManager);
return jobJarRunner.getStreamGraph(jobStatement);
}
statements.add(jobStatement);
// pipeline job execute to generate stream graph.
jobManager.getExecutor().executeSql(jobStatement.getStatement());
Expand All @@ -144,6 +158,10 @@ public StreamGraph getStreamGraph(JobStatement jobStatement) {

@Override
public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) {
if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
JobJarRunner jobJarRunner = new JobJarRunner(jobManager);
return jobJarRunner.getJobPlanInfo(jobStatement);
}
statements.add(jobStatement);
// pipeline job execute to generate stream graph.
jobManager.getExecutor().executeSql(jobStatement.getStatement());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@
package org.dinky.function;

import org.dinky.function.data.model.UDF;
import org.dinky.function.util.Reflections;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.functions.UserDefinedFunction;

import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.reflections.Reflections;
import org.reflections.scanners.Scanners;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;

public class FlinkUDFDiscover {
Expand All @@ -42,9 +45,28 @@ public static List<UDF> getCustomStaticUDFs() {
if (CollectionUtils.isNotEmpty(JAVA_STATIC_UDF_LIST)) {
return JAVA_STATIC_UDF_LIST;
}
Collection<URL> urls = new ArrayList<>();
String javaClassPath = System.getProperty("java.class.path");
if (javaClassPath != null) {
for (String path : javaClassPath.split(File.pathSeparator)) {
if (path.contains("/*")) {
continue;
}
if (!path.contains("extends") && !path.contains("customJar")) {
continue;
}

Reflections reflections =
new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
try {
urls.add(new File(path).toURI().toURL());
} catch (Exception e) {
if (Reflections.log != null) {
Reflections.log.warn("Could not get URL", e);
}
}
}
}

Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(urls));
Set<Class<?>> operations =
reflections.get(Scanners.SubTypes.of(UserDefinedFunction.class).asClass());
return operations.stream()
Expand Down
Loading

0 comments on commit dbff9c8

Please sign in to comment.