Skip to content

Commit

Permalink
Ignore null records (#156)
Browse files Browse the repository at this point in the history
* Ignore null records

Sometimes, custom made SMTs overwrite the payload and pass nulls to the connector leading to unintended results.

* Fix the linting error

---------

Co-authored-by: stheppi <[email protected]>
  • Loading branch information
stheppi and stheppi authored Oct 29, 2024
1 parent a737863 commit 2e53f24
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ class HttpSinkTask extends SinkTask with LazyLogging with JarManifestProvided {

logger.debug(s"[$sinkName] put call with ${records.size()} records")

val storedErrors = errorRef.get.unsafeRunSync()
val nonEmptyRecords = NonEmptySeq.fromSeq(records.asScala.toSeq)
val storedErrors = errorRef.get.unsafeRunSync()
//Filter out null records since there are users who are sending null records
val nonEmptyRecords = NonEmptySeq.fromSeq(records.asScala.toSeq.filter(_ != null))
(storedErrors, nonEmptyRecords) match {
case (errors, _) if errors.nonEmpty =>
handleStoredErrors(errors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package io.lenses.streamreactor.connect.http.sink

import org.apache.kafka.connect.sink.SinkRecord
import org.scalatest.BeforeAndAfterEach
import org.scalatest.EitherValues
import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.matchers.should.Matchers

import java.util
import scala.jdk.CollectionConverters.SeqHasAsJava

class HttpSinkTaskTest extends AnyFunSuiteLike with Matchers with EitherValues with BeforeAndAfterEach {

Expand All @@ -33,4 +35,10 @@ class HttpSinkTaskTest extends AnyFunSuiteLike with Matchers with EitherValues w
}
}

test("put method should handle null records collection") {

noException should be thrownBy {
httpSinkTask.put(List[SinkRecord](null).asJava)
}
}
}

0 comments on commit 2e53f24

Please sign in to comment.