Skip to content

Commit

Permalink
Support slice mode for SFTP (#862)
Browse files Browse the repository at this point in the history
* Extract SFTP tests to a separate module for readability
* Add an SFTP test where slice mode is activated
* Don't mask exception coming from the FTP backend
* add `retrieveFileStream` in `SFTPClient`

Co-authored-by: Jehan Bruggeman <[email protected]>
  • Loading branch information
jbruggem and Jehan Bruggeman authored Jun 20, 2022
1 parent ad5ba6c commit 2d53dac
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@ import com.datamountaineer.streamreactor.connect.ftp.EmbeddedFtpServer
import com.datamountaineer.streamreactor.connect.ftp.source.EndToEnd.Append
import com.datamountaineer.streamreactor.connect.ftp.source.EndToEnd.DummyOffsetStorage
import com.datamountaineer.streamreactor.connect.ftp.source.EndToEnd.FileSystem
import com.github.stefanbirkner.fakesftpserver.lambda.FakeSftpServer.withSftpServer
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.connect.source.SourceRecord
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

Expand Down Expand Up @@ -191,169 +188,4 @@ class BySlicesTest extends AnyFunSuite with Matchers with BeforeAndAfter with St
allReadBytes shouldBe fileContent1
ftpServer.stop()
}

/**
* Sftp
* ------
*/

test(
"Sftp:Same content mode by slices mode with SimpleFileConverter : " +
"after update file with same data, we detect same info so no data must be sent") {

withSftpServer { server =>
server.addUser("demo", "password")

val offsets = new DummyOffsetStorage
val configMap = Map()
.updated(FtpSourceConfig.Address, s"localhost:${server.getPort}")
.updated(FtpSourceConfig.protocol, "sftp")
.updated(FtpSourceConfig.User, "demo")
.updated(FtpSourceConfig.Password, "password")
.updated(FtpSourceConfig.RefreshRate, "PT1S")
.updated(FtpSourceConfig.MonitorTail, "/directory/:sftp_update_slice")
.updated(FtpSourceConfig.FileMaxAge, "PT952302H53M5.962S")
.updated(FtpSourceConfig.KeyStyle, "struct")
.updated(FtpSourceConfig.fileFilter, ".*")

val cfg = new FtpSourceConfig(configMap.asJava)

val poller = new FtpSourcePoller(cfg, offsets)

//push file
server.putFile("/directory/file1.txt", fileContent1)

val allReadBytes: Array[Byte] = sftpPollUntilEnd(poller)

allReadBytes.length shouldBe fileContent1.size
allReadBytes shouldBe fileContent1

logger.info(s"===================================================")

//append same content to file
server.putFile("/directory/file1.txt", fileContent1)

val allReadBytes1: Array[Byte] = sftpPollUntilEnd(poller)

//No event is generated
allReadBytes1.length shouldBe 0
()
}
}

test(
"Sftp:Update mode by slices mode with MonitorUpdate and SimpleFileConverter :" +
" after update of file, all file data must be sent") {
withSftpServer { server =>
server.addUser("demo", "password")

val offsets = new DummyOffsetStorage
val configMap = Map()
.updated(FtpSourceConfig.Address, s"localhost:${server.getPort}")
.updated(FtpSourceConfig.protocol, "sftp")
.updated(FtpSourceConfig.User, "demo")
.updated(FtpSourceConfig.Password, "password")
.updated(FtpSourceConfig.RefreshRate, "PT1S")
.updated(FtpSourceConfig.MonitorUpdate, "/directory/:sftp_update_slice")
.updated(FtpSourceConfig.FileMaxAge, "PT952302H53M5.962S")
.updated(FtpSourceConfig.KeyStyle, "struct")
.updated(FtpSourceConfig.fileFilter, ".*")

val cfg = new FtpSourceConfig(configMap.asJava)

val poller = new FtpSourcePoller(cfg, offsets)

//push file
server.putFile("/directory/file1.txt", fileContent1)

val allReadBytes: Array[Byte] = sftpPollUntilEnd(poller)

allReadBytes.length shouldBe fileContent1.size
allReadBytes shouldBe fileContent1

logger.info(s"===================================================")

//append content to file
val deltaContent = "extra".getBytes
server.putFile("/directory/file1.txt", fileContent1 ++ deltaContent)

val allReadBytes1: Array[Byte] = sftpPollUntilEnd(poller)

//Only the new delta
allReadBytes1.length shouldBe (fileContent1.size + deltaContent.size)
()
}
}

test(
"Sftp:Update mode by slices mode with MonitorTail and SimpleFileConverter :" +
" after update of file, only new data must be sent") {
withSftpServer { server =>
server.addUser("demo", "password")

val offsets = new DummyOffsetStorage
val configMap = Map()
.updated(FtpSourceConfig.Address, s"localhost:${server.getPort}")
.updated(FtpSourceConfig.protocol, "sftp")
.updated(FtpSourceConfig.User, "demo")
.updated(FtpSourceConfig.Password, "password")
.updated(FtpSourceConfig.RefreshRate, "PT1S")
.updated(FtpSourceConfig.MonitorTail, "/directory/:sftp_update_slice")
.updated(FtpSourceConfig.FileMaxAge, "PT952302H53M5.962S")
.updated(FtpSourceConfig.KeyStyle, "struct")
.updated(FtpSourceConfig.fileFilter, ".*")

val cfg = new FtpSourceConfig(configMap.asJava)

val poller: FtpSourcePoller = new FtpSourcePoller(cfg, offsets)

//push file
server.putFile("/directory/file1.txt", fileContent1)

val allReadBytes: Array[Byte] = sftpPollUntilEnd(poller)

allReadBytes.length shouldBe fileContent1.size
allReadBytes shouldBe fileContent1

logger.info(s"===================================================")

//append content to file
val deltaContent = "extra".getBytes
server.putFile("/directory/file1.txt", fileContent1 ++ deltaContent)

val allReadBytes1: Array[Byte] = sftpPollUntilEnd(poller)

//Only the new delta
allReadBytes1.length shouldBe deltaContent.size
()
}
}

def sftpPollUntilEnd(poller: FtpSourcePoller): Array[Byte] = {
var cnt = 0
logger.info("--------------------poll" + cnt + "-------------------------")
var slice = waitForSlice(poller)
var allReadBytes = new Array[Byte](0)
while (slice.lengthCompare(1) == 0) {
slice.head.topic shouldBe "sftp_update_slice"
val bytes = slice.head.value().asInstanceOf[Array[Byte]]
allReadBytes ++= bytes
logger.info(s"bytes.size=${bytes.length}")
cnt += 1
logger.info(s"--------------------poll$cnt-------------------------")
slice = waitForSlice(poller)
}
allReadBytes
}

private def waitForSlice(poller: FtpSourcePoller): Seq[SourceRecord] = {
var slice: Seq[SourceRecord] = poller.poll()
eventually {
Thread.sleep(500)
slice = poller.poll()
println(slice)
slice should not be null
}
slice
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.datamountaineer.streamreactor.connect.ftp.source

import com.datamountaineer.streamreactor.connect.ftp.source.EndToEnd.DummyOffsetStorage
import com.github.stefanbirkner.fakesftpserver.lambda.FakeSftpServer.withSftpServer
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.connect.source.SourceRecord
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

import scala.jdk.CollectionConverters.MapHasAsJava

class SftpBySliceTest extends AnyFunSuite with Matchers with BeforeAndAfter with StrictLogging {

val lineSep = System.getProperty("line.separator")
val fileContent1 = (0 to 10000).map(index => s"line_${index}${lineSep}").mkString.getBytes

test(
"Sftp:Same content mode by slices mode with SimpleFileConverter") {

withSftpServer { server =>
server.addUser("demo", "password")

val offsets = new DummyOffsetStorage
val configMap = Map()
.updated(FtpSourceConfig.Address, s"localhost:${server.getPort}")
.updated(FtpSourceConfig.protocol, "sftp")
.updated(FtpSourceConfig.User, "demo")
.updated(FtpSourceConfig.Password, "password")
.updated(FtpSourceConfig.RefreshRate, "PT1S")
.updated(FtpSourceConfig.MonitorTail, "/directory/:sftp_update_slice")
.updated(FtpSourceConfig.MonitorSliceSize, "20480")
.updated(FtpSourceConfig.FileMaxAge, "PT952302H53M5.962S")
.updated(FtpSourceConfig.KeyStyle, "struct")
.updated(FtpSourceConfig.fileFilter, ".*")

val cfg = new FtpSourceConfig(configMap.asJava)

val poller = new FtpSourcePoller(cfg, offsets)

//push file
server.putFile("/directory/file1.txt", fileContent1)

val allReadBytes: Array[Byte] = sftpPollUntilEnd(poller)

allReadBytes.length shouldBe fileContent1.size
allReadBytes shouldBe fileContent1
()
}
}

def sftpPollUntilEnd(poller: FtpSourcePoller): Array[Byte] = {
var cnt = 0
logger.info("--------------------poll" + cnt + "-------------------------")
var slice = waitForSlice(poller)
var allReadBytes = new Array[Byte](0)
while (slice.lengthCompare(1) == 0) {
slice.head.topic shouldBe "sftp_update_slice"
val bytes = slice.head.value().asInstanceOf[Array[Byte]]
allReadBytes ++= bytes
logger.info(s"bytes.size=${bytes.length}")
cnt += 1
logger.info(s"--------------------poll$cnt-------------------------")
slice = waitForSlice(poller)
}
allReadBytes
}

private def waitForSlice(poller: FtpSourcePoller): Seq[SourceRecord] = {
var slice: Seq[SourceRecord] = poller.poll()
eventually {
Thread.sleep(100)
slice = poller.poll()
println(slice)
slice should not be null
}
slice
}
}
Loading

0 comments on commit 2d53dac

Please sign in to comment.