Skip to content
This repository has been archived by the owner on Dec 6, 2023. It is now read-only.

Conditional Fetch requests support #39

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
46fd9fa
Introduce conditional request parameters leveraging If-(None)-Match a…
ajantis Jul 5, 2016
4bbc75b
Remove an unused import
ajantis Jul 5, 2016
47b43d4
Add conditional request params to internal helper API for fetching si…
ajantis Jul 5, 2016
88d1bca
Introduce conditional request params to public RiakBucket single obje…
ajantis Jul 5, 2016
f7eddda
Adapt Riak HTTP helper to support conditional request param fetch API
ajantis Jul 5, 2016
aeeebdc
Fix compile error in internal API helper
ajantis Jul 5, 2016
2d60100
Refine class constructor arguments
ajantis Jul 5, 2016
2625e73
Add handling for 412 PreconditionFailed http status code when fetchin…
ajantis Jul 5, 2016
8ebfe8d
Rename a class
ajantis Jul 5, 2016
fb6fe91
Align class API naming with http header names
ajantis Jul 5, 2016
d6e814c
Add tests for conditional fetch object requests
ajantis Jul 5, 2016
e051ade
Add a workaround for ETag-based http headers rendering
ajantis Jul 5, 2016
c5f5169
Fix formatting
ajantis Jul 5, 2016
fed1fc2
Fix a failing test: Riak treats If-Modified-Since condition as true f…
ajantis Jul 6, 2016
7733e08
Add scaladoc for new conditional request parameters
ajantis Jul 6, 2016
9a6bd02
[changed] Improve conditional request parameter naming
ajantis Jul 7, 2016
07a62f5
= Avoid using current time in integration tests with Riak (as it may …
ajantis Jul 7, 2016
3fc4795
= Add an integration tests for combining multiple conditional request…
ajantis Jul 7, 2016
11ac7d0
Merge changes from upstream
ajantis Jul 7, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion src/main/scala/com/scalapenos/riak/RiakBucket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.scalapenos.riak
trait RiakBucket {
import scala.concurrent.{ ExecutionContext, Future }
import internal._
import RiakBucket._

val name: String

Expand All @@ -28,7 +29,7 @@ trait RiakBucket {
*/
def resolver: RiakConflictsResolver

def fetch(key: String): Future[Option[RiakValue]]
def fetch(key: String, conditionalParams: ConditionalRequestParam*): Future[Option[RiakValue]]
def fetchWithSiblings(key: String): Future[Option[Set[RiakValue]]]

def fetch(index: String, value: String): Future[List[RiakValue]] = fetch(RiakIndex(index, value))
Expand Down Expand Up @@ -73,3 +74,44 @@ trait RiakBucket {

def unsafe: UnsafeBucketOperations
}

object RiakBucket {

/**
* Parameter for conditional request semantics.
* Can be used for Fetch Value and Store Value operations.
*/
sealed trait ConditionalRequestParam

/**
* Perform a request on a RiakValue only if value's ETag does not match the given one.
*
* @param eTag the target ETag value.
*/
case class IfNotMatch(eTag: ETag) extends ConditionalRequestParam

/**
* Perform a request on a RiakValue only if value's ETag matches the given one.
*
* @param eTag the target ETag value.
*/
case class IfMatch(eTag: ETag) extends ConditionalRequestParam

/**
* Perform a request on a RiakValue only if value's Last-Modified time is after the given timestamp.
*
* @param timestamp
*
* *Note*: if target time is in the future then Riak always treats this condition as `true`.
*/
case class IfModifiedSince(timestamp: DateTime) extends ConditionalRequestParam

/**
* Perform a request on a RiakValue only if value's Last-Modified time is before the given timestamp.
*
* @param timestamp
*
* *Note*: if target time is in the future then Riak always treats this condition as `true`.
*/
case class IfUnmodifiedSince(timestamp: DateTime) extends ConditionalRequestParam
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
package com.scalapenos.riak
package internal

import RiakBucket._

private[riak] sealed class RiakHttpBucket(helper: RiakHttpClientHelper, server: RiakServerInfo, val name: String, val resolver: RiakConflictsResolver) extends RiakBucket {
def fetch(key: String) = helper.fetch(server, name, key, resolver)

def fetch(key: String, conditionalParams: ConditionalRequestParam*) = helper.fetch(server, name, key, resolver, conditionalParams)
def fetchWithSiblings(key: String) = helper.fetchWithSiblings(server, name, key, resolver)

def fetch(index: RiakIndex) = helper.fetch(server, name, index, resolver)
def fetch(indexRange: RiakIndexRange) = helper.fetch(server, name, indexRange, resolver)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private[riak] object RiakHttpClientHelper {
}
}

private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSupport with RiakIndexSupport with DateTimeSupport {
private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakHttpSupport with RiakIndexSupport with DateTimeSupport {
import scala.concurrent.Future
import scala.concurrent.Future._

Expand All @@ -44,11 +44,10 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup
import spray.httpx.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._

import org.slf4j.LoggerFactory

import SprayClientExtras._
import RiakHttpHeaders._
import RiakHttpClientHelper._
import RiakBucket._

import system.dispatcher

Expand All @@ -68,14 +67,15 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup
}
}

def fetch(server: RiakServerInfo, bucket: String, key: String, resolver: RiakConflictsResolver): Future[Option[RiakValue]] = {
httpRequest(Get(KeyUri(server, bucket, key))).flatMap { response ⇒
def fetch(server: RiakServerInfo, bucket: String, key: String, resolver: RiakConflictsResolver, conditionalParams: Seq[ConditionalRequestParam] = Seq()): Future[Option[RiakValue]] = {
httpRequest(Get(KeyUri(server, bucket, key)).withHeaders(conditionalParams.map(_.asHttpHeader): _*)).flatMap { response ⇒
response.status match {
case OK ⇒ successful(toRiakValue(response))
case NotFound ⇒ successful(None)
case MultipleChoices ⇒ resolveConflict(server, bucket, key, response, resolver).map(Some(_))
case other ⇒ throw new BucketOperationFailed(s"Fetch for key '$key' in bucket '$bucket' produced an unexpected response code '$other'.")
// TODO: case NotModified => successful(None)
case OK ⇒ successful(toRiakValue(response))
case NotFound ⇒ successful(None)
case NotModified ⇒ successful(None) // This means that client is not able to distinguish cases when value is not in Riak or a supplied condition is not met.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a way to distinguish between the not-found and not-modified case? Perhaps in a different method to retain backwards compatibility.

case MultipleChoices ⇒ resolveConflict(server, bucket, key, response, resolver).map(Some(_))
case PreconditionFailed ⇒ successful(None) // Fetch with If-Match header returns that if ETag value doesn't match.
case other ⇒ throw new BucketOperationFailed(s"Fetch for key '$key' in bucket '$bucket' produced an unexpected response code '$other'.")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package com.scalapenos.riak
package internal

private[riak] trait RiakUriSupport {
import spray.http.Uri
import spray.http.Uri._
private[riak] trait RiakHttpSupport {
import spray.http.{ Uri, HttpHeader, HttpHeaders, EntityTag }, HttpHeaders._, Uri._
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer consistent wildcard imports instead of explicit imports but that's a personal thing and I'm not religious about it.

import DateTimeSupport._
import RiakBucket._

// ==========================================================================
// Query Parameters
Expand All @@ -37,6 +38,22 @@ private[riak] trait RiakUriSupport {
def query = ("returnbody", s"$returnBody") +: Query.Empty
}

// ==========================================================================
// Conditional Request Parameters Support
// ==========================================================================

implicit class ConditionalHttpRequestParam(conditionalParam: ConditionalRequestParam) {
def asHttpHeader: HttpHeader = {
conditionalParam match {
case IfModifiedSince(date) ⇒ `If-Modified-Since`(toSprayDateTime(date))
case IfUnmodifiedSince(date) ⇒ `If-Unmodified-Since`(toSprayDateTime(date))
case IfMatch(eTag) ⇒ RawHeader("If-Match", eTag.value) // TODO this `If-Match`(EntityTag(eTag)) doesn't work as spray escapes double quotes in ETag value
case IfNotMatch(eTag) ⇒ RawHeader("If-None-Match", eTag.value) // TODO this `If-None-Match`(EntityTag(eTag)) doesn't work as spray escapes double quotes in ETag value
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TODO comments are slightly confusing.
Are you saying that the "normal" Spray support for these two headers doesn't work because of an escaping problem and that you are therefor using raw headers? If so, fine because the functionality works. If not, than the PR isn't finished yet, right?

case _ ⇒ throw new IllegalArgumentException("Unknown conditional request param: cannot convert to HTTP header.")
}
}
}

// ==========================================================================
// URL building and Query Parameters
// ==========================================================================
Expand Down
127 changes: 127 additions & 0 deletions src/test/scala/com/scalapenos/riak/RiakBucketSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.scalapenos.riak

import com.scalapenos.riak.RiakBucket.{ IfMatch, IfModifiedSince, IfNotMatch, IfUnmodifiedSince }
import org.joda.time.DateTime

class RiakBucketSpec extends RiakClientSpecification with RandomKeySupport with RandomBucketSupport {

"A RiakBucket" should {
Expand Down Expand Up @@ -100,5 +103,129 @@ class RiakBucketSpec extends RiakClientSpecification with RandomKeySupport with

fetched should beNone
}

// ============================================================================
// Conditional requests tests
// ============================================================================

"not return back a stored value if 'If-None-Match' condition does not hold for a requested data" in {
val bucket = randomBucket
val key = randomKey

val storedValue = bucket.storeAndFetch(key, "value").await

val eTag = storedValue.etag

bucket.fetch(key, IfNotMatch(eTag)).await must beNone
}

"return back a stored value if 'If-None-Match' condition holds for requested data" in {
val bucket = randomBucket
val key = randomKey

val storedValue = bucket.storeAndFetch(key, "value").await

bucket.fetch(key, IfNotMatch(randomKey)).await must beSome(storedValue)
}

"not return back a stored value if 'If-Match' condition does not hold for a requested data" in {
val bucket = randomBucket
val key = randomKey

bucket.storeAndFetch(key, "value").await

bucket.fetch(key, IfMatch(randomKey)).await must beNone
}

"return back a stored value if 'If-Match' condition holds for requested data" in {
val bucket = randomBucket
val key = randomKey

val storedValue = bucket.storeAndFetch(key, "value").await

val eTag = storedValue.etag

bucket.fetch(key, IfMatch(eTag)).await must beSome(storedValue)
}

"not return back a stored value if 'If-Modified-Since' condition does not hold for a requested data" in {
val bucket = randomBucket
val key = randomKey

val storedValue = bucket.storeAndFetch(key, "value").await

// Fetch if the value has been modified after store operation
bucket.fetch(key, IfModifiedSince(storedValue.lastModified.plusMillis(1))).await must beNone
}

"return back a stored value if 'If-Modified-Since' condition holds for requested data" in {
val bucket = randomBucket
val key = randomKey

val storedValue = bucket.storeAndFetch(key, "value").await

// Fetch if the value has been modified since before the store operation
bucket.fetch(key, IfModifiedSince(storedValue.lastModified.minusMinutes(5))).await must beSome(storedValue)
}

"not return back a stored value if 'If-Unmodified-Since' condition does not hold for a requested data" in {
val bucket = randomBucket
val key = randomKey

val storedValue = bucket.storeAndFetch(key, "value").await

// Fetch if the value has not been modified since before the store operation
bucket.fetch(key, IfUnmodifiedSince(storedValue.lastModified.minusMinutes(5))).await must beNone
}

"return back a stored value if 'If-Unmodified-Since' condition holds for requested data" in {
val bucket = randomBucket
val key = randomKey

val storedValue = bucket.storeAndFetch(key, "value").await

// Fetch if the value has not been modified since after the store operation
bucket.fetch(key, IfUnmodifiedSince(storedValue.lastModified.plusMinutes(5))).await must beSome(storedValue)
}

// Combining multiple request conditions

"support multiple conditional request parameters" in {
val bucket = randomBucket
val key = randomKey

val storedValue = bucket.storeAndFetch(key, "value").await

// Fetch a value that hasn't been modified since after the store operation (this condition holds)
// only if it has a different tag (this condition doesn't hold)
bucket.fetch(key,
IfUnmodifiedSince(storedValue.lastModified.plusMillis(1)),
IfNotMatch(storedValue.etag)
).await must beNone

// Fetch a value if it has the same ETag (this condition holds)
// has been modified since before the store operation (this condition also holds)
bucket.fetch(key,
IfMatch(storedValue.etag),
IfModifiedSince(storedValue.lastModified.minusMillis(1))
).await must beSome(storedValue)

// Fetch a value if it has the same ETag (this condition holds)
// and has been modified since after the store operation (this condition doesn't hold)
bucket.fetch(key,
IfMatch(storedValue.etag),
IfModifiedSince(storedValue.lastModified.plusMillis(1))
).await must beNone

bucket.fetch(key,
// Repeating the same conditional parameter doesn't change the behaviour
IfNotMatch(storedValue.etag),
IfNotMatch(storedValue.etag)).await must beNone

bucket.fetch(key,
// Repeating the same conditional parameter doesn't change the behaviour
IfModifiedSince(storedValue.lastModified.minusMinutes(5)),
IfModifiedSince(storedValue.lastModified.minusMinutes(5))).await must beSome(storedValue)
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be better expressed as separate tests.

}
}