
Scala products are used but Scala API is not on the classpath #177

Open · xandril opened this issue Nov 14, 2024 · 13 comments

@xandril commented Nov 14, 2024

Hello! I am not entirely sure whether this relates to flink-scala-api, but I don't know who else could help with the interaction between Flink and Scala.
Currently, I am trying to integrate flink-scala-api using Scala 2.13.
As a base image, I am using the apache/flink:1.20-scala_2.12-java8 Docker image: https://hub.docker.com/layers/apache/flink/1.20-scala_2.12-java8/images/sha256-37750cd7c7f1f33f295cb9415393276b343c72200860863024973466b54cac03?context=explore
In my own image, I remove flink-scala_2.12-1.20.0.jar and add scala-library-2.13.15.jar in the lib directory, and then remove "scala." from the classloader.parent-first-patterns.default configuration option. As a result, when attempting to submit a JAR with Scala code, I get an exception with the message: "Scala products are used but Scala API is not on the classpath."
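
A minimal Dockerfile sketch of that modification (assuming the standard /opt/flink layout of the apache/flink image; jar names as stated above):

FROM apache/flink:1.20-scala_2.12-java8
RUN rm /opt/flink/lib/flink-scala_2.12-1.20.0.jar
ADD https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.13.15/scala-library-2.13.15.jar /opt/flink/lib/
# plus: remove "scala." from classloader.parent-first-patterns.default in the Flink configuration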

Can you suggest what I might be missing in this scenario? @novakov-alexey

@novakov-alexey (Collaborator)

@xandril I think this is because there is no flink-scala-api in the Flink classpath. The error probably comes from here: https://github.com/apache/flink/blob/release-1.20/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java#L100

Did you add this library to your job classpath? Once it is there, the error "Scala products are used but Scala API is not on the classpath" should disappear.
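
For example, with sbt the dependency would look roughly like this (a sketch; the version string follows the library's <Flink version>_<library version> scheme, matching the 1.20.0_1.2.0 artifact linked later in this thread):

// build.sbt (sketch): flink-scala-api for Scala 2.13 and Flink 1.20
libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.20.0_1.2.0"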

@novakov-alexey (Collaborator)

@xandril is the issue solved? Did you try my suggestion?

@xandril (Author) commented Nov 26, 2024

Sorry for taking so long to respond. Yes, I tried to add it to the classpath in various ways, found this thread: https://github.com/flink-extended/flink-scala-api/issues?q=classpath, and tried all the options from it, but the problem remained. I'll prepare a Docker container and a repository closer to the weekend (probably on Saturday) so the problem is reproducible.

@novakov-alexey (Collaborator)

I see. Perhaps an example based on the Ververica Flink image would be useful:

FROM registry.ververica.com/v2.13/flink:1.19.0-stream2-scala_2.12-java11
RUN rm /flink/lib/flink-scala_2.12-1.19.0-stream2.jar
# Note: if you plan to use Scala 3, it also requires scala-library 2.13.x due to some deep dependencies between Scalas
ADD https://repo1.maven.org/maven2/org/scala-lang/scala3-library_3/3.6.1/scala3-library_3-3.6.1.jar /flink/lib/
ADD https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.13.15/scala-library-2.13.15.jar /flink/lib/

@xandril (Author) commented Nov 30, 2024

@novakov-alexey I uploaded a repository with a simple job (https://github.com/xandril/scala-flink-playground), built with Gradle, and also prepared a ready-to-use image based on your Ververica Flink example and uploaded it to GitHub Packages: https://github.com/xandril/scala-flink-playground/pkgs/container/scala-flink-1.20.0-test.

When I run a local cluster inside the container and submit the job via flink run ./usrlib/test_job.jar, I get the following exception:

WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Scala products are used but Scala API is not on the classpath.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:373)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.IllegalStateException: Scala products are used but Scala API is not on the classpath.
        at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:99)
        at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:41)
        at org.apache.flinkx.api.KeyedStream.aggregate(KeyedStream.scala:387)
        at org.apache.flinkx.api.KeyedStream.sum(KeyedStream.scala:317)
        at org.example.WordCount$.main(WordCount.scala:20)
        at org.example.WordCount.main(WordCount.scala)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
        ... 9 more
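
The failing path corresponds to a job shaped roughly like this (a hypothetical sketch reconstructed from the stack trace above; the actual WordCount.scala may differ):

// Sketch of org.example.WordCount, inferred from the stack trace
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

object WordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements("to be or not to be")
      .flatMap(_.split("\\s+").toList)
      .map(word => (word, 1)) // a Scala tuple, i.e. a Product type
      .keyBy(_._1)
      .sum(1) // KeyedStream.sum -> SumAggregator -> FieldAccessorFactory.getAccessor throws here
      .print()
    env.execute("WordCount")
  }
}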

@novakov-alexey (Collaborator) commented Nov 30, 2024

Thanks. I can reproduce the error as well.

Although flink-scala-api is inside the fat JAR, "flink run" and its way of launching the Flink job can't load the necessary class, such as DefaultScalaProductFieldAccessorFactory.

Here is why, I believe:

"flink run" submits the job to an existing standalone/session cluster, if one is available. It must be using Session Mode.

Solution:

In Session Mode, you need to have all dependencies in the Flink cluster. You need to upload the library JAR https://central.sonatype.com/artifact/org.flinkextended/flink-scala-api_2.13/1.20.0_1.2.0 to the flink/lib folder; then it will work. I expected that the class DefaultScalaProductFieldAccessorFactory would be loaded from the user JAR as well, but it seems the package prefix "org/apache/flink" makes Flink use the parent classloader, i.e. load everything from flink/lib.
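
Concretely, that can be one more ADD line in the Dockerfile above (a sketch; the URL follows the standard Maven Central layout for the linked coordinates):

ADD https://repo1.maven.org/maven2/org/flinkextended/flink-scala-api_2.13/1.20.0_1.2.0/flink-scala-api_2.13-1.20.0_1.2.0.jar /flink/lib/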

My approach:
I usually use Application Mode with Ververica Platform. In this mode, the fat JAR approach works fine, especially if you use Kubernetes with a Flink image that contains your user JAR with all dependencies inside.
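
For example, a fat JAR build in sbt might look like this (a sketch assuming sbt-assembly produces the JAR; Flink core is marked Provided so only the job code and flink-scala-api end up in the assembly):

// build.sbt (sketch): Flink itself comes from the cluster, so mark it Provided;
// flink-scala-api stays on the default scope and lands in the assembled fat JAR
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-streaming-java" % "1.20.0" % Provided,
  "org.flinkextended" %% "flink-scala-api" % "1.20.0_1.2.0"
)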

What I do not understand is why you need to run flink run via a Docker image. Is it just to show the problem here, or are you going to use this Docker image further in some Flink platform?

@xandril (Author) commented Nov 30, 2024

Yes, I use the flink run command via a Docker image only to reproduce and demonstrate the error. The problem also occurs when using Application Mode and deploying to Kubernetes in native mode. I tested it at work, in a company test-environment K8s cluster, with an image based on apache/flink:1.20.0.

@xandril (Author) commented Nov 30, 2024

I also tried to add the Scala library and flink-scala-api to the lib folder, but I got an error that usually appears when multiple Scala versions are mixed up: missing scala.collection/Seq classes.

@novakov-alexey (Collaborator)

If you add flink-scala-api and Scala 2.13 to the lib folder AND remove Scala 2.12 from the same lib folder, then it should work.
Please check locally using a Flink standalone cluster with a single Job Manager and a single Task Manager.
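
For example (a sketch using the standard Flink distribution scripts; the JAR path and main class are placeholders):

# from the Flink distribution root, after adjusting lib/ as described above
./bin/start-cluster.sh
./bin/flink run -c org.example.WordCount /path/to/word-count.jar
./bin/stop-cluster.sh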

@novakov-alexey (Collaborator)

 > flink run -c com.example.wordCount target/scala-3.3.3/word-count-assembly-0.1.0-SNAPSHOT.jar
Job has been submitted with JobID 714586f237bcd31fa03ae1f78a46ea9f
Program execution finished
Job with JobID 714586f237bcd31fa03ae1f78a46ea9f has finished.
Job Runtime: 686 ms
[screenshot: 2024-11-30 at 23 02 06]
% ls ../flink/current/lib
flink-cep-1.18.1.jar			flink-dist-1.18.1.jar			flink-table-api-java-uber-1.18.1.jar	log4j-1.2-api-2.17.1.jar		log4j-slf4j-impl-2.17.1.jar
flink-connector-files-1.18.1.jar	flink-json-1.18.1.jar			flink-table-planner-loader-1.18.1.jar	log4j-api-2.17.1.jar			scala-library-2.13.15.jar
flink-csv-1.18.1.jar			flink-scala-api_3-1.18.1_1.2.0.jar	flink-table-runtime-1.18.1.jar		log4j-core-2.17.1.jar			scala3-library_3-3.6.1.jar

@novakov-alexey (Collaborator) commented Nov 30, 2024

[screenshot: 2024-11-30 at 23 05 02]

@xandril (Author) commented Dec 2, 2024

I'll try to submit the job to a Docker Compose session cluster and attach the Docker Compose file if I encounter any issues, for reproducibility. Also, could it be a problem that I'm not using the Ververica Flink image?

@novakov-alexey (Collaborator)

I would first try without Docker, just using your local computer with a single Job Manager and Task Manager. This should be enough to test serialization of data between the job tasks.
No, I do not think the Ververica Flink image makes a difference.
