Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix][Jar] Fix Flink jar submission and set parallelism failure issue #3165

Merged
merged 4 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.File;
import java.util.Optional;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.util.StrUtil;
Expand Down Expand Up @@ -91,7 +92,10 @@ public static StreamGraph getStreamGraph(JarSubmitParam submitParam, CustomTable
.setSavepointRestoreSettings(savepointRestoreSettings)
.setArguments(RunTimeUtil.handleCmds(submitParam.getArgs()))
.build();
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, configuration, 1, true);
int parallelism = StrUtil.isNumeric(submitParam.getParallelism())
? Convert.toInt(submitParam.getParallelism())
: tEnv.getStreamExecutionEnvironment().getParallelism();
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, configuration, parallelism, true);
program.close();
Assert.isTrue(pipeline instanceof StreamGraph, "can not translate");
return (StreamGraph) pipeline;
Expand Down
10 changes: 5 additions & 5 deletions docs/docs/extend/expand_statements/execute_jar.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ title: EXECUTE JAR

如果使用 `Checkpoint` 或 `Savepoint` ,请在右边作业,选择 `Savepoint策略`,其次检查点 跳过 请使用 execution.savepoint.ignore-unclaimed-state: true 参数控制


此flink sql jar任务支持 `set` 和 `add customjar` 联动使用
:::

## 语法结构
Expand All @@ -23,8 +25,7 @@ title: EXECUTE JAR
EXECUTE JAR WITH (
'uri'='<jar_path>.jar', -- 该参数 必填
'main-class'='<main_class>', -- 该参数 必填
'args'='<args>', -- 主类入参 该参数可选
'parallelism'='<parallelism>', -- 任务并行度 该参数可选
'args'='<args>' -- 主类入参 该参数可选
);

```
Expand All @@ -35,15 +36,14 @@ EXECUTE JAR WITH (
EXECUTE JAR WITH (
'uri'='rs:/jar/flink/demo/SocketWindowWordCount.jar',
'main-class'='org.apache.flink.streaming.examples.socket',
'args'=' --hostname localhost ',
'parallelism'=''
'args'=' --hostname localhost '
);
```
:::warning 注意

1. 以上示例中, uri 的值为 rs:/jar/flink/demo/SocketWindowWordCount.jar, 该值为资源中心中的资源路径,
请确保资源中心中存在该资源,请忽略资源中心 Root 节点(该节点为虚拟节点)
2. 如果要读取S3,HDFS,LCOAL等存储上面的文件均可通过rs协议进行桥接使用,请参考 [资源管理](../../user_guide/register_center/resource) 中 rs 协议使用方式
2. 如果要读取S3,HDFS,LOCAL等存储上面的文件均可通过rs协议进行桥接使用,请参考 [资源管理](../../user_guide/register_center/resource) 中 rs 协议使用方式
:::

## PyFlink 任务提交:
Expand Down
Loading