
Running multiple PyDeequ checks at the same time in an EMR cluster: OSError: [Errno 98] Address already in use (127.0.0.1:25334) #173

Open
Ashokgoa opened this issue Nov 17, 2023 · 4 comments

@Ashokgoa

Describe the bug
How do you handle this issue when many Spark jobs that run PyDeequ checks execute at the same time?

In my case, only one job runs; the rest fail with this error while the other jobs are running in the EMR cluster.

```
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2207, in start
OSError: [Errno 98] Address already in use

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/ammar/pydeequ_poc_pyspark.py", line 26, in <module>
    _check = _check_func(*_args)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/checks.py", line 134, in hasSize
    assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/scala_utils.py", line 32, in __init__
    super().__init__(gateway)
  File "/usr/local/lib/python3.7/site-packages/pydeequ/scala_utils.py", line 16, in __init__
    self.gateway.start_callback_server()
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1894, in start_callback_server
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2216, in start
py4j.protocol.Py4JNetworkError: An error occurred while trying to start the callback server (127.0.0.1:25334)
21/11/19 13:44:06 INFO SparkContext: Invoking stop() from shutdown hook
```

Any solutions or suggestions?

To Reproduce
Steps to reproduce the behavior:
Run multiple PyDeequ checks at the same time in an EMR cluster.
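A minimal reproduction of the underlying failure, without Spark or PyDeequ, could look like the sketch below. It assumes (a simplification, not PyDeequ code) that each job's Py4J callback server behaves like a plain TCP listener bound to the fixed address from the traceback, 127.0.0.1:25334, and that the jobs' drivers share one host (as they would if several drivers run on the EMR master node). The helper names `start_listener` and `simulate_concurrent_jobs` are hypothetical, and the "jobs" are modeled inside one process for brevity; bind conflicts behave the same across processes.

```python
import errno
import socket

# Fixed callback-server port taken from the traceback (127.0.0.1:25334).
CALLBACK_PORT = 25334


def start_listener(port, host="127.0.0.1"):
    """Open a TCP listening socket on host:port.

    Simplified stand-in (assumption) for a Py4J callback server starting up.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))
        sock.listen()
    except OSError:
        sock.close()
        raise
    return sock


def simulate_concurrent_jobs(port=CALLBACK_PORT, jobs=3):
    """Let several hypothetical 'jobs' each try to listen on the same fixed port.

    Returns one outcome per job: "started", or the errno name of the failure.
    """
    outcomes, open_socks = [], []
    try:
        for _ in range(jobs):
            try:
                open_socks.append(start_listener(port))
                outcomes.append("started")
            except OSError as exc:
                outcomes.append(errno.errorcode.get(exc.errno, str(exc.errno)))
    finally:
        # Release every port we managed to bind.
        for sock in open_socks:
            sock.close()
    return outcomes


if __name__ == "__main__":
    # On Linux with port 25334 initially free, this prints
    # ['started', 'EADDRINUSE', 'EADDRINUSE'].
    print(simulate_concurrent_jobs())
```

With the port initially free, the first "job" starts and every later one fails with `EADDRINUSE` (errno 98 on Linux), which matches the pattern in the report: one job runs and the others fail.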

@chenliu0831
Contributor

Please provide a minimal code snippet if possible. Thank you.

@Ashokgoa
Author

Ashokgoa commented Dec 1, 2023

Hi Chenliu,

Currently I have multiple Airflow jobs that are triggered at the same time, each submitting a different Spark job to the EMR cluster.

In each of these Airflow DAGs we added a step for data quality checks. In some cases these data quality checks run at the same time in the cluster, and that is when the error occurs. This is the piece of code:
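```python
from pydeequ.checks import Check, CheckLevel, ConstrainableDataTypes
from pydeequ.verification import VerificationSuite

# `spark`, `df`, `repository` and `resultKey` are created earlier in the job.
check = Check(spark, CheckLevel.Error, "Integrity checks")

# In the traceback above, hasSize wraps its lambda in ScalaFunction1, which
# starts the Py4J callback server; the lambda-based checks below presumably
# do the same.
checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(
        check
        .isComplete("SubID")
        .hasCompleteness("ErrorCode_", lambda x: x < 0.2)
        .hasDataType("JobID", ConstrainableDataTypes.Integral)
        .hasMaxLength("JobID", lambda x: x == 8)
        .hasDataType("ListID", ConstrainableDataTypes.Integral)
        .containsEmail("Emailaddr")
        .hasMaxLength("ListID", lambda x: x == 5)
    )
    .useRepository(repository)
    .saveOrAppendResult(resultKey)
    .run()
)
```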

Most of the jobs fail with this error:
```
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2207, in start
OSError: [Errno 98] Address already in use
```

As I understand it, PyDeequ always listens on port 25334, which causes the other jobs to fail when they run at the same time.
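To confirm that something on the driver host already holds the callback port when a job starts, a small standard-library probe like the one below could be run first. This is a diagnostic sketch, not part of PyDeequ: the helper name `port_is_listening` is hypothetical, it only reports whether the port is taken (not which job holds it), and checking first and binding afterwards is inherently racy, so it does not fix the conflict.

```python
import socket

# Default callback-server address, as shown in the traceback.
CALLBACK_HOST = "127.0.0.1"
CALLBACK_PORT = 25334


def port_is_listening(port=CALLBACK_PORT, host=CALLBACK_HOST, timeout=0.5):
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising an exception.
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    if port_is_listening():
        print(f"{CALLBACK_HOST}:{CALLBACK_PORT} is already in use on this host")
    else:
        print(f"{CALLBACK_HOST}:{CALLBACK_PORT} looks free")
```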

Please refer to #19.

@chenliu0831
Contributor

Thanks for providing more details and the related issue, will research this a bit.

chenliu0831 self-assigned this Dec 2, 2023
@Ashokgoa
Author

Ashokgoa commented Dec 4, 2023

Can we implement this so that the data quality checks use the next available port instead of 25334?
https://www.py4j.org/advanced_topics.html#using-py4j-without-pre-determined-ports-dynamic-port-number
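The dynamic-port approach on the linked Py4J page rests on a general OS feature: binding to port 0 makes the kernel pick a free port, which the program then reads back. In Py4J this corresponds to starting the callback server with `CallbackServerParameters(port=0)` and then telling the Java side which port was actually bound (the page does this through the JVM `GatewayServer`'s `resetCallbackClient`). The sketch below shows only the OS-level step with plain sockets; the helper name `start_listener_on_free_port` is hypothetical, and wiring this into PyDeequ's `ScalaFunction1` is not shown.

```python
import socket


def start_listener_on_free_port(host="127.0.0.1"):
    """Bind to port 0 so the OS assigns a free port; return (socket, port)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, 0))
    sock.listen()
    # getsockname() reports the port the OS actually assigned.
    return sock, sock.getsockname()[1]


if __name__ == "__main__":
    # Two concurrent "jobs" no longer collide: each gets its own port.
    sock_a, port_a = start_listener_on_free_port()
    sock_b, port_b = start_listener_on_free_port()
    print(port_a, port_b, port_a != port_b)
    sock_a.close()
    sock_b.close()
```

Because each call asks the OS for its own port, concurrent jobs on the same host no longer compete for 25334; the remaining work is making sure the JVM connects back to whichever port was chosen.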
