Skip to content

Commit

Permalink
Merge pull request #9 from reugn/develop
Browse files Browse the repository at this point in the history
v0.6.0
  • Loading branch information
reugn authored Sep 9, 2023
2 parents a772b30 + 93b7fd1 commit aafe535
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 78 deletions.
14 changes: 5 additions & 9 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:

steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Setup Java and Scala
uses: olafurpg/setup-scala@v12
Expand All @@ -30,17 +30,13 @@ jobs:
- name: Set up Aerospike Database
uses: reugn/github-action-aerospike@v1

- name: Cache sbt
uses: actions/cache@v2
- name: Cache SBT
uses: actions/cache@v3
with:
path: |
~/.sbt
~/.ivy2/cache
~/.coursier/cache/v1
~/.cache/coursier/v1
~/AppData/Local/Coursier/Cache/v1
~/Library/Caches/Coursier/v1
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
~/.sbt
key: ${{ runner.os }}-sbt-${{ hashFiles('**/build.sbt') }}

- name: Build and test
run: sbt ++${{ matrix.scala }} test
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package io.github.reugn.aerospike.scala

import com.aerospike.client.metrics.MetricsPolicy
import com.aerospike.client.policy.{AuthMode, ClientPolicy}
import com.aerospike.client.{AerospikeClient, Host, IAerospikeClient}
import com.typesafe.config.Config
import io.github.reugn.aerospike.scala.AerospikeClientBuilder._
import io.github.reugn.aerospike.scala.Policies.ClientPolicyImplicits._

class AerospikeClientBuilder(config: Config) {
class AerospikeClientBuilder(config: Config, metricsPolicy: Option[MetricsPolicy]) {

private def buildClientPolicy(): ClientPolicy = {
val policy = new ClientPolicy();
Expand All @@ -33,20 +34,22 @@ class AerospikeClientBuilder(config: Config) {
}

def build(): IAerospikeClient = {
Option(config.getString("aerospike.hostList")) map {
val client = Option(config.getString("aerospike.hostList")) map {
hostList =>
new AerospikeClient(buildClientPolicy(), Host.parseHosts(hostList, defaultPort): _*)
} getOrElse {
val hostname = Option(config.getString("aerospike.hostname")).getOrElse(defaultHostName)
val port = Option(config.getInt("aerospike.port")).getOrElse(defaultPort)
new AerospikeClient(buildClientPolicy(), hostname, port)
}
metricsPolicy.foreach(policy => client.enableMetrics(policy))
client
}
}

object AerospikeClientBuilder {
private[aerospike] val defaultHostName = "localhost"
private[aerospike] val defaultPort = 3000

def apply(config: Config): AerospikeClientBuilder = new AerospikeClientBuilder(config)
def apply(config: Config): AerospikeClientBuilder = new AerospikeClientBuilder(config, None)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package io.github.reugn.aerospike.scala

import com.aerospike.client.async.{EventLoop, EventLoops, EventPolicy, NettyEventLoops}
import io.github.reugn.aerospike.scala.util.{Linux, Mac, OperatingSystem}
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.kqueue.KQueueEventLoopGroup
import io.netty.channel.epoll.{Epoll, EpollEventLoopGroup}
import io.netty.channel.kqueue.{KQueue, KQueueEventLoopGroup}
import io.netty.channel.nio.NioEventLoopGroup

object EventLoopProvider {
Expand All @@ -12,9 +12,9 @@ object EventLoopProvider {

private[scala] lazy val eventLoops: EventLoops = {
val eventLoopGroup = OperatingSystem() match {
case Linux =>
case Linux if Epoll.isAvailable =>
new EpollEventLoopGroup(nThreads)
case Mac =>
case Mac if KQueue.isAvailable =>
new KQueueEventLoopGroup(nThreads)
case _ =>
new NioEventLoopGroup(nThreads)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,21 @@ case class QueryStatement(
namespace: String,
setName: Option[String] = None,
binNames: Option[Seq[String]] = None,
secondaryIndexName: Option[String] = None,
secondaryIndexFilter: Option[Filter] = None,
partitionFilter: Option[PartitionFilter] = None,
operations: Option[Seq[Operation]] = None,
maxRecords: Option[Long] = None,
recordsPerSecond: Option[Int] = None
) {

lazy val statement: Statement = {
val statement: Statement = new Statement
statement.setNamespace(namespace)
setName.foreach(statement.setSetName)
binNames.foreach(bins => statement.setBinNames(bins: _*))
secondaryIndexName.foreach(statement.setIndexName)
secondaryIndexFilter.foreach(statement.setFilter)
operations.foreach(ops => statement.setOperations(ops.toArray))
maxRecords.foreach(statement.setMaxRecords)
recordsPerSecond.foreach(statement.setRecordsPerSecond)
statement
}

lazy val isScan: Boolean = secondaryIndexName.isEmpty || secondaryIndexFilter.isEmpty
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ class AerospikeHandlerTest extends AsyncFlatSpec with TestCommon with Matchers w
val queryStatement = QueryStatement(
namespace,
setName = Some(set),
secondaryIndexName = Some("idx1"),
secondaryIndexFilter = Some(Filter.equal("bin1", 1))
)
assertThrows[AerospikeException] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import com.aerospike.client.task.ExecuteTask
import com.typesafe.config.Config
import io.github.reugn.aerospike.scala._
import io.github.reugn.aerospike.scala.model.QueryStatement
import zio.Task
import zio.stream.ZStream
import zio.{Task, ZIO}

import java.util.Calendar
import scala.collection.JavaConverters.seqAsJavaListConverter
Expand All @@ -19,49 +19,49 @@ class ZioAerospikeHandler(protected val client: IAerospikeClient)
with StreamHandler3[ZStream] {

override def put(key: Key, bins: Bin*)(implicit policy: WritePolicy): Task[Key] = {
Task(client.put(policy, key, bins: _*)).map(_ => key)
ZIO.attemptBlocking(client.put(policy, key, bins: _*)).map(_ => key)
}

override def append(key: Key, bins: Bin*)(implicit policy: WritePolicy): Task[Key] = {
Task(client.append(policy, key, bins: _*)).map(_ => key)
ZIO.attemptBlocking(client.append(policy, key, bins: _*)).map(_ => key)
}

override def prepend(key: Key, bins: Bin*)(implicit policy: WritePolicy): Task[Key] = {
Task(client.prepend(policy, key, bins: _*)).map(_ => key)
ZIO.attemptBlocking(client.prepend(policy, key, bins: _*)).map(_ => key)
}

override def add(key: Key, bins: Bin*)(implicit policy: WritePolicy): Task[Key] = {
Task(client.add(policy, key, bins: _*)).map(_ => key)
ZIO.attemptBlocking(client.add(policy, key, bins: _*)).map(_ => key)
}

override def delete(key: Key)(implicit policy: WritePolicy): Task[Boolean] = {
Task(client.delete(policy, key))
ZIO.attemptBlocking(client.delete(policy, key))
}

override def deleteBatch(keys: Seq[Key])
(implicit policy: BatchPolicy, batchDeletePolicy: BatchDeletePolicy): Task[BatchResults] = {
Task(client.delete(policy, batchDeletePolicy, keys.toArray))
ZIO.attemptBlocking(client.delete(policy, batchDeletePolicy, keys.toArray))
}

override def truncate(ns: String, set: String, beforeLastUpdate: Option[Calendar] = None)
(implicit policy: InfoPolicy): Task[Unit] = {
Task(client.truncate(policy, ns, set, beforeLastUpdate.orNull))
ZIO.attemptBlocking(client.truncate(policy, ns, set, beforeLastUpdate.orNull))
}

override def touch(key: Key)(implicit policy: WritePolicy): Task[Key] = {
Task(client.touch(policy, key)).map(_ => key)
ZIO.attemptBlocking(client.touch(policy, key)).map(_ => key)
}

override def exists(key: Key)(implicit policy: Policy): Task[Boolean] = {
Task(client.exists(policy, key))
ZIO.attemptBlocking(client.exists(policy, key))
}

override def existsBatch(keys: Seq[Key])(implicit policy: BatchPolicy): Task[Seq[Boolean]] = {
Task(client.exists(policy, keys.toArray)).map(_.toIndexedSeq)
ZIO.attemptBlocking(client.exists(policy, keys.toArray)).map(_.toIndexedSeq)
}

override def get(key: Key, binNames: String*)(implicit policy: Policy): Task[Record] = {
Task {
ZIO.attemptBlocking {
if (binNames.toArray.length > 0)
client.get(policy, key, binNames: _*)
else
Expand All @@ -70,7 +70,7 @@ class ZioAerospikeHandler(protected val client: IAerospikeClient)
}

override def getBatch(keys: Seq[Key], binNames: String*)(implicit policy: BatchPolicy): Task[Seq[Record]] = {
Task {
ZIO.attemptBlocking {
if (binNames.toArray.length > 0)
client.get(policy, keys.toArray, binNames: _*)
else
Expand All @@ -81,33 +81,33 @@ class ZioAerospikeHandler(protected val client: IAerospikeClient)
}

override def getBatchOp(keys: Seq[Key], operations: Operation*)(implicit policy: BatchPolicy): Task[Seq[Record]] = {
Task(client.get(policy, keys.toArray, operations: _*))
ZIO.attemptBlocking(client.get(policy, keys.toArray, operations: _*))
}

override def getHeader(key: Key)(implicit policy: Policy): Task[Record] = {
Task(client.getHeader(policy, key))
ZIO.attemptBlocking(client.getHeader(policy, key))
}

override def getHeaderBatch(keys: Seq[Key])(implicit policy: BatchPolicy): Task[Seq[Record]] = {
Task(client.getHeader(policy, keys.toArray)).map(_.toIndexedSeq)
ZIO.attemptBlocking(client.getHeader(policy, keys.toArray)).map(_.toIndexedSeq)
}

override def operate(key: Key, operations: Operation*)(implicit policy: WritePolicy): Task[Record] = {
Task(client.operate(policy, key, operations: _*))
ZIO.attemptBlocking(client.operate(policy, key, operations: _*))
}

override def operateBatch(keys: Seq[Key], operations: Operation*)
(implicit policy: BatchPolicy, batchWritePolicy: BatchWritePolicy): Task[BatchResults] = {
Task(client.operate(policy, batchWritePolicy, keys.toArray, operations: _*))
ZIO.attemptBlocking(client.operate(policy, batchWritePolicy, keys.toArray, operations: _*))
}

override def operateBatchRecord(records: Seq[BatchRecord])(implicit policy: BatchPolicy): Task[Boolean] = {
Task(client.operate(policy, records.asJava))
ZIO.attemptBlocking(client.operate(policy, records.asJava))
}

override def scanNodeName(nodeName: String, ns: String, set: String, binNames: String*)
(implicit policy: ScanPolicy): Task[List[KeyRecord]] = {
Task {
ZIO.attemptBlocking {
val callback = RecordScanCallback()
client.scanNode(policy, nodeName, ns, set, callback, binNames: _*)
callback.getRecordSet
Expand All @@ -116,7 +116,7 @@ class ZioAerospikeHandler(protected val client: IAerospikeClient)

override def scanNode(node: Node, ns: String, set: String, binNames: String*)
(implicit policy: ScanPolicy): Task[List[KeyRecord]] = {
Task {
ZIO.attemptBlocking {
val callback = RecordScanCallback()
client.scanNode(policy, node, ns, set, callback, binNames: _*)
callback.getRecordSet
Expand All @@ -125,11 +125,11 @@ class ZioAerospikeHandler(protected val client: IAerospikeClient)

override def execute(statement: Statement, operations: Operation*)
(implicit policy: WritePolicy): Task[ExecuteTask] = {
Task(client.execute(policy, statement, operations: _*))
ZIO.attemptBlocking(client.execute(policy, statement, operations: _*))
}

override def info(node: Node, name: String): Task[String] = {
Task(Info.request(node, name))
ZIO.attemptBlocking(Info.request(node, name))
}

override def query(statement: QueryStatement)
Expand Down
Loading

0 comments on commit aafe535

Please sign in to comment.