Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37510] Avoid duplicate calculations for SlidingEventTimeWindows #26319

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

beliefer
Copy link
Contributor

@beliefer beliefer commented Mar 19, 2025

What is the purpose of the change

This PR aim to avoid duplicate calculations for SlidingEventTimeWindows.
The original code put the expression timestamp - size into the loop, so it causes duplicate calculation.
We should put them out of the loop and just calculate once.

Brief change log

Avoid duplicate calculations for SlidingEventTimeWindows

Verifying this change

This change is already covered by existing tests, such as (SlidingEventTimeWindowsTest).

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 19, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@nilmadhab
Copy link
Contributor

LGTM

@beliefer
Copy link
Contributor Author

@@ -71,7 +71,8 @@ public Collection<TimeWindow> assignWindows(
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart; start > timestamp - size; start -= slide) {
long lower = timestamp - size;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we add final to this and long lastStart

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

Copy link
Member

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, thanks for the improvement!

LGTM

@1996fanrui 1996fanrui self-assigned this Mar 20, 2025
Copy link
Contributor

@snuyanzin snuyanzin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably I missed something
however can you please describe what is the issue here and how we could check that it is gone after this commit?

@beliefer
Copy link
Contributor Author

Probably I missed something however can you please describe what is the issue here and how we could check that it is gone after this commit?

It's obviously. I have added the description.

@snuyanzin
Copy link
Contributor

snuyanzin commented Mar 24, 2025

It's obviously. I have added the description.

Thanks for updating the description
however at least for me it is not obvious even after that update.

I tried to see the difference with help of jmh
like

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(value = 2, jvmArgs = {"-Xms2G", "-Xmx2G"})
public class FlinkTest {

    public static void main(String[] args) throws RunnerException {

        Options opt = new OptionsBuilder()
                .include(FlinkTest.class.getSimpleName())
                .build();

        new Runner(opt).run();
    }

    @State(Scope.Thread)
    public static class MyState {
        public long offset = 10000;
        public long timestamp = 2000;
        public long size = 1234;
        public long slide = 12;
    }

    @Benchmark
    @BenchmarkMode(Mode.Throughput)
    public void current(Blackhole blackhole, MyState state) {
        long offset = state.offset;
        long timestamp = state.timestamp;
        long size = state.size;
        long slide = state.slide;
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        blackhole.consume(windows);
    }

    @Benchmark
    @BenchmarkMode(Mode.Throughput)
    public void improved(Blackhole blackhole, MyState state) {
        long offset = state.offset;
        long timestamp = state.timestamp;
        long size = state.size;
        long slide = state.slide;
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        final long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        final long lower = timestamp - size;
        for (long start = lastStart; start > lower; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        blackhole.consume(windows);
    }

}

my results
for jdk17

 Benchmark            Mode  Cnt     Score    Error   Units
 FlinkTest.current   thrpt   10  2309.684 ± 29.933  ops/ms
 FlinkTest.improved  thrpt   10  2296.488 ± 47.739  ops/ms

for jdk21

 Benchmark            Mode  Cnt     Score    Error   Units
 FlinkTest.current   thrpt   10  2257.881 ± 24.168  ops/ms
 FlinkTest.improved  thrpt   10  2270.597 ± 13.188  ops/ms

so I don't see any significant difference (probably either this optimization has been already done by jdk itself or the rest of the code consumes much more time so the improvement brings so low benefit that it could be neglected)

So the question is still same from my side: how can we check that this PR fixes something?

Or did I miss anything?

@beliefer
Copy link
Contributor Author

@snuyanzin The benchmark ignored some questions. Obviously, the overhead of minus is very small. Otherwise, we should doubt the availability of JAVA. Second, The JVM will optimize it after interpretation.

@beliefer
Copy link
Contributor Author

On the other hand, avoid duplicate expressions is always a best practices know for anybody.

@beliefer
Copy link
Contributor Author

ping @aljoscha @StephanEwen

@beliefer
Copy link
Contributor Author

beliefer commented Apr 1, 2025

@snuyanzin Could you have time to review again ? This Improvement is just avoid the duplicate calculation which is not have a very strong improvement in performance.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants