Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to estimate file size lazily #51

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ trait FileCommitStrategy {
* Checks whether a file should be closed and committed to storage.
*
* @param fileOpenDuration Time the file has been open for.
* @param fileSize Current size of the file in bytes.
* @param fileSize Current size of the file in bytes, evaluated on-demand only, as it can be an expensive operation.
* @param recordsWritten Number of records written to the file.
* @return Whether the file should be closed and committed.
*/
def shouldCommit(fileOpenDuration: Duration, fileSize: Long, recordsWritten: Long): Boolean
def shouldCommit(fileOpenDuration: Duration, fileSize: => Long, recordsWritten: Long): Boolean
}

object FileCommitStrategy {
Expand All @@ -47,21 +47,24 @@ object FileCommitStrategy {
"At least one upper limit for the file commit strategy has to be defined"
)

override def shouldCommit(currFileOpenDuration: Duration, currFileSize: Long, currRecordsWritten: Long): Boolean = {
override def shouldCommit(currFileOpenDuration: Duration, currFileSize: => Long, currRecords: Long): Boolean = {
fileOpenDuration.exists(d => currFileOpenDuration.toMillis >= d.toMillis) ||
fileSize.exists(s => currFileSize >= s) ||
recordsWritten.exists(r => currRecordsWritten >= r)
recordsWritten.exists(r => currRecords >= r)
}
}

/**
* Commit strategy that commits files once ANY of the parameters reaches a threshold
* value sampled from a given Gaussian distribution.
* value sampled from a given Gaussian distribution. Optionally users can specify a batch size (in records)
* that triggers file size estimation, as doing it on every record can be too expensive.
* Note that the class maintains state, so it should not be re-used for multiple sinks.
*/
case class FuzzyReachedAnyOf(
fileOpenDurationDistribution: Option[GaussianDistribution[Duration]] = None,
fileSizeDistribution: Option[GaussianDistribution[Long]] = None,
recordsWrittenDistribution: Option[GaussianDistribution[Long]] = None
recordsWrittenDistribution: Option[GaussianDistribution[Long]] = None,
fileSizeSamplingBatchSize: Option[Long] = None
)(randomSeed: Option[Int] = None)
extends FileCommitStrategy {

Expand All @@ -85,13 +88,22 @@ object FileCommitStrategy {
)

private var currentSample = sampleParameters
private var recordsUntilFileSizeCheck = fileSizeSamplingBatchSize.getOrElse(1L)

override def shouldCommit(currFileOpenDuration: Duration, currFileSize: => Long, currRecords: Long): Boolean = {
val durationReached = currentSample.fileOpenDuration.exists(d => currFileOpenDuration.toMillis >= d.toMillis)
val recordsReached = currentSample.recordsWritten.exists(r => currRecords >= r)
val fileSizeReached = if (recordsUntilFileSizeCheck == 1) {
recordsUntilFileSizeCheck = fileSizeSamplingBatchSize.getOrElse(1)
currentSample.fileSize.exists(s => currFileSize >= s)
} else {
recordsUntilFileSizeCheck -= 1
false
}

override def shouldCommit(currFileOpenDuration: Duration, currFileSize: Long, currRecordsWritten: Long): Boolean = {
val shouldCommit = currentSample.fileOpenDuration.exists(d => currFileOpenDuration.toMillis >= d.toMillis) ||
currentSample.fileSize.exists(s => currFileSize >= s) ||
currentSample.recordsWritten.exists(r => currRecordsWritten >= r)

val shouldCommit = durationReached || recordsReached || fileSizeReached
if (shouldCommit) {
recordsUntilFileSizeCheck = fileSizeSamplingBatchSize.getOrElse(1)
// We only re-sample if we satisfy the conditions in the current sample, otherwise sampling is broken!
currentSample = sampleParameters
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2020 Adform
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package com.adform.streamloader.sink.file

import com.adform.streamloader.sink.file.FileCommitStrategy.FuzzyReachedAnyOf
import com.adform.streamloader.util.GaussianDistribution
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

import java.time.Duration

class FileCommitStrategyTest extends AnyFunSpec with Matchers {

describe("FuzzyReachedAnyOf strategy") {
val strategy = new FuzzyReachedAnyOf(
fileOpenDurationDistribution = Some(GaussianDistribution(Duration.ofSeconds(10), Duration.ofSeconds(0))),
fileSizeDistribution = Some(GaussianDistribution(1000, 0)),
recordsWrittenDistribution = Some(GaussianDistribution(1000, 0))
)()

it("should commit after duration gets reached") {
strategy.shouldCommit(Duration.ofSeconds(1), 1, 1) shouldEqual false
strategy.shouldCommit(Duration.ofSeconds(11), 1, 1) shouldEqual true
}

it("should commit after record count gets reached") {
strategy.shouldCommit(Duration.ofSeconds(1), 1, 1) shouldEqual false
strategy.shouldCommit(Duration.ofSeconds(1), 1, 1001) shouldEqual true
}

it("should commit after file size gets reached") {
strategy.shouldCommit(Duration.ofSeconds(1), 1, 1) shouldEqual false
strategy.shouldCommit(Duration.ofSeconds(1), 1001, 1) shouldEqual true
}

it("should not commit if file size gets reached without reaching file size check sample size") {
val sampledStrategy = strategy.copy(fileSizeSamplingBatchSize = Some(5))(None)

sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1001, 1) shouldEqual false
}

it("should commit once file size reached and file size check sample size reached") {
val sampledStrategy = strategy.copy(fileSizeSamplingBatchSize = Some(5))(None)

sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1001, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1001, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1001, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1001, 1) shouldEqual true
}

it("should only invoke file size check after the check sample size reached") {
val sampledStrategy = strategy.copy(fileSizeSamplingBatchSize = Some(5))(None)
var invoked = 0

def getFileSize: Long = {
invoked += 1
1001
}

sampledStrategy.shouldCommit(Duration.ofSeconds(1), getFileSize, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), getFileSize, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), getFileSize, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), getFileSize, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), getFileSize, 1) shouldEqual true

invoked shouldEqual 1
}

it("should reset the file size sampling counter after commiting") {
val sampledStrategy = strategy.copy(fileSizeSamplingBatchSize = Some(5))(None)

// open duration gets reached first
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1001), 1, 1) shouldEqual true

// now it should still take at least 5 records to trigger file size check
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1001, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1001, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1001, 1) shouldEqual false
sampledStrategy.shouldCommit(Duration.ofSeconds(1), 1001, 1) shouldEqual true
}
}
}