From 2d53daca3ccbee28829e1396ae267e73b8854d44 Mon Sep 17 00:00:00 2001 From: Jehan Bruggeman Date: Mon, 20 Jun 2022 13:49:00 +0200 Subject: [PATCH] Support slice mode for SFTP (#862) * Extract SFTP tests to a separate module for readability * Add an SFTP test where slice mode is activated * Don't mask exception coming from the FTP backend * add `retrieveFileStream` in `SFTPClient` Co-authored-by: Jehan Bruggeman --- .../connect/ftp/source/BySlicesTest.scala | 168 ---------------- .../connect/ftp/source/SftpBySliceTest.scala | 80 ++++++++ .../connect/ftp/source/SftpTest.scala | 180 ++++++++++++++++++ .../connect/ftp/source/FtpMonitor.scala | 3 + .../connect/ftp/source/SFTPClient.scala | 18 +- 5 files changed, 279 insertions(+), 170 deletions(-) create mode 100644 kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/SftpBySliceTest.scala create mode 100644 kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/SftpTest.scala diff --git a/kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/BySlicesTest.scala b/kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/BySlicesTest.scala index 1c2c7f8b1..2d7b7fec8 100644 --- a/kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/BySlicesTest.scala +++ b/kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/BySlicesTest.scala @@ -4,11 +4,8 @@ import com.datamountaineer.streamreactor.connect.ftp.EmbeddedFtpServer import com.datamountaineer.streamreactor.connect.ftp.source.EndToEnd.Append import com.datamountaineer.streamreactor.connect.ftp.source.EndToEnd.DummyOffsetStorage import com.datamountaineer.streamreactor.connect.ftp.source.EndToEnd.FileSystem -import com.github.stefanbirkner.fakesftpserver.lambda.FakeSftpServer.withSftpServer import com.typesafe.scalalogging.StrictLogging -import org.apache.kafka.connect.source.SourceRecord import org.scalatest.BeforeAndAfter -import org.scalatest.concurrent.Eventually.eventually import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers @@ -191,169 +188,4 @@ class BySlicesTest extends AnyFunSuite with Matchers with BeforeAndAfter with St allReadBytes shouldBe fileContent1 ftpServer.stop() } - - /** - * Sftp - * ------ - */ - - test( - "Sftp:Same content mode by slices mode with SimpleFileConverter : " + - "after update file with same data, we detect same info so no data must be sent") { - - withSftpServer { server => - server.addUser("demo", "password") - - val offsets = new DummyOffsetStorage - val configMap = Map() - .updated(FtpSourceConfig.Address, s"localhost:${server.getPort}") - .updated(FtpSourceConfig.protocol, "sftp") - .updated(FtpSourceConfig.User, "demo") - .updated(FtpSourceConfig.Password, "password") - .updated(FtpSourceConfig.RefreshRate, "PT1S") - .updated(FtpSourceConfig.MonitorTail, "/directory/:sftp_update_slice") - .updated(FtpSourceConfig.FileMaxAge, "PT952302H53M5.962S") - .updated(FtpSourceConfig.KeyStyle, "struct") - .updated(FtpSourceConfig.fileFilter, ".*") - - val cfg = new FtpSourceConfig(configMap.asJava) - - val poller = new FtpSourcePoller(cfg, offsets) - - //push file - server.putFile("/directory/file1.txt", fileContent1) - - val allReadBytes: Array[Byte] = sftpPollUntilEnd(poller) - - allReadBytes.length shouldBe fileContent1.size - allReadBytes shouldBe fileContent1 - - logger.info(s"===================================================") - - //append same content to file - server.putFile("/directory/file1.txt", fileContent1) - - val allReadBytes1: Array[Byte] = sftpPollUntilEnd(poller) - - //No event is generated - allReadBytes1.length shouldBe 0 - () - } - } - - test( - "Sftp:Update mode by slices mode with MonitorUpdate and SimpleFileConverter :" + - " after update of file, all file data must be sent") { - withSftpServer { server => - server.addUser("demo", "password") - - val offsets = new DummyOffsetStorage - val configMap = Map() - .updated(FtpSourceConfig.Address, s"localhost:${server.getPort}") - .updated(FtpSourceConfig.protocol, "sftp") - .updated(FtpSourceConfig.User, "demo") - .updated(FtpSourceConfig.Password, "password") - .updated(FtpSourceConfig.RefreshRate, "PT1S") - .updated(FtpSourceConfig.MonitorUpdate, "/directory/:sftp_update_slice") - .updated(FtpSourceConfig.FileMaxAge, "PT952302H53M5.962S") - .updated(FtpSourceConfig.KeyStyle, "struct") - .updated(FtpSourceConfig.fileFilter, ".*") - - val cfg = new FtpSourceConfig(configMap.asJava) - - val poller = new FtpSourcePoller(cfg, offsets) - - //push file - server.putFile("/directory/file1.txt", fileContent1) - - val allReadBytes: Array[Byte] = sftpPollUntilEnd(poller) - - allReadBytes.length shouldBe fileContent1.size - allReadBytes shouldBe fileContent1 - - logger.info(s"===================================================") - - //append content to file - val deltaContent = "extra".getBytes - server.putFile("/directory/file1.txt", fileContent1 ++ deltaContent) - - val allReadBytes1: Array[Byte] = sftpPollUntilEnd(poller) - - //Only the new delta - allReadBytes1.length shouldBe (fileContent1.size + deltaContent.size) - () - } - } - - test( - "Sftp:Update mode by slices mode with MonitorTail and SimpleFileConverter :" + - " after update of file, only new data must be sent") { - withSftpServer { server => - server.addUser("demo", "password") - - val offsets = new DummyOffsetStorage - val configMap = Map() - .updated(FtpSourceConfig.Address, s"localhost:${server.getPort}") - .updated(FtpSourceConfig.protocol, "sftp") - .updated(FtpSourceConfig.User, "demo") - .updated(FtpSourceConfig.Password, "password") - .updated(FtpSourceConfig.RefreshRate, "PT1S") - .updated(FtpSourceConfig.MonitorTail, "/directory/:sftp_update_slice") - .updated(FtpSourceConfig.FileMaxAge, "PT952302H53M5.962S") - .updated(FtpSourceConfig.KeyStyle, "struct") - .updated(FtpSourceConfig.fileFilter, ".*") - - val cfg = new FtpSourceConfig(configMap.asJava) - - val poller: FtpSourcePoller = new FtpSourcePoller(cfg, offsets) - - //push file - server.putFile("/directory/file1.txt", fileContent1) - - val allReadBytes: Array[Byte] = sftpPollUntilEnd(poller) - - allReadBytes.length shouldBe fileContent1.size - allReadBytes shouldBe fileContent1 - - logger.info(s"===================================================") - - //append content to file - val deltaContent = "extra".getBytes - server.putFile("/directory/file1.txt", fileContent1 ++ deltaContent) - - val allReadBytes1: Array[Byte] = sftpPollUntilEnd(poller) - - //Only the new delta - allReadBytes1.length shouldBe deltaContent.size - () - } - } - - def sftpPollUntilEnd(poller: FtpSourcePoller): Array[Byte] = { - var cnt = 0 - logger.info("--------------------poll" + cnt + "-------------------------") - var slice = waitForSlice(poller) - var allReadBytes = new Array[Byte](0) - while (slice.lengthCompare(1) == 0) { - slice.head.topic shouldBe "sftp_update_slice" - val bytes = slice.head.value().asInstanceOf[Array[Byte]] - allReadBytes ++= bytes - logger.info(s"bytes.size=${bytes.length}") - cnt += 1 - logger.info(s"--------------------poll$cnt-------------------------") - slice = waitForSlice(poller) - } - allReadBytes - } - - private def waitForSlice(poller: FtpSourcePoller): Seq[SourceRecord] = { - var slice: Seq[SourceRecord] = poller.poll() - eventually { - Thread.sleep(500) - slice = poller.poll() - println(slice) - slice should not be null - } - slice - } } diff --git a/kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/SftpBySliceTest.scala b/kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/SftpBySliceTest.scala new file mode 100644 index 000000000..54c044ac2 --- /dev/null +++ b/kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/SftpBySliceTest.scala @@ -0,0 +1,80 @@ +package com.datamountaineer.streamreactor.connect.ftp.source + +import com.datamountaineer.streamreactor.connect.ftp.source.EndToEnd.DummyOffsetStorage +import com.github.stefanbirkner.fakesftpserver.lambda.FakeSftpServer.withSftpServer +import com.typesafe.scalalogging.StrictLogging +import org.apache.kafka.connect.source.SourceRecord +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Eventually.eventually +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.jdk.CollectionConverters.MapHasAsJava + +class SftpBySliceTest extends AnyFunSuite with Matchers with BeforeAndAfter with StrictLogging { + + val lineSep = System.getProperty("line.separator") + val fileContent1 = (0 to 10000).map(index => s"line_${index}${lineSep}").mkString.getBytes + + test( + "Sftp:Same content mode by slices mode with SimpleFileConverter") { + + withSftpServer { server => + server.addUser("demo", "password") + + val offsets = new DummyOffsetStorage + val configMap = Map() + .updated(FtpSourceConfig.Address, s"localhost:${server.getPort}") + .updated(FtpSourceConfig.protocol, "sftp") + .updated(FtpSourceConfig.User, "demo") + .updated(FtpSourceConfig.Password, "password") + .updated(FtpSourceConfig.RefreshRate, "PT1S") + .updated(FtpSourceConfig.MonitorTail, "/directory/:sftp_update_slice") + .updated(FtpSourceConfig.MonitorSliceSize, "20480") + .updated(FtpSourceConfig.FileMaxAge, "PT952302H53M5.962S") + .updated(FtpSourceConfig.KeyStyle, "struct") + .updated(FtpSourceConfig.fileFilter, ".*") + + val cfg = new FtpSourceConfig(configMap.asJava) + + val poller = new FtpSourcePoller(cfg, offsets) + + //push file + server.putFile("/directory/file1.txt", fileContent1) + + val allReadBytes: Array[Byte] = sftpPollUntilEnd(poller) + + allReadBytes.length shouldBe fileContent1.size + allReadBytes shouldBe fileContent1 + () + } + } + + def sftpPollUntilEnd(poller: FtpSourcePoller): Array[Byte] = { + var cnt = 0 + logger.info("--------------------poll" + cnt + "-------------------------") + var slice = waitForSlice(poller) + var allReadBytes = new Array[Byte](0) + while (slice.lengthCompare(1) == 0) { + slice.head.topic shouldBe "sftp_update_slice" + val bytes = slice.head.value().asInstanceOf[Array[Byte]] + allReadBytes ++= bytes + logger.info(s"bytes.size=${bytes.length}") + cnt += 1 + logger.info(s"--------------------poll$cnt-------------------------") + slice = waitForSlice(poller) + } + allReadBytes + } + + private def waitForSlice(poller: FtpSourcePoller): Seq[SourceRecord] = { + var slice: Seq[SourceRecord] = poller.poll() + eventually { + Thread.sleep(100) + slice = poller.poll() + println(slice) + slice should not be null + } + slice + } +} diff --git a/kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/SftpTest.scala b/kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/SftpTest.scala new file mode 100644 index 000000000..edc7fb368 --- /dev/null +++ b/kafka-connect-ftp/src/it/scala/com/datamountaineer/streamreactor/connect/ftp/source/SftpTest.scala @@ -0,0 +1,180 @@ +package com.datamountaineer.streamreactor.connect.ftp.source + +import com.datamountaineer.streamreactor.connect.ftp.source.EndToEnd.DummyOffsetStorage +import com.github.stefanbirkner.fakesftpserver.lambda.FakeSftpServer.withSftpServer +import com.typesafe.scalalogging.StrictLogging +import org.apache.kafka.connect.source.SourceRecord +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Eventually.eventually +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.jdk.CollectionConverters.MapHasAsJava + +class SftpTest extends AnyFunSuite with Matchers with BeforeAndAfter with StrictLogging { + + val lineSep = System.getProperty("line.separator") + val fileContent1 = (0 to 10000).map(index => s"line_${index}${lineSep}").mkString.getBytes + val fileContent2 = (0 to 11000).map(index => s"line_${index}${lineSep}").mkString.getBytes + + test( + "Sftp:Same content mode with SimpleFileConverter : " + + "after update file with same data, we detect same info so no data must be sent") { + + withSftpServer { server => + server.addUser("demo", "password") + + val offsets = new DummyOffsetStorage + val configMap = Map() + .updated(FtpSourceConfig.Address, s"localhost:${server.getPort}") + .updated(FtpSourceConfig.protocol, "sftp") + .updated(FtpSourceConfig.User, "demo") + .updated(FtpSourceConfig.Password, "password") + .updated(FtpSourceConfig.RefreshRate, "PT1S") + .updated(FtpSourceConfig.MonitorTail, "/directory/:sftp_ouptut") + .updated(FtpSourceConfig.FileMaxAge, "PT952302H53M5.962S") + .updated(FtpSourceConfig.KeyStyle, "struct") + .updated(FtpSourceConfig.fileFilter, ".*") + + val cfg = new FtpSourceConfig(configMap.asJava) + + val poller = new FtpSourcePoller(cfg, offsets) + + //push file + server.putFile("/directory/file1.txt", fileContent1) + server.putFile("/directory/file1.txt", fileContent1) + + val allReadBytes: Array[Byte] = sftpPollUntilEnd(poller) + + allReadBytes.length shouldBe fileContent1.size + allReadBytes shouldBe fileContent1 + + logger.info(s"===================================================") + + //append same content to file + server.putFile("/directory/file1.txt", fileContent1) + + val allReadBytes1: Array[Byte] = sftpPollUntilEnd(poller) + + //No event is generated + allReadBytes1.length shouldBe 0 + () + } + } + + test( + "Sftp:Update mode with MonitorUpdate and SimpleFileConverter :" + + " after update of file, all file data must be sent") { + withSftpServer { server => + server.addUser("demo", "password") + + val offsets = new DummyOffsetStorage + val configMap = Map() + .updated(FtpSourceConfig.Address, s"localhost:${server.getPort}") + .updated(FtpSourceConfig.protocol, "sftp") + .updated(FtpSourceConfig.User, "demo") + .updated(FtpSourceConfig.Password, "password") + .updated(FtpSourceConfig.RefreshRate, "PT1S") + .updated(FtpSourceConfig.MonitorUpdate, "/directory/:sftp_ouptut") + .updated(FtpSourceConfig.FileMaxAge, "PT952302H53M5.962S") + .updated(FtpSourceConfig.KeyStyle, "struct") + .updated(FtpSourceConfig.fileFilter, ".*") + + val cfg = new FtpSourceConfig(configMap.asJava) + + val poller = new FtpSourcePoller(cfg, offsets) + + //push file + server.putFile("/directory/file1.txt", fileContent1) + + val allReadBytes: Array[Byte] = sftpPollUntilEnd(poller) + + allReadBytes.length shouldBe fileContent1.size + allReadBytes shouldBe fileContent1 + + logger.info(s"===================================================") + + //append content to file + val deltaContent = "extra".getBytes + server.putFile("/directory/file1.txt", fileContent1 ++ deltaContent) + + val allReadBytes1: Array[Byte] = sftpPollUntilEnd(poller) + + //Only the new delta + allReadBytes1.length shouldBe (fileContent1.size + deltaContent.size) + () + } + } + + test( + "Sftp:Update mode with MonitorTail and SimpleFileConverter :" + + " after update of file, only new data must be sent") { + withSftpServer { server => + server.addUser("demo", "password") + + val offsets = new DummyOffsetStorage + val configMap = Map() + .updated(FtpSourceConfig.Address, s"localhost:${server.getPort}") + .updated(FtpSourceConfig.protocol, "sftp") + .updated(FtpSourceConfig.User, "demo") + .updated(FtpSourceConfig.Password, "password") + .updated(FtpSourceConfig.RefreshRate, "PT1S") + .updated(FtpSourceConfig.MonitorTail, "/directory/:sftp_ouptut") + .updated(FtpSourceConfig.FileMaxAge, "PT952302H53M5.962S") + .updated(FtpSourceConfig.KeyStyle, "struct") + .updated(FtpSourceConfig.fileFilter, ".*") + + val cfg = new FtpSourceConfig(configMap.asJava) + + val poller: FtpSourcePoller = new FtpSourcePoller(cfg, offsets) + + //push file + server.putFile("/directory/file1.txt", fileContent1) + + val allReadBytes: Array[Byte] = sftpPollUntilEnd(poller) + + allReadBytes.length shouldBe fileContent1.size + allReadBytes shouldBe fileContent1 + + logger.info(s"===================================================") + + //append content to file + val deltaContent = "extra".getBytes + server.putFile("/directory/file1.txt", fileContent1 ++ deltaContent) + + val allReadBytes1: Array[Byte] = sftpPollUntilEnd(poller) + + //Only the new delta + allReadBytes1.length shouldBe deltaContent.size + () + } + } + + def sftpPollUntilEnd(poller: FtpSourcePoller): Array[Byte] = { + var cnt = 0 + logger.info("--------------------poll" + cnt + "-------------------------") + var slice = waitForSlice(poller) + var allReadBytes = new Array[Byte](0) + while (slice.lengthCompare(1) == 0) { + slice.head.topic shouldBe "sftp_ouptut" + val bytes = slice.head.value().asInstanceOf[Array[Byte]] + allReadBytes ++= bytes + logger.info(s"bytes.size=${bytes.length}") + cnt += 1 + logger.info(s"--------------------poll$cnt-------------------------") + slice = waitForSlice(poller) + } + allReadBytes + } + + private def waitForSlice(poller: FtpSourcePoller): Seq[SourceRecord] = { + var slice: Seq[SourceRecord] = poller.poll() + eventually { + Thread.sleep(500) + slice = poller.poll() + println(slice) + slice should not be null + } + slice + } +} diff --git a/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/FtpMonitor.scala b/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/FtpMonitor.scala index 123ece551..17dcf3ce9 100644 --- a/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/FtpMonitor.scala +++ b/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/FtpMonitor.scala @@ -174,6 +174,9 @@ class FtpMonitor(settings: FtpMonitorSettings, fileConverter: FileConverter) ext Using(ftp.retrieveFileStream(file.path())) { inputStream => IOUtils.copyLarge(inputStream, outputStream, 0, sliceSize.toLong) + } match { + case Failure(exception) => throw exception + case Success(_value) => _value } if (outputStream.size() > 0) Some(outputStream.toByteArray) else None } diff --git a/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/SFTPClient.scala b/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/SFTPClient.scala index 538527291..7c68709d0 100644 --- a/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/SFTPClient.scala +++ b/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/SFTPClient.scala @@ -36,6 +36,7 @@ class SFTPClient extends FTPClient with StrictLogging { private var maybeExplicitPort: Option[Int] = None private var maybeJschSession: Option[Session] = None private var maybeChannelSftp: Option[ChannelSftp] = None + private var restartOffset: Long = 0 private val dateFormat = DateTimeFormatter.ofPattern("EEE MMM d HH:mm:ss zzz uuuu") @@ -142,12 +143,25 @@ class SFTPClient extends FTPClient with StrictLogging { logger.debug(s"SFTPClient Error, channel not initiated in path $remote.") false } { channel => + if (!channel.isConnected) connectChannel(channel) channel.get(remote, fileBody) - logger.debug(s"SFTPClient Successful retrieving files in path $remote.") true } - private def getFTPFiles(pathname: String, channel: ChannelSftp): List[FTPFile] = + override def retrieveFileStream(remote: String): java.io.InputStream = + maybeChannelSftp.fold { + throw new Exception(s"SFTPClient Error, channel not initiated in path $remote.") + } { channel => + if (!channel.isConnected) connectChannel(channel) + val stream = channel.get(remote, null, restartOffset) + restartOffset = 0 + stream + } + + override def setRestartOffset(offset: Long): Unit = + if (offset >= 0) restartOffset = offset + + private def getFTPFiles(pathname: String, channel: ChannelSftp) = (for { _ <- Try(channel.cd(pathname)) ftpFiles <- fetchFiles(pathname, channel)