From 5f4408c7b8a1f0a929af39aa852141b4cefa3f2a Mon Sep 17 00:00:00 2001 From: Jhonatas Rosendo Date: Thu, 17 Oct 2024 11:12:38 -0300 Subject: [PATCH 01/11] Ignoring running events emission. Retrieveing jonName directly on NuFacet instead of sql execution context --- .../SparkApplicationExecutionContext.java | 13 ++- .../lifecycle/SparkSQLExecutionContext.java | 104 +++++++++++------- .../spark/agent/facets/NuFacet.java | 23 +++- 3 files changed, 93 insertions(+), 47 deletions(-) diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkApplicationExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkApplicationExecutionContext.java index 04b880fe65..205f85bf63 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkApplicationExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkApplicationExecutionContext.java @@ -94,8 +94,11 @@ public void start(SparkListenerApplicationStart applicationStart) { .event(applicationStart) .build()); - log.debug("Posting event for applicationId {} start: {}", applicationId, event); - eventEmitter.emit(event); + + log.info("OpenLineage APPLICATION event has no lineage value an will not be emmited"); + +// log.debug("Posting event for applicationId {} start: {}", applicationId, event); +// eventEmitter.emit(event); } @Override @@ -125,8 +128,10 @@ public void end(SparkListenerApplicationEnd applicationEnd) { .event(applicationEnd) .build()); - log.debug("Posting event for applicationId {} end: {}", applicationId, event); - eventEmitter.emit(event); + log.info("OpenLineage APPLICATION event has no lineage value an will not be emmited"); + +// log.debug("Posting event for applicationId {} end: {}", applicationId, event); +// eventEmitter.emit(event); } private OpenLineage.ParentRunFacet buildApplicationParentFacet() { diff --git 
a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java index 6a95c45d68..61cc9a69e0 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java @@ -28,14 +28,7 @@ import java.util.Stack; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; -import org.apache.spark.scheduler.ActiveJob; -import org.apache.spark.scheduler.JobFailed; -import org.apache.spark.scheduler.SparkListenerApplicationEnd; -import org.apache.spark.scheduler.SparkListenerApplicationStart; -import org.apache.spark.scheduler.SparkListenerJobEnd; -import org.apache.spark.scheduler.SparkListenerJobStart; -import org.apache.spark.scheduler.SparkListenerStageCompleted; -import org.apache.spark.scheduler.SparkListenerStageSubmitted; +import org.apache.spark.scheduler.*; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.apache.spark.sql.execution.QueryExecution; import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd; @@ -88,30 +81,35 @@ public void start(SparkListenerSQLExecutionStart startEvent) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionStart"); // return; } - +// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); olContext.setActiveJobId(activeJobId); // We shall skip this START event, focusing on the first SparkListenerJobStart event to be the START, because of the presence of the job nurn // only one START event is expected, in case it was already sent with jobStart, we send running - // EventType eventType = emittedOnJobStart ? 
RUNNING : START; - // emittedOnSqlExecutionStart = true; - -// RunEvent event = -// runEventBuilder.buildRun( -// OpenLineageRunEventContext.builder() -// .applicationParentRunFacet(buildApplicationParentFacet()) -// .event(startEvent) -// .runEventBuilder( -// olContext -// .getOpenLineage() -// .newRunEventBuilder() -// .eventTime(toZonedTime(startEvent.time())) -// .eventType(eventType)) -// .jobBuilder(buildJob()) -// .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) -// .build()); - - // log.debug("Posting event for start {}: {}", executionId, event); - // eventEmitter.emit(event); + EventType eventType = emittedOnJobStart ? RUNNING : START; + emittedOnSqlExecutionStart = true; + + RunEvent event = + runEventBuilder.buildRun( + OpenLineageRunEventContext.builder() + .applicationParentRunFacet(buildApplicationParentFacet()) + .event(startEvent) + .runEventBuilder( + olContext + .getOpenLineage() + .newRunEventBuilder() + .eventTime(toZonedTime(startEvent.time())) + .eventType(eventType)) + .jobBuilder(buildJob()) + .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) + .build()); + + if (RUNNING.equals(event.getEventType())) { + log.info("OpenLineage event is not emitted because the job is still running"); + return; + } + + log.debug("Posting event for start {}: {}", executionId, event); + eventEmitter.emit(event); } @Override @@ -131,7 +129,7 @@ public void end(SparkListenerSQLExecutionEnd endEvent) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionEnd"); // return; } - +// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); // only one COMPLETE event is expected, verify if jobEnd was not emitted EventType eventType; if (emittedOnJobStart && !emittedOnJobEnd) { @@ -157,9 +155,15 @@ public void end(SparkListenerSQLExecutionEnd endEvent) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); + if 
(RUNNING.equals(event.getEventType())) { + log.info("OpenLineage event is not emitted because the job is still running"); + return; + } + if (log.isDebugEnabled()) { log.debug("Posting event for end {}: {}", executionId, OpenLineageClientUtils.toJson(event)); } + eventEmitter.emit(event); } @@ -174,7 +178,7 @@ public void start(SparkListenerStageSubmitted stageSubmitted) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerStageSubmitted"); return; } - +// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); RunEvent event = runEventBuilder.buildRun( OpenLineageRunEventContext.builder() @@ -190,8 +194,9 @@ public void start(SparkListenerStageSubmitted stageSubmitted) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - log.debug("Posting event for stage submitted {}: {}", executionId, event); - eventEmitter.emit(event); + log.info("OpenLineage event is not emitted because the job is still running"); +// log.debug("Posting event for stage submitted {}: {}", executionId, event); +// eventEmitter.emit(event); } // TODO: not invoked until https://github.com/OpenLineage/OpenLineage/issues/470 is completed @@ -205,6 +210,7 @@ public void end(SparkListenerStageCompleted stageCompleted) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerStageCompleted"); return; } +// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); RunEvent event = runEventBuilder.buildRun( OpenLineageRunEventContext.builder() @@ -220,8 +226,9 @@ public void end(SparkListenerStageCompleted stageCompleted) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - log.debug("Posting event for stage completed {}: {}", executionId, event); - eventEmitter.emit(event); + log.info("OpenLineage event is not emitted because the job is still running"); +// log.debug("Posting event for stage completed {}: {}", 
executionId, event); +// eventEmitter.emit(event); } @Override @@ -244,12 +251,13 @@ public void setActiveJob(ActiveJob activeJob) { @Override public void start(SparkListenerJobStart jobStart) { log.debug("SparkListenerJobStart - executionId: {}", executionId); - try { - jobName = jobStart.properties().getProperty("spark.job.name"); - } catch (RuntimeException e) { - log.info("spark.job.name property not found in the context"); - } - olContext.setJobNurn(jobName); +// try { +// jobName = jobStart.properties().getProperty("spark.job.name"); +// } catch (RuntimeException e) { +// log.info("spark.job.name property not found in the context"); +// } + +// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); if (!olContext.getQueryExecution().isPresent()) { log.info(NO_EXECUTION_INFO, olContext); return; @@ -262,7 +270,7 @@ public void start(SparkListenerJobStart jobStart) { // only one START event is expected, in case it was already sent with sqlExecutionStart, we send // running EventType eventType = emittedOnSqlExecutionStart ? 
RUNNING : START; - emittedOnSqlExecutionStart = true; +// emittedOnSqlExecutionStart = true; emittedOnJobStart = true; RunEvent event = @@ -280,6 +288,11 @@ public void start(SparkListenerJobStart jobStart) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); + if (RUNNING.equals(event.getEventType())) { + log.info("OpenLineage event is not emitted because the job is still running"); + return; + } + log.debug("Posting event for start {}: {}", executionId, event); eventEmitter.emit(event); } @@ -302,6 +315,8 @@ public void end(SparkListenerJobEnd jobEnd) { // return; } +// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); + // only one COMPLETE event is expected, EventType eventType; if (jobEnd.jobResult() instanceof JobFailed) { @@ -329,6 +344,11 @@ public void end(SparkListenerJobEnd jobEnd) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); + if (RUNNING.equals(event.getEventType())) { + log.info("OpenLineage event is not emitted because the job is still running"); + return; + } + log.debug("Posting event for end {}: {}", executionId, event); eventEmitter.emit(event); } diff --git a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/NuFacet.java b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/NuFacet.java index 8702c98a00..f92e5db803 100644 --- a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/NuFacet.java +++ b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/NuFacet.java @@ -8,13 +8,18 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.openlineage.client.OpenLineage; import io.openlineage.spark.agent.Versions; + +import java.util.NoSuchElementException; import java.util.Properties; import lombok.Getter; import lombok.NonNull; import io.openlineage.spark.api.OpenLineageContext; +import lombok.extern.slf4j.Slf4j; +import 
org.apache.spark.sql.SparkSession; /** Captures information related to the Apache Spark job. */ @Getter +@Slf4j public class NuFacet extends OpenLineage.DefaultRunFacet { // @JsonProperty("jobId") // @NonNull @@ -26,8 +31,24 @@ public class NuFacet extends OpenLineage.DefaultRunFacet { @JsonProperty("jobNurn") private String jobNurn; + private String fetchJobNurn(OpenLineageContext olContext) { + if (olContext.getSparkSession().isPresent()) { + SparkSession sparkSession = olContext.getSparkSession().get(); + try { + return sparkSession.conf().get("spark.job.name"); + } catch (NoSuchElementException e) { + log.warn("spark.job.name property not found in the context"); + return null; + } + } + + log.warn("spark.job.name property not found because the SparkContext could not be retrieved from OpenLineageContext"); + return null; + } + public NuFacet(@NonNull OpenLineageContext olContext) { super(Versions.OPEN_LINEAGE_PRODUCER_URI); - this.jobNurn = olContext.getJobNurn(); + this.jobNurn = fetchJobNurn(olContext); +// this.jobNurn = olContext.getJobNurn(); } } From 88be3b102840388771b1d11d5b90b7e7241d484d Mon Sep 17 00:00:00 2001 From: Jhonatas Rosendo Date: Thu, 17 Oct 2024 17:51:39 -0300 Subject: [PATCH 02/11] Preventing emission of RUNNING events. Preventing emission of events with APPLICATION job type. 
Centralizing how the job nurn is fetched in NuFacet with SparkSession property --- .../lifecycle/SparkSQLExecutionContext.java | 16 ---------------- .../openlineage/spark/agent/facets/NuFacet.java | 1 - .../spark/api/OpenLineageContext.java | 6 ------ 3 files changed, 23 deletions(-) diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java index 61cc9a69e0..69a437a200 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java @@ -43,7 +43,6 @@ class SparkSQLExecutionContext implements ExecutionContext { private static final String SPARK_PROCESSING_TYPE_BATCH = "BATCH"; private static final String SPARK_PROCESSING_TYPE_STREAMING = "STREAMING"; private final long executionId; - private String jobName; private final OpenLineageContext olContext; private final EventEmitter eventEmitter; private final OpenLineageRunEventBuilder runEventBuilder; @@ -81,9 +80,7 @@ public void start(SparkListenerSQLExecutionStart startEvent) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionStart"); // return; } -// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); olContext.setActiveJobId(activeJobId); - // We shall skip this START event, focusing on the first SparkListenerJobStart event to be the START, because of the presence of the job nurn // only one START event is expected, in case it was already sent with jobStart, we send running EventType eventType = emittedOnJobStart ? 
RUNNING : START; emittedOnSqlExecutionStart = true; @@ -129,7 +126,6 @@ public void end(SparkListenerSQLExecutionEnd endEvent) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionEnd"); // return; } -// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); // only one COMPLETE event is expected, verify if jobEnd was not emitted EventType eventType; if (emittedOnJobStart && !emittedOnJobEnd) { @@ -178,7 +174,6 @@ public void start(SparkListenerStageSubmitted stageSubmitted) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerStageSubmitted"); return; } -// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); RunEvent event = runEventBuilder.buildRun( OpenLineageRunEventContext.builder() @@ -210,7 +205,6 @@ public void end(SparkListenerStageCompleted stageCompleted) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerStageCompleted"); return; } -// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); RunEvent event = runEventBuilder.buildRun( OpenLineageRunEventContext.builder() @@ -251,13 +245,6 @@ public void setActiveJob(ActiveJob activeJob) { @Override public void start(SparkListenerJobStart jobStart) { log.debug("SparkListenerJobStart - executionId: {}", executionId); -// try { -// jobName = jobStart.properties().getProperty("spark.job.name"); -// } catch (RuntimeException e) { -// log.info("spark.job.name property not found in the context"); -// } - -// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); if (!olContext.getQueryExecution().isPresent()) { log.info(NO_EXECUTION_INFO, olContext); return; @@ -270,7 +257,6 @@ public void start(SparkListenerJobStart jobStart) { // only one START event is expected, in case it was already sent with sqlExecutionStart, we send // running EventType eventType = emittedOnSqlExecutionStart ? 
RUNNING : START; -// emittedOnSqlExecutionStart = true; emittedOnJobStart = true; RunEvent event = @@ -315,8 +301,6 @@ public void end(SparkListenerJobEnd jobEnd) { // return; } -// olContext.setJobNurn(olContext.getSparkSession().get().conf().get("spark.job.name")); - // only one COMPLETE event is expected, EventType eventType; if (jobEnd.jobResult() instanceof JobFailed) { diff --git a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/NuFacet.java b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/NuFacet.java index f92e5db803..6863fb1224 100644 --- a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/NuFacet.java +++ b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/NuFacet.java @@ -49,6 +49,5 @@ private String fetchJobNurn(OpenLineageContext olContext) { public NuFacet(@NonNull OpenLineageContext olContext) { super(Versions.OPEN_LINEAGE_PRODUCER_URI); this.jobNurn = fetchJobNurn(olContext); -// this.jobNurn = olContext.getJobNurn(); } } diff --git a/integration/spark/shared/src/main/java/io/openlineage/spark/api/OpenLineageContext.java b/integration/spark/shared/src/main/java/io/openlineage/spark/api/OpenLineageContext.java index 4d678f1925..b25b15bfe9 100644 --- a/integration/spark/shared/src/main/java/io/openlineage/spark/api/OpenLineageContext.java +++ b/integration/spark/shared/src/main/java/io/openlineage/spark/api/OpenLineageContext.java @@ -132,12 +132,6 @@ public String getSparkVersion() { */ @Getter @Setter String jobName; - /** - * Job nurn is collected during the Spark runs, and stored for creating a custom facet within - * the Run facets. It should help us to enhance the events further in the lineage pipeline. 
- */ - @Getter @Setter String jobNurn; - @Setter Integer activeJobId; public Optional getActiveJobId() { From bbfce353c8d8e9b4a7cffbb9e196b6738e4b001f Mon Sep 17 00:00:00 2001 From: Jhonatas Rosendo Date: Thu, 17 Oct 2024 19:55:20 -0300 Subject: [PATCH 03/11] Emitting only events whose job name contain either .execute_insert_into_hadoop_fs_relation_command. or .adaptive_spark_plan. as they are considered to have lineage value. All other events will not be emitted --- .../lifecycle/SparkSQLExecutionContext.java | 67 +++++++++++++------ 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java index 69a437a200..ff330032cc 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java @@ -10,6 +10,7 @@ import static io.openlineage.client.OpenLineage.RunEvent.EventType.RUNNING; import static io.openlineage.client.OpenLineage.RunEvent.EventType.START; import static io.openlineage.spark.agent.util.TimeUtils.toZonedTime; +import static java.util.Objects.isNull; import io.openlineage.client.OpenLineage; import io.openlineage.client.OpenLineage.RunEvent; @@ -27,6 +28,8 @@ import java.util.Optional; import java.util.Stack; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; + import lombok.extern.slf4j.Slf4j; import org.apache.spark.scheduler.*; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; @@ -67,6 +70,28 @@ public SparkSQLExecutionContext( this.runEventBuilder = runEventBuilder; } + private static Boolean shouldEmit(RunEvent event){ + if (RUNNING.equals(event.getEventType())) { + log.info("OpenLineage event is not emitted because the job is still running"); 
+ return false; + } + + String jobName = event.getJob().getName(); + if (isNull(jobName)) { + log.info("OpenLineage event has no job name and will not be emmited"); + return false; + } + + if (Stream.of(".execute_insert_into_hadoop_fs_relation_command.", + ".adaptive_spark_plan.") + .noneMatch(jobName::contains)) { + log.info("OpenLineage event has no lineage value and will not be emmited"); + return false; + } + + return true; + } + @Override public void start(SparkListenerSQLExecutionStart startEvent) { if (log.isDebugEnabled()) { @@ -100,13 +125,12 @@ public void start(SparkListenerSQLExecutionStart startEvent) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - if (RUNNING.equals(event.getEventType())) { - log.info("OpenLineage event is not emitted because the job is still running"); - return; - } + if (!shouldEmit(event)) { + return; + } - log.debug("Posting event for start {}: {}", executionId, event); - eventEmitter.emit(event); + log.debug("Posting event for start {}: {}", executionId, event); + eventEmitter.emit(event); } @Override @@ -151,15 +175,13 @@ public void end(SparkListenerSQLExecutionEnd endEvent) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - if (RUNNING.equals(event.getEventType())) { - log.info("OpenLineage event is not emitted because the job is still running"); - return; + if (!shouldEmit(event)) { + return; } if (log.isDebugEnabled()) { log.debug("Posting event for end {}: {}", executionId, OpenLineageClientUtils.toJson(event)); } - eventEmitter.emit(event); } @@ -174,6 +196,7 @@ public void start(SparkListenerStageSubmitted stageSubmitted) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerStageSubmitted"); return; } + RunEvent event = runEventBuilder.buildRun( OpenLineageRunEventContext.builder() @@ -189,9 +212,11 @@ public void start(SparkListenerStageSubmitted stageSubmitted) { 
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - log.info("OpenLineage event is not emitted because the job is still running"); -// log.debug("Posting event for stage submitted {}: {}", executionId, event); -// eventEmitter.emit(event); + if (!shouldEmit(event)) { + return; + } + + eventEmitter.emit(event); } // TODO: not invoked until https://github.com/OpenLineage/OpenLineage/issues/470 is completed @@ -205,6 +230,7 @@ public void end(SparkListenerStageCompleted stageCompleted) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerStageCompleted"); return; } + RunEvent event = runEventBuilder.buildRun( OpenLineageRunEventContext.builder() @@ -220,9 +246,12 @@ public void end(SparkListenerStageCompleted stageCompleted) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - log.info("OpenLineage event is not emitted because the job is still running"); -// log.debug("Posting event for stage completed {}: {}", executionId, event); -// eventEmitter.emit(event); + if (!shouldEmit(event)) { + return; + } + + log.debug("Posting event for stage completed {}: {}", executionId, event); + eventEmitter.emit(event); } @Override @@ -274,8 +303,7 @@ public void start(SparkListenerJobStart jobStart) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - if (RUNNING.equals(event.getEventType())) { - log.info("OpenLineage event is not emitted because the job is still running"); + if (!shouldEmit(event)) { return; } @@ -328,8 +356,7 @@ public void end(SparkListenerJobEnd jobEnd) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - if (RUNNING.equals(event.getEventType())) { - log.info("OpenLineage event is not emitted because the job is still running"); + if (!shouldEmit(event)) { return; } From 0f021ceec6bb292f3e7d59d71f93bb325d2bbf96 Mon Sep 17 00:00:00 2001 From: Jhonatas Rosendo Date: Fri, 18 Oct 2024 
10:34:35 -0300 Subject: [PATCH 04/11] review adjustments --- .../lifecycle/SparkSQLExecutionContext.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java index ff330032cc..0e9b769fd7 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java @@ -24,11 +24,8 @@ import io.openlineage.spark.api.naming.JobNameBuilder; import java.time.ZoneOffset; import java.time.ZonedDateTime; -import java.util.List; -import java.util.Optional; -import java.util.Stack; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import org.apache.spark.scheduler.*; @@ -59,6 +56,11 @@ class SparkSQLExecutionContext implements ExecutionContext { private SparkSQLQueryParser sqlRecorder = new SparkSQLQueryParser(); + private static final Set NU_WANTED_EVENT_NAME_SUBSTRINGS = Set.of( + ".execute_insert_into_hadoop_fs_relation_command.", + ".adaptive_spark_plan." 
+ ); + public SparkSQLExecutionContext( long executionId, EventEmitter eventEmitter, @@ -72,19 +74,17 @@ public SparkSQLExecutionContext( private static Boolean shouldEmit(RunEvent event){ if (RUNNING.equals(event.getEventType())) { - log.info("OpenLineage event is not emitted because the job is still running"); + log.info("OpenLineage event is RUNNING and should not be emmited"); return false; } String jobName = event.getJob().getName(); if (isNull(jobName)) { - log.info("OpenLineage event has no job name and will not be emmited"); + log.info("OpenLineage event has no job name should not be emitted"); return false; } - if (Stream.of(".execute_insert_into_hadoop_fs_relation_command.", - ".adaptive_spark_plan.") - .noneMatch(jobName::contains)) { + if (NU_WANTED_EVENT_NAME_SUBSTRINGS.stream().noneMatch(jobName::contains)) { log.info("OpenLineage event has no lineage value and will not be emmited"); return false; } @@ -150,6 +150,7 @@ public void end(SparkListenerSQLExecutionEnd endEvent) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionEnd"); // return; } + // only one COMPLETE event is expected, verify if jobEnd was not emitted EventType eventType; if (emittedOnJobStart && !emittedOnJobEnd) { @@ -216,6 +217,7 @@ public void start(SparkListenerStageSubmitted stageSubmitted) { return; } + log.debug("Posting event for stage submitted {}: {}", executionId, event); eventEmitter.emit(event); } @@ -230,7 +232,6 @@ public void end(SparkListenerStageCompleted stageCompleted) { "OpenLineage received Spark event that is configured to be skipped: SparkListenerStageCompleted"); return; } - RunEvent event = runEventBuilder.buildRun( OpenLineageRunEventContext.builder() From 94a1cfe183f201d05e7d3484d7f12ef8b5ff781e Mon Sep 17 00:00:00 2001 From: Jhonatas Rosendo Date: Mon, 21 Oct 2024 16:30:40 -0300 Subject: [PATCH 05/11] Discard column level lineage when event is not COMPLETE --- .../lifecycle/SparkSQLExecutionContext.java | 44 
+++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java index 0e9b769fd7..77a9636daf 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java @@ -22,10 +22,14 @@ import io.openlineage.spark.agent.util.ScalaConversionUtils; import io.openlineage.spark.api.OpenLineageContext; import io.openlineage.spark.api.naming.JobNameBuilder; + +import java.lang.reflect.Field; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import org.apache.spark.scheduler.*; @@ -92,6 +96,34 @@ private static Boolean shouldEmit(RunEvent event){ return true; } + private static Boolean shouldKeepColumnLineageFacet(EventType eventType) { + return !(START.equals(eventType) || RUNNING.equals(eventType)); + } + + private static void discardColumnLineage(RunEvent event) { + if (shouldKeepColumnLineageFacet(event.getEventType())) { return; } + + log.info("Discarding column lineage facet for event {}", event.getEventType()); + + try { + Field columnLineageFacetField = OpenLineage.DatasetFacets.class.getDeclaredField("columnLineage"); + columnLineageFacetField.setAccessible(true); + Stream + .concat(event.getInputs().stream(), event.getOutputs().stream()) + .collect(Collectors.toList()) + .forEach(dataset -> { + try { + log.info("Discarding column lineage facet for dataset {} {} {}", dataset.getClass().getName(), dataset.getNamespace(), dataset.getName()); + columnLineageFacetField.set(dataset.getFacets(), null); + } catch 
(IllegalAccessException e) { + log.warn("Failed to discard column lineage facet", e); + } + }); + } catch (NoSuchFieldException e) { + log.error("Failed to discard column lineage facet: columnLineage field not found at OpenLineage.DatasetFacets", e); + } + } + @Override public void start(SparkListenerSQLExecutionStart startEvent) { if (log.isDebugEnabled()) { @@ -129,6 +161,8 @@ public void start(SparkListenerSQLExecutionStart startEvent) { return; } + discardColumnLineage(event); + log.debug("Posting event for start {}: {}", executionId, event); eventEmitter.emit(event); } @@ -180,6 +214,8 @@ public void end(SparkListenerSQLExecutionEnd endEvent) { return; } + discardColumnLineage(event); + if (log.isDebugEnabled()) { log.debug("Posting event for end {}: {}", executionId, OpenLineageClientUtils.toJson(event)); } @@ -217,6 +253,8 @@ public void start(SparkListenerStageSubmitted stageSubmitted) { return; } + discardColumnLineage(event); + log.debug("Posting event for stage submitted {}: {}", executionId, event); eventEmitter.emit(event); } @@ -251,6 +289,8 @@ public void end(SparkListenerStageCompleted stageCompleted) { return; } + discardColumnLineage(event); + log.debug("Posting event for stage completed {}: {}", executionId, event); eventEmitter.emit(event); } @@ -308,6 +348,8 @@ public void start(SparkListenerJobStart jobStart) { return; } + discardColumnLineage(event); + log.debug("Posting event for start {}: {}", executionId, event); eventEmitter.emit(event); } @@ -361,6 +403,8 @@ public void end(SparkListenerJobEnd jobEnd) { return; } + discardColumnLineage(event); + log.debug("Posting event for end {}: {}", executionId, event); eventEmitter.emit(event); } From 948af9d02b050a5f335f8605d3f872dd62a19f23 Mon Sep 17 00:00:00 2001 From: Jhonatas Rosendo Date: Mon, 21 Oct 2024 16:31:33 -0300 Subject: [PATCH 06/11] Discarding RDD_JOB events --- .../spark/agent/lifecycle/RddExecutionContext.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) 
diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java index e37be8ef8f..ef89f71bf8 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java @@ -224,8 +224,11 @@ public void start(SparkListenerJobStart jobStart) { .build()) .job(buildJob(jobStart.jobId())) .build(); - log.debug("Posting event for start {}: {}", jobStart, event); - eventEmitter.emit(event); + + log.info("OpenLineage {} event has no lineage value an will not be emmited", SPARK_JOB_TYPE); + +// log.debug("Posting event for start {}: {}", jobStart, event); +// eventEmitter.emit(event); } @Override @@ -257,8 +260,11 @@ public void end(SparkListenerJobEnd jobEnd) { .build()) .job(buildJob(jobEnd.jobId())) .build(); - log.debug("Posting event for end {}: {}", jobEnd, event); - eventEmitter.emit(event); + + log.info("OpenLineage {} event has no lineage value an will not be emmited", SPARK_JOB_TYPE); + +// log.debug("Posting event for end {}: {}", jobEnd, event); +// eventEmitter.emit(event); } protected OpenLineage.RunFacets buildRunFacets(ErrorFacet jobError, SparkListenerEvent event) { From afdbd437857f78a070520ba55e9d9ea634964a17 Mon Sep 17 00:00:00 2001 From: Jhonatas Rosendo Date: Mon, 21 Oct 2024 17:53:04 -0300 Subject: [PATCH 07/11] Refactoring Nu validations for event emission. NuEventEmitter was created to hold all event emission validation logic. Edd, Spark Application and Spark SQL were changed accordingly. 
--- .../spark/agent/NuEventEmitter.java | 102 ++++++++++++++++ .../agent/lifecycle/RddExecutionContext.java | 13 +-- .../SparkApplicationExecutionContext.java | 14 +-- .../lifecycle/SparkSQLExecutionContext.java | 109 ++---------------- 4 files changed, 123 insertions(+), 115 deletions(-) create mode 100644 integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java new file mode 100644 index 0000000000..d512777236 --- /dev/null +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java @@ -0,0 +1,102 @@ +package io.openlineage.spark.agent; + +import io.openlineage.client.OpenLineage; +import lombok.extern.slf4j.Slf4j; + +import java.lang.reflect.Field; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static io.openlineage.client.OpenLineage.RunEvent.EventType; +import static io.openlineage.client.OpenLineage.RunEvent; +import static io.openlineage.client.OpenLineage.RunEvent.EventType.*; +import static java.util.Objects.isNull; + +@Slf4j +public class NuEventEmitter { + + private static final Set WANTED_JOB_TYPES = Set.of( + "SQL_JOB" // as defined in SparkSQLExecutionContext.SPARK_JOB_TYPE + ); + + private static final Set WANTED_EVENT_NAME_SUBSTRINGS = Set.of( + ".execute_insert_into_hadoop_fs_relation_command.", + ".adaptive_spark_plan." 
+ ); + + private static Boolean isPermittedJobType(RunEvent event) { + String jobType = event.getJob().getFacets().getJobType().getJobType(); + if (WANTED_JOB_TYPES.stream().noneMatch(jobType::equals)) { + log.info("OpenLineage event with job type {} has no lineage value and should not be emitted", jobType); + return false; + } + return true; + } + + private static Boolean isPermitedEventType(RunEvent event) { + if (RUNNING.equals(event.getEventType())) { + log.info("OpenLineage event is {} and should not be emitted", RUNNING); + return false; + } + return true; + } + + private static Boolean isPermittedJobName(RunEvent event) { + String jobName = event.getJob().getName(); + if (isNull(jobName)) { + log.info("OpenLineage event has no job name and should not be emitted"); + return false; + } + if (WANTED_EVENT_NAME_SUBSTRINGS.stream().noneMatch(jobName::contains)) { + log.info("OpenLineage event job name has no permitted substring and should not be emitted"); + return false; + } + return true; + } + + private static Boolean shouldEmit(RunEvent event) { + return Stream.of( + isPermittedJobType(event), + isPermitedEventType(event), + isPermittedJobName(event) + ).noneMatch(Boolean.FALSE::equals); + } + + private static Boolean shouldDiscardColumnLineageFacet(EventType eventType) { + return !COMPLETE.equals(eventType); + } + + private static void discardColumnLineageFacet(RunEvent event) { + try { + Field columnLineageFacetField = OpenLineage.DatasetFacets.class.getDeclaredField("columnLineage"); + columnLineageFacetField.setAccessible(true); + Stream + .concat(event.getInputs().stream(), event.getOutputs().stream()) + .collect(Collectors.toList()) + .forEach(dataset -> { + try { + log.info("Discarding column lineage facet for dataset {} {} {}", + dataset.getClass().getName(), dataset.getNamespace(), dataset.getName()); + columnLineageFacetField.set(dataset.getFacets(), null); + } catch (IllegalAccessException e) { + log.warn("Failed to discard column lineage facet", 
e); + } + }); + } catch (NoSuchFieldException e) { + log.error("Failed to discard column lineage facet: columnLineage field not found at OpenLineage.DatasetFacets", e); + } + } + + public static void emit(RunEvent event, EventEmitter eventEmitter) { + if (!shouldEmit(event)) { + return; + } + + if (shouldDiscardColumnLineageFacet(event.getEventType())) { + discardColumnLineageFacet(event); + } + + eventEmitter.emit(event); + } +} diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java index ef89f71bf8..3858e103ff 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java @@ -11,6 +11,7 @@ import io.openlineage.client.utils.DatasetIdentifier; import io.openlineage.client.utils.UUIDUtils; import io.openlineage.spark.agent.EventEmitter; +import io.openlineage.spark.agent.NuEventEmitter; import io.openlineage.spark.agent.OpenLineageSparkListener; import io.openlineage.spark.agent.facets.ErrorFacet; import io.openlineage.spark.agent.facets.builder.GcpJobFacetBuilder; @@ -225,10 +226,8 @@ public void start(SparkListenerJobStart jobStart) { .job(buildJob(jobStart.jobId())) .build(); - log.info("OpenLineage {} event has no lineage value an will not be emmited", SPARK_JOB_TYPE); - -// log.debug("Posting event for start {}: {}", jobStart, event); -// eventEmitter.emit(event); + log.debug("Posting event for start {}: {}", jobStart, event); + NuEventEmitter.emit(event, eventEmitter); } @Override @@ -261,10 +260,8 @@ public void end(SparkListenerJobEnd jobEnd) { .job(buildJob(jobEnd.jobId())) .build(); - log.info("OpenLineage {} event has no lineage value an will not be emmited", SPARK_JOB_TYPE); - -// log.debug("Posting event for end {}: {}", jobEnd, event); -// 
eventEmitter.emit(event); + log.debug("Posting event for end {}: {}", jobEnd, event); + NuEventEmitter.emit(event, eventEmitter); } protected OpenLineage.RunFacets buildRunFacets(ErrorFacet jobError, SparkListenerEvent event) { diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkApplicationExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkApplicationExecutionContext.java index 205f85bf63..a95e50a644 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkApplicationExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkApplicationExecutionContext.java @@ -12,6 +12,7 @@ import io.openlineage.client.OpenLineage; import io.openlineage.client.OpenLineage.RunEvent; import io.openlineage.spark.agent.EventEmitter; +import io.openlineage.spark.agent.NuEventEmitter; import io.openlineage.spark.agent.filters.EventFilterUtils; import io.openlineage.spark.api.OpenLineageContext; import io.openlineage.spark.api.naming.JobNameBuilder; @@ -94,11 +95,8 @@ public void start(SparkListenerApplicationStart applicationStart) { .event(applicationStart) .build()); - - log.info("OpenLineage APPLICATION event has no lineage value an will not be emmited"); - -// log.debug("Posting event for applicationId {} start: {}", applicationId, event); -// eventEmitter.emit(event); + log.debug("Posting event for applicationId {} start: {}", applicationId, event); + NuEventEmitter.emit(event, eventEmitter); } @Override @@ -128,10 +126,8 @@ public void end(SparkListenerApplicationEnd applicationEnd) { .event(applicationEnd) .build()); - log.info("OpenLineage APPLICATION event has no lineage value an will not be emmited"); - -// log.debug("Posting event for applicationId {} end: {}", applicationId, event); -// eventEmitter.emit(event); + log.debug("Posting event for applicationId {} end: {}", applicationId, event); + 
NuEventEmitter.emit(event, eventEmitter); } private OpenLineage.ParentRunFacet buildApplicationParentFacet() { diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java index 77a9636daf..32f768e911 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java @@ -10,26 +10,23 @@ import static io.openlineage.client.OpenLineage.RunEvent.EventType.RUNNING; import static io.openlineage.client.OpenLineage.RunEvent.EventType.START; import static io.openlineage.spark.agent.util.TimeUtils.toZonedTime; -import static java.util.Objects.isNull; import io.openlineage.client.OpenLineage; import io.openlineage.client.OpenLineage.RunEvent; import io.openlineage.client.OpenLineage.RunEvent.EventType; import io.openlineage.client.OpenLineageClientUtils; import io.openlineage.spark.agent.EventEmitter; +import io.openlineage.spark.agent.NuEventEmitter; import io.openlineage.spark.agent.filters.EventFilterUtils; import io.openlineage.spark.agent.util.PlanUtils; import io.openlineage.spark.agent.util.ScalaConversionUtils; import io.openlineage.spark.api.OpenLineageContext; import io.openlineage.spark.api.naming.JobNameBuilder; -import java.lang.reflect.Field; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; -import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import org.apache.spark.scheduler.*; @@ -60,10 +57,10 @@ class SparkSQLExecutionContext implements ExecutionContext { private SparkSQLQueryParser sqlRecorder = new SparkSQLQueryParser(); - private static final Set NU_WANTED_EVENT_NAME_SUBSTRINGS = Set.of( - 
".execute_insert_into_hadoop_fs_relation_command.", - ".adaptive_spark_plan." - ); +// private static final Set NU_WANTED_EVENT_NAME_SUBSTRINGS = Set.of( +// ".execute_insert_into_hadoop_fs_relation_command.", +// ".adaptive_spark_plan." +// ); public SparkSQLExecutionContext( long executionId, @@ -76,54 +73,6 @@ public SparkSQLExecutionContext( this.runEventBuilder = runEventBuilder; } - private static Boolean shouldEmit(RunEvent event){ - if (RUNNING.equals(event.getEventType())) { - log.info("OpenLineage event is RUNNING and should not be emmited"); - return false; - } - - String jobName = event.getJob().getName(); - if (isNull(jobName)) { - log.info("OpenLineage event has no job name should not be emitted"); - return false; - } - - if (NU_WANTED_EVENT_NAME_SUBSTRINGS.stream().noneMatch(jobName::contains)) { - log.info("OpenLineage event has no lineage value and will not be emmited"); - return false; - } - - return true; - } - - private static Boolean shouldKeepColumnLineageFacet(EventType eventType) { - return !(START.equals(eventType) || RUNNING.equals(eventType)); - } - - private static void discardColumnLineage(RunEvent event) { - if (shouldKeepColumnLineageFacet(event.getEventType())) { return; } - - log.info("Discarding column lineage facet for event {}", event.getEventType()); - - try { - Field columnLineageFacetField = OpenLineage.DatasetFacets.class.getDeclaredField("columnLineage"); - columnLineageFacetField.setAccessible(true); - Stream - .concat(event.getInputs().stream(), event.getOutputs().stream()) - .collect(Collectors.toList()) - .forEach(dataset -> { - try { - log.info("Discarding column lineage facet for dataset {} {} {}", dataset.getClass().getName(), dataset.getNamespace(), dataset.getName()); - columnLineageFacetField.set(dataset.getFacets(), null); - } catch (IllegalAccessException e) { - log.warn("Failed to discard column lineage facet", e); - } - }); - } catch (NoSuchFieldException e) { - log.error("Failed to discard column lineage 
facet: columnLineage field not found at OpenLineage.DatasetFacets", e); - } - } - @Override public void start(SparkListenerSQLExecutionStart startEvent) { if (log.isDebugEnabled()) { @@ -157,14 +106,8 @@ public void start(SparkListenerSQLExecutionStart startEvent) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - if (!shouldEmit(event)) { - return; - } - - discardColumnLineage(event); - log.debug("Posting event for start {}: {}", executionId, event); - eventEmitter.emit(event); + NuEventEmitter.emit(event, eventEmitter); } @Override @@ -210,16 +153,10 @@ public void end(SparkListenerSQLExecutionEnd endEvent) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - if (!shouldEmit(event)) { - return; - } - - discardColumnLineage(event); - if (log.isDebugEnabled()) { log.debug("Posting event for end {}: {}", executionId, OpenLineageClientUtils.toJson(event)); } - eventEmitter.emit(event); + NuEventEmitter.emit(event, eventEmitter); } // TODO: not invoked until https://github.com/OpenLineage/OpenLineage/issues/470 is completed @@ -249,14 +186,8 @@ public void start(SparkListenerStageSubmitted stageSubmitted) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - if (!shouldEmit(event)) { - return; - } - - discardColumnLineage(event); - log.debug("Posting event for stage submitted {}: {}", executionId, event); - eventEmitter.emit(event); + NuEventEmitter.emit(event, eventEmitter); } // TODO: not invoked until https://github.com/OpenLineage/OpenLineage/issues/470 is completed @@ -285,14 +216,8 @@ public void end(SparkListenerStageCompleted stageCompleted) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - if (!shouldEmit(event)) { - return; - } - - discardColumnLineage(event); - log.debug("Posting event for stage completed {}: {}", executionId, event); - eventEmitter.emit(event); + NuEventEmitter.emit(event, 
eventEmitter); } @Override @@ -344,14 +269,8 @@ public void start(SparkListenerJobStart jobStart) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - if (!shouldEmit(event)) { - return; - } - - discardColumnLineage(event); - log.debug("Posting event for start {}: {}", executionId, event); - eventEmitter.emit(event); + NuEventEmitter.emit(event, eventEmitter); } @Override @@ -399,14 +318,8 @@ public void end(SparkListenerJobEnd jobEnd) { .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get())) .build()); - if (!shouldEmit(event)) { - return; - } - - discardColumnLineage(event); - log.debug("Posting event for end {}: {}", executionId, event); - eventEmitter.emit(event); + NuEventEmitter.emit(event, eventEmitter); } @Override From e2ef5ef7c5f5d988488e0ea1750c31ab4c58da3d Mon Sep 17 00:00:00 2001 From: Jhonatas Rosendo Date: Mon, 21 Oct 2024 18:02:24 -0300 Subject: [PATCH 08/11] removing unused commented code --- .../spark/agent/lifecycle/SparkSQLExecutionContext.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java index 32f768e911..959fd3e30d 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java @@ -57,11 +57,6 @@ class SparkSQLExecutionContext implements ExecutionContext { private SparkSQLQueryParser sqlRecorder = new SparkSQLQueryParser(); -// private static final Set NU_WANTED_EVENT_NAME_SUBSTRINGS = Set.of( -// ".execute_insert_into_hadoop_fs_relation_command.", -// ".adaptive_spark_plan." 
-// ); - public SparkSQLExecutionContext( long executionId, EventEmitter eventEmitter, From bd86aab5797283dfc715ea9ec1f0b6ceb1dcac3c Mon Sep 17 00:00:00 2001 From: Jhonatas Rosendo Date: Mon, 21 Oct 2024 18:23:50 -0300 Subject: [PATCH 09/11] Restoring imports --- .../agent/lifecycle/SparkSQLExecutionContext.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java index 959fd3e30d..31c742e4f6 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java @@ -25,11 +25,20 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; -import java.util.*; +import java.util.List; +import java.util.Optional; +import java.util.Stack; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; -import org.apache.spark.scheduler.*; +import org.apache.spark.scheduler.ActiveJob; +import org.apache.spark.scheduler.JobFailed; +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerApplicationStart; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import org.apache.spark.scheduler.SparkListenerStageSubmitted; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.apache.spark.sql.execution.QueryExecution; import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd; From 73b104c29a1d1fba285ab2b898122e5065ae2eda Mon Sep 17 00:00:00 2001 From: Jhonatas Rosendo Date: Tue, 22 Oct 2024 16:54:05 -0300 Subject: [PATCH 10/11] changing NuEventEmitter logs 
level from info to debug --- .../io/openlineage/spark/agent/NuEventEmitter.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java index d512777236..ae426aba68 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java @@ -28,7 +28,7 @@ public class NuEventEmitter { private static Boolean isPermittedJobType(RunEvent event) { String jobType = event.getJob().getFacets().getJobType().getJobType(); if (WANTED_JOB_TYPES.stream().noneMatch(jobType::equals)) { - log.info("OpenLineage event with job type {} has no lineage value and should not be emitted", jobType); + log.debug("OpenLineage event with job type {} has no lineage value and should not be emitted", jobType); return false; } return true; @@ -36,7 +36,7 @@ private static Boolean isPermittedJobType(RunEvent event) { private static Boolean isPermitedEventType(RunEvent event) { if (RUNNING.equals(event.getEventType())) { - log.info("OpenLineage event is {} and should not be emitted", RUNNING); + log.debug("OpenLineage event is {} and should not be emitted", RUNNING); return false; } return true; @@ -45,11 +45,11 @@ private static Boolean isPermitedEventType(RunEvent event) { private static Boolean isPermittedJobName(RunEvent event) { String jobName = event.getJob().getName(); if (isNull(jobName)) { - log.info("OpenLineage event has no job name and should not be emitted"); + log.debug("OpenLineage event has no job name and should not be emitted"); return false; } if (WANTED_EVENT_NAME_SUBSTRINGS.stream().noneMatch(jobName::contains)) { - log.info("OpenLineage event job name has no permitted substring and should not be emitted"); + log.debug("OpenLineage event job name has no permitted substring and
should not be emitted"); return false; } return true; @@ -76,11 +76,11 @@ private static void discardColumnLineageFacet(RunEvent event) { .collect(Collectors.toList()) .forEach(dataset -> { try { - log.info("Discarding column lineage facet for dataset {} {} {}", + log.debug("Discarding column lineage facet for dataset {} {} {}", dataset.getClass().getName(), dataset.getNamespace(), dataset.getName()); columnLineageFacetField.set(dataset.getFacets(), null); } catch (IllegalAccessException e) { - log.warn("Failed to discard column lineage facet", e); + log.error("Failed to discard column lineage facet", e); } }); } catch (NoSuchFieldException e) { From 975f83199769ab8b9e378625c986a1cc9371b5bc Mon Sep 17 00:00:00 2001 From: Jhonatas Rosendo Date: Tue, 22 Oct 2024 17:04:42 -0300 Subject: [PATCH 11/11] changing nu event emitter log --- .../main/java/io/openlineage/spark/agent/NuEventEmitter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java index ae426aba68..b647c4fb41 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java @@ -77,7 +77,7 @@ private static void discardColumnLineageFacet(RunEvent event) { .forEach(dataset -> { try { log.debug("Discarding column lineage facet for dataset {} {} {}", - dataset.getClass().getName(), dataset.getNamespace(), dataset.getName()); + dataset.getClass().getSimpleName(), dataset.getNamespace(), dataset.getName()); columnLineageFacetField.set(dataset.getFacets(), null); } catch (IllegalAccessException e) { log.error("Failed to discard column lineage facet", e);