Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sticky port #291

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 29 additions & 15 deletions src/scala/main/ly/stealth/mesos/kafka/Broker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -169,20 +169,32 @@ class Broker(val id: Int = 0) {
)
}

private[kafka] def getSuitablePort(ports: util.List[Range]): Int = {
if (ports.isEmpty) return -1

val ports_ = ports.sortBy(r => r.start)
if (port == null)
return ports_.get(0).start

for (range <- ports_) {
val overlap = range.overlap(port)
if (overlap != null)
return overlap.start
private[kafka] def getSuitablePort(availablePorts: util.List[Range]): Int = {
// no available ports to choose from
if (availablePorts.isEmpty) return -1

// compute allowed usable ports based on broker config and offer
val usablePorts: List[Range] =
if (port == null) availablePorts.toList.sortBy(_.start)
else availablePorts.toList.flatMap { range =>
if (range.overlap(port) == null) None else Some(range.overlap(port))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be just Option(range.overlap(port))

}.sortBy(_.start)

// no port usable
if (usablePorts.isEmpty) return -1

// try to stick to the previous port if possible
if (stickiness.port != null) {
val preferedPort = new Range(stickiness.port)
for (range <- usablePorts) {
val found = range.overlap(preferedPort)
if (found != null)
return found.start
}
}

-1
// else return first usable port
return usablePorts.get(0).start
}

/*
Expand All @@ -198,8 +210,8 @@ class Broker(val id: Int = 0) {

def shouldStop: Boolean = !active && task != null && !task.stopping

def registerStart(hostname: String): Unit = {
stickiness.registerStart(hostname)
def registerStart(hostname: String, port: Integer): Unit = {
stickiness.registerStart(hostname, port)
failover.resetFailures()
}

Expand Down Expand Up @@ -289,12 +301,14 @@ object Broker {
class Stickiness(_period: Period = new Period("10m")) {
var period: Period = _period
@volatile var hostname: String = null
@volatile var port: Integer = null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer an Option[Int] here rather than a nullable to make it more idiomatic scala.

@volatile var stopTime: Date = null

def expires: Date = if (stopTime != null) new Date(stopTime.getTime + period.ms) else null

def registerStart(hostname: String): Unit = {
def registerStart(hostname: String, port: Integer): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer Int over Integer

this.hostname = hostname
this.port = port
stopTime = null
}

Expand Down
1 change: 1 addition & 0 deletions src/scala/main/ly/stealth/mesos/kafka/cli/BrokerCli.scala
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ trait BrokerCli {
var stickiness = "stickiness:"
stickiness += " period:" + broker.stickiness.period
if (broker.stickiness.hostname != null) stickiness += ", hostname:" + broker.stickiness.hostname
if (broker.stickiness.port != null) stickiness += ", port:" + broker.stickiness.port
if (broker.stickiness.stopTime != null) stickiness += ", expires:" + Repr.dateTime(broker.stickiness.expires)
printLine(stickiness, indent)

Expand Down
5 changes: 3 additions & 2 deletions src/scala/main/ly/stealth/mesos/kafka/json/Model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ class RangeDeserializer extends StdDeserializer[Range](classOf[Range]) {
}
}

case class StickinessModel(period: Period, stopTime: Date, hostname: String)
case class StickinessModel(period: Period, stopTime: Date, hostname: String, port: Integer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto on Option[Int] over Integer


class StickinessSerializer extends StdSerializer[Stickiness](classOf[Stickiness]) {
override def serialize(s: Stickiness, gen: JsonGenerator, provider: SerializerProvider): Unit = {
provider.defaultSerializeValue(StickinessModel(s.period, s.stopTime, s.hostname), gen)
provider.defaultSerializeValue(StickinessModel(s.period, s.stopTime, s.hostname, s.port), gen)
}
}

Expand All @@ -188,6 +188,7 @@ class StickinessDeserializer extends StdDeserializer[Stickiness](classOf[Stickin
val model = p.readValueAs(classOf[StickinessModel])
val s = new Stickiness()
s.hostname = model.hostname
s.port = model.port
s.stopTime = model.stopTime
s.period = model.period
s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,10 @@ trait BrokerLifecycleManagerComponentImpl extends BrokerLifecycleManagerComponen
broker.task.state = Broker.State.RUNNING
if (status.hasData && status.getData.size() > 0)
broker.task.endpoint = new Broker.Endpoint(status.getData.toStringUtf8)
broker.registerStart(broker.task.hostname)

val port: Integer = if (broker.task.endpoint != null) broker.task.endpoint.port else null
logger.info(s"Registering broker at ${broker.task.hostname}:${port}")
broker.registerStart(broker.task.hostname, port)
}

private[this] def onStopped(broker: Broker, status: TaskStatus, failed: Boolean): Unit = {
Expand Down
38 changes: 27 additions & 11 deletions src/test/ly/stealth/mesos/kafka/BrokerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -162,23 +162,27 @@ class BrokerTest extends KafkaMesosTestCase {
def matches_stickiness {
val host0 = "host0"
val host1 = "host1"
val resources = "ports:0..10"
val resources0 = "ports:0..10"
val resources1 = "ports:11..20"


BrokerTest.assertAccept(broker.matches(offer(host0, resources), new Date(0)))
BrokerTest.assertAccept(broker.matches(offer(host1, resources), new Date(0)))
BrokerTest.assertAccept(broker.matches(offer(host0, resources0), new Date(0)))
BrokerTest.assertAccept(broker.matches(offer(host1, resources0), new Date(0)))

broker.registerStart(host0)
broker.registerStart(host0, 5)
broker.registerStop(new Date(0))


BrokerTest.assertAccept(broker.matches(offer(host0, resources), new Date(0)))
val theOffer = offer(host1, resources)
BrokerTest.assertAccept(broker.matches(offer(host0, resources0), new Date(0)))
val theOffer = offer(host1, resources0)
assertEquals(
OfferResult.eventuallyMatch(
theOffer, broker,
"hostname != stickiness host", broker.stickiness.period.ms().toInt / 1000),
broker.matches(theOffer, new Date(0)))

// should still work even if sticky port unavailable
BrokerTest.assertAccept(broker.matches(offer(host0, resources1), new Date(0)))
}

@Test
Expand Down Expand Up @@ -349,6 +353,14 @@ class BrokerTest extends KafkaMesosTestCase {
assertEquals(100, broker.getSuitablePort(ranges("100..200")))
assertEquals(95, broker.getSuitablePort(ranges("0..90,95..96,101..200")))
assertEquals(96, broker.getSuitablePort(ranges("0..90,96,101..200")))

broker.registerStart("", 100)
broker.port = new Range("92..105")
assertEquals(100, broker.getSuitablePort(ranges("1..200")))

broker.registerStart("", 100)
broker.port = new Range("92..99")
assertEquals(92, broker.getSuitablePort(ranges("1..200")))
}

@Test
Expand Down Expand Up @@ -459,6 +471,7 @@ class BrokerTest extends KafkaMesosTestCase {
broker.log4jOptions = parseMap("b=2").toMap
broker.jvmOptions = "-Xms512m"

broker.stickiness.registerStart("localhost", 1234)
broker.failover.registerFailure(new Date())
broker.task = new Task("1", "slave", "executor", "host")

Expand Down Expand Up @@ -497,7 +510,7 @@ class BrokerTest extends KafkaMesosTestCase {
assertTrue(stickiness.matchesHostname("host0"))
assertTrue(stickiness.matchesHostname("host1"))

stickiness.registerStart("host0")
stickiness.registerStart("host0", null)
stickiness.registerStop(new Date(0))
assertTrue(stickiness.matchesHostname("host0"))
assertFalse(stickiness.matchesHostname("host1"))
Expand All @@ -508,7 +521,7 @@ class BrokerTest extends KafkaMesosTestCase {
val stickiness = new Stickiness()
assertEquals(stickiness.stickyTimeLeft(), 0)

stickiness.registerStart("host0")
stickiness.registerStart("host0", null)
stickiness.registerStop(new Date(0))
val stickyTimeSec = (stickiness.period.ms() / 1000).toInt
assertEquals(stickiness.stickyTimeLeft(new Date(0)), stickyTimeSec)
Expand All @@ -521,23 +534,25 @@ class BrokerTest extends KafkaMesosTestCase {
assertNull(stickiness.hostname)
assertNull(stickiness.stopTime)

stickiness.registerStart("host")
stickiness.registerStart("host", 1234)
assertEquals("host", stickiness.hostname)
assertEquals(1234, stickiness.port)
assertNull(stickiness.stopTime)

stickiness.registerStop(new Date(0))
assertEquals("host", stickiness.hostname)
assertEquals(new Date(0), stickiness.stopTime)

stickiness.registerStart("host1")
stickiness.registerStart("host1", 5678)
assertEquals("host1", stickiness.hostname)
assertEquals(5678, stickiness.port)
assertNull(stickiness.stopTime)
}

@Test
def Stickiness_toJson_fromJson {
val stickiness = new Stickiness()
stickiness.registerStart("localhost")
stickiness.registerStart("localhost", 1234)
stickiness.registerStop(new Date(0))

val read = JsonUtil.fromJson[Stickiness](JsonUtil.toJson(stickiness))
Expand Down Expand Up @@ -719,6 +734,7 @@ object BrokerTest {
if (checkNulls(expected, actual)) return

assertEquals(expected.period, actual.period)
assertEquals(expected.port, actual.port)
assertEquals(expected.stopTime, actual.stopTime)
assertEquals(expected.hostname, actual.hostname)
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/ly/stealth/mesos/kafka/JsonTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class JsonTest {
b.task.endpoint = new Endpoint("host1:9092")
b.syslog = false
b.stickiness = new Stickiness(new Period("10m"))
b.stickiness.registerStart("host1")
b.stickiness.registerStart("host1", 1234)
b.log4jOptions = Map("k1" -> "v1", "k2" -> "v2")
b.options = Map("a" -> "1", "b" -> "2")
b.active = true
Expand Down
3 changes: 2 additions & 1 deletion src/test/resources/broker.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"syslog": false,
"stickiness": {
"period": "10m",
"hostname": "host1"
"hostname": "host1",
"port": 1234
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave this file as it is, its to test backwards compat with older (pre 0.10.0) serialized JSON.

},
"log4jOptions": "k1=v1,k2=v2",
"options": "a=1,b=2",
Expand Down