From beeb6e56319bf6b6a97225a364bea7027d5253ee Mon Sep 17 00:00:00 2001 From: Di Shang Date: Fri, 10 Mar 2017 17:32:32 +1100 Subject: [PATCH] add sticky port --- .../main/ly/stealth/mesos/kafka/Broker.scala | 44 ++++++++++++------- .../stealth/mesos/kafka/cli/BrokerCli.scala | 1 + .../ly/stealth/mesos/kafka/json/Model.scala | 5 ++- .../scheduler/BrokerLifecycleManager.scala | 5 ++- .../ly/stealth/mesos/kafka/BrokerTest.scala | 38 +++++++++++----- .../ly/stealth/mesos/kafka/JsonTest.scala | 2 +- src/test/resources/broker.json | 3 +- 7 files changed, 67 insertions(+), 31 deletions(-) diff --git a/src/scala/main/ly/stealth/mesos/kafka/Broker.scala b/src/scala/main/ly/stealth/mesos/kafka/Broker.scala index e3dda86..99c0d4f 100644 --- a/src/scala/main/ly/stealth/mesos/kafka/Broker.scala +++ b/src/scala/main/ly/stealth/mesos/kafka/Broker.scala @@ -169,20 +169,32 @@ class Broker(val id: Int = 0) { ) } - private[kafka] def getSuitablePort(ports: util.List[Range]): Int = { - if (ports.isEmpty) return -1 - - val ports_ = ports.sortBy(r => r.start) - if (port == null) - return ports_.get(0).start - - for (range <- ports_) { - val overlap = range.overlap(port) - if (overlap != null) - return overlap.start + private[kafka] def getSuitablePort(availablePorts: util.List[Range]): Int = { + // no available ports to choose from + if (availablePorts.isEmpty) return -1 + + // compute allowed usable ports based on broker config and offer + val usablePorts: List[Range] = + if (port == null) availablePorts.toList.sortBy(_.start) + else availablePorts.toList.flatMap { range => + if (range.overlap(port) == null) None else Some(range.overlap(port)) + }.sortBy(_.start) + + // no port usable + if (usablePorts.isEmpty) return -1 + + // try to stick to the previous port if possible + if (stickiness.port != null) { + val preferedPort = new Range(stickiness.port) + for (range <- usablePorts) { + val found = range.overlap(preferedPort) + if (found != null) + return found.start + } } - -1 + // else return first usable port + return usablePorts.get(0).start } /* @@ -198,8 +210,8 @@ class Broker(val id: Int = 0) { def shouldStop: Boolean = !active && task != null && !task.stopping - def registerStart(hostname: String): Unit = { - stickiness.registerStart(hostname) + def registerStart(hostname: String, port: Integer): Unit = { + stickiness.registerStart(hostname, port) failover.resetFailures() } @@ -289,12 +301,14 @@ object Broker { class Stickiness(_period: Period = new Period("10m")) { var period: Period = _period @volatile var hostname: String = null + @volatile var port: Integer = null @volatile var stopTime: Date = null def expires: Date = if (stopTime != null) new Date(stopTime.getTime + period.ms) else null - def registerStart(hostname: String): Unit = { + def registerStart(hostname: String, port: Integer): Unit = { this.hostname = hostname + this.port = port stopTime = null } diff --git a/src/scala/main/ly/stealth/mesos/kafka/cli/BrokerCli.scala b/src/scala/main/ly/stealth/mesos/kafka/cli/BrokerCli.scala index bb47848..a7a1da5 100644 --- a/src/scala/main/ly/stealth/mesos/kafka/cli/BrokerCli.scala +++ b/src/scala/main/ly/stealth/mesos/kafka/cli/BrokerCli.scala @@ -489,6 +489,7 @@ trait BrokerCli { var stickiness = "stickiness:" stickiness += " period:" + broker.stickiness.period if (broker.stickiness.hostname != null) stickiness += ", hostname:" + broker.stickiness.hostname + if (broker.stickiness.port != null) stickiness += ", port:" + broker.stickiness.port if (broker.stickiness.stopTime != null) stickiness += ", expires:" + Repr.dateTime(broker.stickiness.expires) printLine(stickiness, indent) diff --git a/src/scala/main/ly/stealth/mesos/kafka/json/Model.scala b/src/scala/main/ly/stealth/mesos/kafka/json/Model.scala index e81f2a8..6cf2682 100644 --- a/src/scala/main/ly/stealth/mesos/kafka/json/Model.scala +++ b/src/scala/main/ly/stealth/mesos/kafka/json/Model.scala @@ -175,11 +175,11 @@ class RangeDeserializer extends StdDeserializer[Range](classOf[Range]) { } } -case class StickinessModel(period: Period, stopTime: Date, hostname: String) +case class StickinessModel(period: Period, stopTime: Date, hostname: String, port: Integer) class StickinessSerializer extends StdSerializer[Stickiness](classOf[Stickiness]) { override def serialize(s: Stickiness, gen: JsonGenerator, provider: SerializerProvider): Unit = { - provider.defaultSerializeValue(StickinessModel(s.period, s.stopTime, s.hostname), gen) + provider.defaultSerializeValue(StickinessModel(s.period, s.stopTime, s.hostname, s.port), gen) } } @@ -188,6 +188,7 @@ class StickinessDeserializer extends StdDeserializer[Stickiness](classOf[Stickin val model = p.readValueAs(classOf[StickinessModel]) val s = new Stickiness() s.hostname = model.hostname + s.port = model.port s.stopTime = model.stopTime s.period = model.period s diff --git a/src/scala/main/ly/stealth/mesos/kafka/scheduler/BrokerLifecycleManager.scala b/src/scala/main/ly/stealth/mesos/kafka/scheduler/BrokerLifecycleManager.scala index c203910..1905b84 100644 --- a/src/scala/main/ly/stealth/mesos/kafka/scheduler/BrokerLifecycleManager.scala +++ b/src/scala/main/ly/stealth/mesos/kafka/scheduler/BrokerLifecycleManager.scala @@ -198,7 +198,10 @@ trait BrokerLifecycleManagerComponentImpl extends BrokerLifecycleManagerComponen broker.task.state = Broker.State.RUNNING if (status.hasData && status.getData.size() > 0) broker.task.endpoint = new Broker.Endpoint(status.getData.toStringUtf8) - broker.registerStart(broker.task.hostname) + + val port: Integer = if (broker.task.endpoint != null) broker.task.endpoint.port else null + logger.info(s"Registering broker at ${broker.task.hostname}:${port}") + broker.registerStart(broker.task.hostname, port) } private[this] def onStopped(broker: Broker, status: TaskStatus, failed: Boolean): Unit = { diff --git a/src/test/ly/stealth/mesos/kafka/BrokerTest.scala b/src/test/ly/stealth/mesos/kafka/BrokerTest.scala index 2d9591e..bf61b4a 100644 --- a/src/test/ly/stealth/mesos/kafka/BrokerTest.scala +++ b/src/test/ly/stealth/mesos/kafka/BrokerTest.scala @@ -162,23 +162,27 @@ class BrokerTest extends KafkaMesosTestCase { def matches_stickiness { val host0 = "host0" val host1 = "host1" - val resources = "ports:0..10" + val resources0 = "ports:0..10" + val resources1 = "ports:11..20" - BrokerTest.assertAccept(broker.matches(offer(host0, resources), new Date(0))) - BrokerTest.assertAccept(broker.matches(offer(host1, resources), new Date(0))) + BrokerTest.assertAccept(broker.matches(offer(host0, resources0), new Date(0))) + BrokerTest.assertAccept(broker.matches(offer(host1, resources0), new Date(0))) - broker.registerStart(host0) + broker.registerStart(host0, 5) broker.registerStop(new Date(0)) - BrokerTest.assertAccept(broker.matches(offer(host0, resources), new Date(0))) - val theOffer = offer(host1, resources) + BrokerTest.assertAccept(broker.matches(offer(host0, resources0), new Date(0))) + val theOffer = offer(host1, resources0) assertEquals( OfferResult.eventuallyMatch( theOffer, broker, "hostname != stickiness host", broker.stickiness.period.ms().toInt / 1000), broker.matches(theOffer, new Date(0))) + + // should still work even if sticky port unavailable + BrokerTest.assertAccept(broker.matches(offer(host0, resources1), new Date(0))) } @Test @@ -349,6 +353,14 @@ class BrokerTest extends KafkaMesosTestCase { assertEquals(100, broker.getSuitablePort(ranges("100..200"))) assertEquals(95, broker.getSuitablePort(ranges("0..90,95..96,101..200"))) assertEquals(96, broker.getSuitablePort(ranges("0..90,96,101..200"))) + + broker.registerStart("", 100) + broker.port = new Range("92..105") + assertEquals(100, broker.getSuitablePort(ranges("1..200"))) + + broker.registerStart("", 100) + broker.port = new Range("92..99") + assertEquals(92, broker.getSuitablePort(ranges("1..200"))) } @Test @@ -459,6 +471,7 @@ class BrokerTest extends KafkaMesosTestCase { broker.log4jOptions = parseMap("b=2").toMap broker.jvmOptions = "-Xms512m" + broker.stickiness.registerStart("localhost", 1234) broker.failover.registerFailure(new Date()) broker.task = new Task("1", "slave", "executor", "host") @@ -497,7 +510,7 @@ class BrokerTest extends KafkaMesosTestCase { assertTrue(stickiness.matchesHostname("host0")) assertTrue(stickiness.matchesHostname("host1")) - stickiness.registerStart("host0") + stickiness.registerStart("host0", null) stickiness.registerStop(new Date(0)) assertTrue(stickiness.matchesHostname("host0")) assertFalse(stickiness.matchesHostname("host1")) @@ -508,7 +521,7 @@ class BrokerTest extends KafkaMesosTestCase { val stickiness = new Stickiness() assertEquals(stickiness.stickyTimeLeft(), 0) - stickiness.registerStart("host0") + stickiness.registerStart("host0", null) stickiness.registerStop(new Date(0)) val stickyTimeSec = (stickiness.period.ms() / 1000).toInt assertEquals(stickiness.stickyTimeLeft(new Date(0)), stickyTimeSec) @@ -521,23 +534,25 @@ class BrokerTest extends KafkaMesosTestCase { assertNull(stickiness.hostname) assertNull(stickiness.stopTime) - stickiness.registerStart("host") + stickiness.registerStart("host", 1234) assertEquals("host", stickiness.hostname) + assertEquals(1234, stickiness.port) assertNull(stickiness.stopTime) stickiness.registerStop(new Date(0)) assertEquals("host", stickiness.hostname) assertEquals(new Date(0), stickiness.stopTime) - stickiness.registerStart("host1") + stickiness.registerStart("host1", 5678) assertEquals("host1", stickiness.hostname) + assertEquals(5678, stickiness.port) assertNull(stickiness.stopTime) } @Test def Stickiness_toJson_fromJson { val stickiness = new Stickiness() - stickiness.registerStart("localhost") + stickiness.registerStart("localhost", 1234) stickiness.registerStop(new Date(0)) val read = JsonUtil.fromJson[Stickiness](JsonUtil.toJson(stickiness)) @@ -719,6 +734,7 @@ object BrokerTest { if (checkNulls(expected, actual)) return assertEquals(expected.period, actual.period) + assertEquals(expected.port, actual.port) assertEquals(expected.stopTime, actual.stopTime) assertEquals(expected.hostname, actual.hostname) } diff --git a/src/test/ly/stealth/mesos/kafka/JsonTest.scala b/src/test/ly/stealth/mesos/kafka/JsonTest.scala index 9ceb86b..d736a18 100644 --- a/src/test/ly/stealth/mesos/kafka/JsonTest.scala +++ b/src/test/ly/stealth/mesos/kafka/JsonTest.scala @@ -55,7 +55,7 @@ class JsonTest { b.task.endpoint = new Endpoint("host1:9092") b.syslog = false b.stickiness = new Stickiness(new Period("10m")) - b.stickiness.registerStart("host1") + b.stickiness.registerStart("host1", 1234) b.log4jOptions = Map("k1" -> "v1", "k2" -> "v2") b.options = Map("a" -> "1", "b" -> "2") b.active = true diff --git a/src/test/resources/broker.json b/src/test/resources/broker.json index db3d57d..52a7540 100644 --- a/src/test/resources/broker.json +++ b/src/test/resources/broker.json @@ -19,7 +19,8 @@ "syslog": false, "stickiness": { "period": "10m", - "hostname": "host1" + "hostname": "host1", + "port": 1234 }, "log4jOptions": "k1=v1,k2=v2", "options": "a=1,b=2",