diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala index 5cd2df80f..2ad2bd056 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala @@ -62,11 +62,13 @@ private[projection] class PostgresOffsetStoreDao( private implicit val ec: ExecutionContext = system.executionContext - private val selectTimestampOffsetSql: String = + protected def createSelectTimestampOffsetSql: String = sql""" SELECT projection_key, persistence_id, seq_nr, timestamp_offset FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? ORDER BY timestamp_offset DESC LIMIT ?""" + private val selectTimestampOffsetSql: String = createSelectTimestampOffsetSql + protected def createSelectOneTimestampOffsetSql: String = sql""" SELECT seq_nr, timestamp_offset @@ -192,24 +194,25 @@ private[projection] class PostgresOffsetStoreDao( s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.") } + protected def bindSelectTimestampOffsetSql(stmt: Statement, slice: Int): Statement = { + stmt + .bind(0, slice) + .bind(1, projectionId.name) + .bind(2, settings.offsetSliceReadLimit) + } + override def readTimestampOffset( slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = { - r2dbcExecutor.select("read timestamp offset")( - conn => { - logger.trace("reading timestamp offset slice [{}] for [{}]", slice, projectionId) - conn - .createStatement(selectTimestampOffsetSql) - .bind(0, slice) - .bind(1, projectionId.name) - .bind(2, settings.offsetSliceReadLimit) - }, - row => { - val projectionKey = row.get("projection_key", classOf[String]) - val pid = row.get("persistence_id", classOf[String]) - val seqNr = row.get("seq_nr", classOf[java.lang.Long]) - val timestamp = row.getTimestamp("timestamp_offset") - R2dbcOffsetStore.RecordWithProjectionKey(R2dbcOffsetStore.Record(slice, pid, seqNr, timestamp), projectionKey) - }) + r2dbcExecutor.select("read timestamp offset")(conn => { + logger.trace("reading timestamp offset slice [{}] for [{}]", slice, projectionId) + bindSelectTimestampOffsetSql(conn.createStatement(selectTimestampOffsetSql), slice) + }, row => { + val projectionKey = row.get("projection_key", classOf[String]) + val pid = row.get("persistence_id", classOf[String]) + val seqNr = row.get("seq_nr", classOf[java.lang.Long]) + val timestamp = row.getTimestamp("timestamp_offset") + R2dbcOffsetStore.RecordWithProjectionKey(R2dbcOffsetStore.Record(slice, pid, seqNr, timestamp), projectionKey) + }) } def readTimestampOffset(slice: Int, pid: String): Future[Option[R2dbcOffsetStore.Record]] = { diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala index bb431e4ce..b8d3d58b5 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala @@ -38,6 +38,18 @@ private[projection] class SqlServerOffsetStoreDao( override protected implicit def timestampCodec: TimestampCodec = SqlServerTimestampCodec + override protected def createSelectTimestampOffsetSql: String = + sql""" + SELECT TOP(@limit) projection_key, persistence_id, seq_nr, timestamp_offset + FROM $timestampOffsetTable WHERE slice = @slice AND projection_name = @projectionName ORDER BY timestamp_offset DESC""" + + override protected def bindSelectTimestampOffsetSql(stmt: Statement, slice: Int): Statement = { + stmt + .bind("@limit", settings.offsetSliceReadLimit) + .bind("@slice", slice) + .bind("@projectionName", projectionId.name) + } + override protected def createSelectOneTimestampOffsetSql: String = sql""" SELECT TOP(1) seq_nr, timestamp_offset