Skip to content

Commit

Permalink
Optimize expression variable replacement methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 committed Mar 8, 2024
1 parent 1dd5e07 commit 95673a8
Show file tree
Hide file tree
Showing 14 changed files with 198 additions and 65 deletions.
1 change: 1 addition & 0 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void run(ApplicationArguments args) {
initResources();
List<Tenant> tenants = tenantService.list();
sysConfigService.initSysConfig();
sysConfigService.initExpressionVariables();

for (Tenant tenant : tenants) {
taskService.initDefaultFlinkSQLEnv(tenant.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public interface SysConfigService extends ISuperService<SysConfig> {
*/
void initSysConfig();

/**
* Initialize expression variables.
*/
void initExpressionVariables();

/**
* Update system configurations by key-value pairs.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.dinky.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.dinky.context.EngineContextHolder;
import org.dinky.data.model.Configuration;
import org.dinky.data.model.SysConfig;
import org.dinky.data.model.SystemConfiguration;
Expand All @@ -45,8 +48,10 @@
* @since 2021/11/18
*/
@Service
@Slf4j
public class SysConfigServiceImpl extends SuperServiceImpl<SysConfigMapper, SysConfig> implements SysConfigService {


@Override
public Map<String, List<Configuration<?>>> getAll() {
return SystemConfiguration.getInstances().getAllConfiguration();
Expand Down Expand Up @@ -92,13 +97,28 @@ public void initSysConfig() {
systemConfiguration.initSetConfiguration(configMap);
}

/**
* Initialize expression variables.
*/
@Override
public void initExpressionVariables() {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
// to initialize expression variable class and load it into the engine context
EngineContextHolder.loadExpressionVariableClass(systemConfiguration.getExpressionVariable().getValue());
}

@Override
public void updateSysConfigByKv(String key, String value) {
SysConfig config = getOne(new QueryWrapper<SysConfig>().eq("name", key));
SysConfig config = getOne(new LambdaQueryWrapper<>(SysConfig.class).eq(SysConfig::getName, key));
config.setValue(value);
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();

systemConfiguration.setConfiguration(key, value);
config.updateById();
// if the expression variable is modified, reinitialize the expression variable
if (key.equals(systemConfiguration.getExpressionVariable().getKey())) {
log.info("The expression variable is modified, reinitialize the expression variable to the engine context.");
initExpressionVariables();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.dinky.app.flinksql;

import cn.hutool.core.lang.Dict;
import java.util.regex.Matcher;
import org.dinky.app.db.DBUtil;
import org.dinky.app.model.StatementParam;
import org.dinky.app.model.SysConfig;
Expand All @@ -27,8 +29,10 @@
import org.dinky.classloader.DinkyClassLoader;
import org.dinky.config.Dialect;
import org.dinky.constant.FlinkSQLConstant;
import org.dinky.context.EngineContextHolder;
import org.dinky.data.app.AppParamConfig;
import org.dinky.data.app.AppTask;
import org.dinky.data.constant.CommonConstant;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.executor.Executor;
Expand Down Expand Up @@ -92,12 +96,14 @@ public class Submitter {
private static final Logger log = LoggerFactory.getLogger(Submitter.class);
public static Executor executor = null;

private static Dict variable = Dict.create();
private static void initSystemConfiguration() throws SQLException {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
List<SysConfig> sysConfigList = DBUtil.getSysConfigList();
Map<String, String> configMap =
CollUtil.toMap(sysConfigList, new HashMap<>(), SysConfig::getName, SysConfig::getValue);
systemConfiguration.initSetConfiguration(configMap);
variable = systemConfiguration.initExpressionVariableList(configMap );
}

public static void submit(AppParamConfig config) throws SQLException {
Expand Down Expand Up @@ -130,6 +136,20 @@ public static void submit(AppParamConfig config) throws SQLException {
log.info("The job configuration is as follows: {}", executorConfig);

String[] statements = SqlUtil.getStatements(sql);
if (appTask.getFragment()){
log.info("The task is enable fragment (Global Variable), replace the global variable");
for (int index = 0; index < statements.length; index++) {
String currentStatement = statements[index];
Matcher matcher = CommonConstant.GLOBAL_VARIABLE_PATTERN.matcher(currentStatement);
// 如果有全局变量,需要替换
if (StringUtils.isNotBlank(currentStatement) && matcher.find()) {
String replaceVariable = executor.getVariableManager(variable).replaceVariable(currentStatement);
statements[index] = replaceVariable;
}
}
}


Optional<JobClient> jobClient = Optional.empty();
try {
if (Dialect.FLINK_JAR == appTask.getDialect()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.context;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.Dict;
import cn.hutool.core.util.StrUtil;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.dinky.utils.StringUtil;

/**
* The EngineContextHolder
*/

@Slf4j
public class EngineContextHolder {
private static final Dict ENGINE_CONTEXT = Dict.create();

/**
* Get the engine contextload class
* @param variables the variables
* @return the class loader variable jexl class
*/
private static List<String> getClassLoaderVariableJexlClass(String variables) {
if (StrUtil.isBlank(variables)) {
log.warn("The variable is empty, please check the configuration.");
return Collections.emptyList();
}
return Arrays.stream(variables.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
}

/**
* Load expression variable class to the engine context
* @param variables the variables
*/
public static void loadExpressionVariableClass(String variables) {
getClassLoaderVariableJexlClass(variables).forEach(className -> {
try {
String classSimpleName =
BeanUtil.getBeanDesc(Class.forName(className)).getSimpleName();
String snakeCaseClassName = StringUtil.toSnakeCase(true, classSimpleName);
ENGINE_CONTEXT.set(snakeCaseClassName, Class.forName(className));
log.info("load class : {}", className);
} catch (ClassNotFoundException e) {
log.error(
"The class [{}] that needs to be loaded may not be loaded by dinky or there is no jar file of this class under dinky's lib/plugins/extends. Please check, and try again. {}",
className,
e.getMessage(),
e);
}
});
}

/**
* Get the engine context
* @return the engine context
*/
public static Dict getEngineContext() {
return ENGINE_CONTEXT;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.dinky.data.constant;

import java.util.Arrays;
import java.util.regex.Pattern;

/**
* CommonConstant
*
Expand All @@ -31,4 +34,9 @@ public final class CommonConstant {

public static final String DINKY_APP_MAIN_CLASS = "org.dinky.app.MainApp";
public static final String LineSep = System.getProperty("line.separator");

public static final Pattern GLOBAL_VARIABLE_PATTERN = Pattern.compile("\\$\\{(.+?)}");

public static final String DEFAULT_EXPRESSION_VARIABLES = String.join(",", Arrays.asList("cn.hutool.core.date.DateUtil", "cn.hutool.core.util.IdUtil", "cn.hutool.core.util.RandomUtil", "cn.hutool.core.util.StrUtil"));

}
2 changes: 2 additions & 0 deletions dinky-common/src/main/java/org/dinky/data/enums/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ public enum Status {
SYS_ENV_SETTINGS_MAX_RETAIN_DAYS_NOTE(1172, "sys.env.settings.maxRetainDays.note"),
SYS_ENV_SETTINGS_MAX_RETAIN_COUNT(1173, "sys.env.settings.maxRetainCount"),
SYS_ENV_SETTINGS_MAX_RETAIN_COUNT_NOTE(1174, "sys.env.settings.maxRetainCount.note"),
SYS_ENV_SETTINGS_EXPRESSION_VARIABLE(1175, "sys.env.settings.expressionVariable"),
SYS_ENV_SETTINGS_EXPRESSION_VARIABLE_NOTE(1176, "sys.env.settings.expressionVariable.note"),

SYS_DOLPHINSCHEDULER_SETTINGS_ENABLE(118, "sys.dolphinscheduler.settings.enable"),
SYS_DOLPHINSCHEDULER_SETTINGS_ENABLE_NOTE(119, "sys.dolphinscheduler.settings.enable.note"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.dinky.data.model;

import cn.hutool.core.lang.Dict;
import org.dinky.context.EngineContextHolder;
import org.dinky.data.constant.CommonConstant;
import org.dinky.data.enums.Status;
import org.dinky.data.properties.OssProperties;

Expand Down Expand Up @@ -124,6 +127,13 @@ public static Configuration.OptionBuilder key(Status status) {
.defaultValue(30)
.note(Status.SYS_ENV_SETTINGS_MAX_RETAIN_DAYS_NOTE);

// the default value is the same as the default value of the expressionVariable
private final Configuration<String> expressionVariable = key(Status.SYS_ENV_SETTINGS_EXPRESSION_VARIABLE)
.stringType()
.defaultValue(CommonConstant.DEFAULT_EXPRESSION_VARIABLES)
.note(Status.SYS_ENV_SETTINGS_EXPRESSION_VARIABLE_NOTE);


private final Configuration<Boolean> dolphinschedulerEnable = key(Status.SYS_DOLPHINSCHEDULER_SETTINGS_ENABLE)
.booleanType()
.defaultValue(false)
Expand Down Expand Up @@ -315,6 +325,16 @@ public void initSetConfiguration(Map<String, String> configMap) {
CONFIGURATION_LIST.stream().peek(Configuration::runParameterCheck).forEach(Configuration::runChangeEvent);
}

public Dict initExpressionVariableList(Map<String, String> configMap) {
CONFIGURATION_LIST.forEach(item -> {
if (item.getKey().equals(expressionVariable.getKey())) {
EngineContextHolder.loadExpressionVariableClass(configMap.get(item.getKey()));
}
});
return EngineContextHolder.getEngineContext();
}


public Map<String, List<Configuration<?>>> getAllConfiguration() {
Map<String, List<Configuration<?>>> data = new TreeMap<>();
for (Configuration<?> item : CONFIGURATION_LIST) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ sys.env.settings.maxRetainDays=Job history max retained days
sys.env.settings.maxRetainDays.note=The maximum number of days for the history of submitted jobs and auto-registered cluster records to be retained will be automatically deleted when they expire
sys.env.settings.maxRetainCount=Job history max retained counts
sys.env.settings.maxRetainCount.note=The maximum number of submitted job histories and auto-registered cluster records will not be deleted if they are less than that number, even if the retention days have passed

sys.env.settings.expressionVariable=Expression variable list
sys.env.settings.expressionVariable.note= Used to use expression variables in task configuration, use , to separate multiple variables, need to use the fully qualified name of the class, for example: com.dinky.common.utils.DateUtils, please ensure that the class is in the classpath of Dinky
sys.dolphinscheduler.settings.enable=Whether to enable DolphinScheduler
sys.dolphinscheduler.settings.enable.note=Whether to enable DolphinScheduler. Only after enabling it can you use the related functions of DolphinScheduler. Please fill in the following configuration items first, and then enable this configuration after completion. Also: Please ensure that the related configurations of DolphinScheduler are correct.
sys.dolphinscheduler.settings.url=DolphinScheduler address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ sys.env.settings.maxRetainDays=作业历史最大保留天数
sys.env.settings.maxRetainDays.note=提交的作业历史与自动注册的集群记录最大保留天数,过期会被自动删除
sys.env.settings.maxRetainCount=作业历史最大保留数量
sys.env.settings.maxRetainCount.note=提交的作业历史与自动注册的集群记录最大保留数量,如果不足该数量,则不会被删除,即使已经过了最大保留天数

sys.env.settings.expressionVariable=表达式变量列表
sys.env.settings.expressionVariable.note=用于在任务配置中使用表达式变量,逗号分割,需要使用类的全限定名,例如: com.dinky.common.utils.DateUtils,请确保类在Dinky的classpath中
sys.dolphinscheduler.settings.enable=是否启用 DolphinScheduler
sys.dolphinscheduler.settings.enable.note=是否启用 DolphinScheduler ,启用后才能使用 DolphinScheduler 的相关功能,请先填写下列配置项,完成后再开启此项配置, 另:请确保 DolphinScheduler 的相关配置正确
sys.dolphinscheduler.settings.url=DolphinScheduler 地址
Expand Down
4 changes: 4 additions & 0 deletions dinky-core/src/main/java/org/dinky/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.executor;

import cn.hutool.core.lang.Dict;
import org.dinky.assertion.Asserts;
import org.dinky.classloader.DinkyClassLoader;
import org.dinky.context.CustomTableEnvironmentContext;
Expand Down Expand Up @@ -100,6 +101,9 @@ public VariableManager getVariableManager() {
return variableManager;
}

public VariableManager getVariableManager( Dict context) {
return new VariableManager(context);
}
public boolean isUseSqlFragment() {
return executorConfig.isUseSqlFragment();
}
Expand Down
Loading

0 comments on commit 95673a8

Please sign in to comment.