Skip to content

Commit

Permalink
fix: SQL Server limit syntax (#1310)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Jan 28, 2025
1 parent 62f81cc commit adbfd31
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ private[projection] class PostgresOffsetStoreDao(

private implicit val ec: ExecutionContext = system.executionContext

private val selectTimestampOffsetSql: String =
protected def createSelectTimestampOffsetSql: String =
sql"""
SELECT projection_key, persistence_id, seq_nr, timestamp_offset
FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? ORDER BY timestamp_offset DESC LIMIT ?"""

private val selectTimestampOffsetSql: String = createSelectTimestampOffsetSql

protected def createSelectOneTimestampOffsetSql: String =
sql"""
SELECT seq_nr, timestamp_offset
Expand Down Expand Up @@ -192,24 +194,25 @@ private[projection] class PostgresOffsetStoreDao(
s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.")
}

protected def bindSelectTimestampOffsetSql(stmt: Statement, slice: Int): Statement = {
stmt
.bind(0, slice)
.bind(1, projectionId.name)
.bind(2, settings.offsetSliceReadLimit)
}

override def readTimestampOffset(
slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = {
r2dbcExecutor.select("read timestamp offset")(
conn => {
logger.trace("reading timestamp offset slice [{}] for [{}]", slice, projectionId)
conn
.createStatement(selectTimestampOffsetSql)
.bind(0, slice)
.bind(1, projectionId.name)
.bind(2, settings.offsetSliceReadLimit)
},
row => {
val projectionKey = row.get("projection_key", classOf[String])
val pid = row.get("persistence_id", classOf[String])
val seqNr = row.get("seq_nr", classOf[java.lang.Long])
val timestamp = row.getTimestamp("timestamp_offset")
R2dbcOffsetStore.RecordWithProjectionKey(R2dbcOffsetStore.Record(slice, pid, seqNr, timestamp), projectionKey)
})
r2dbcExecutor.select("read timestamp offset")(conn => {
logger.trace("reading timestamp offset slice [{}] for [{}]", slice, projectionId)
bindSelectTimestampOffsetSql(conn.createStatement(selectTimestampOffsetSql), slice)
}, row => {
val projectionKey = row.get("projection_key", classOf[String])
val pid = row.get("persistence_id", classOf[String])
val seqNr = row.get("seq_nr", classOf[java.lang.Long])
val timestamp = row.getTimestamp("timestamp_offset")
R2dbcOffsetStore.RecordWithProjectionKey(R2dbcOffsetStore.Record(slice, pid, seqNr, timestamp), projectionKey)
})
}

def readTimestampOffset(slice: Int, pid: String): Future[Option[R2dbcOffsetStore.Record]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ private[projection] class SqlServerOffsetStoreDao(

override protected implicit def timestampCodec: TimestampCodec = SqlServerTimestampCodec

override protected def createSelectTimestampOffsetSql: String =
sql"""
SELECT TOP(@limit) projection_key, persistence_id, seq_nr, timestamp_offset
FROM $timestampOffsetTable WHERE slice = @slice AND projection_name = @projectionName ORDER BY timestamp_offset DESC"""

override protected def bindSelectTimestampOffsetSql(stmt: Statement, slice: Int): Statement = {
stmt
.bind("@limit", settings.offsetSliceReadLimit)
.bind("@slice", slice)
.bind("@projectionName", projectionId.name)
}

override protected def createSelectOneTimestampOffsetSql: String =
sql"""
SELECT TOP(1) seq_nr, timestamp_offset
Expand Down

0 comments on commit adbfd31

Please sign in to comment.