From 617fdcfd4520567ce52cdd8c9a901f959580647d Mon Sep 17 00:00:00 2001 From: Din Daniyarbekov Date: Fri, 16 Aug 2024 10:52:35 -0400 Subject: [PATCH] wip --- .../clustering/dynamo/DynamoClusterModule.kt | 2 +- .../dynamo/DynamoClusterWatcherTask.kt | 2 +- .../main/kotlin/misk/clustering/Cluster.kt | 2 +- .../misk/clustering/fake/FakeCluster.kt | 3 +- .../kubernetes/KubernetesClusterProvider.kt | 6 +- .../kubernetes/KubernetesClusterWatcher.kt | 9 +- .../clustering/kubernetes/KubernetesConfig.kt | 1 + .../misk/clustering/ClusterHashRingTest.kt | 16 +- .../misk/clustering/fake/FakeClusterTest.kt | 12 +- .../kubernetes/KubernetesClusterTest.kt | 191 ++++++++++++++---- 10 files changed, 183 insertions(+), 61 deletions(-) diff --git a/misk-clustering-dynamodb/src/main/kotlin/misk/clustering/dynamo/DynamoClusterModule.kt b/misk-clustering-dynamodb/src/main/kotlin/misk/clustering/dynamo/DynamoClusterModule.kt index 0425a9f8cad..a12e00b48de 100644 --- a/misk-clustering-dynamodb/src/main/kotlin/misk/clustering/dynamo/DynamoClusterModule.kt +++ b/misk-clustering-dynamodb/src/main/kotlin/misk/clustering/dynamo/DynamoClusterModule.kt @@ -15,7 +15,7 @@ import java.util.UUID class DynamoClusterModule @JvmOverloads constructor(private val config: DynamoClusterConfig = DynamoClusterConfig()) : KAbstractModule() { override fun configure() { - val defaultCluster = DefaultCluster(Cluster.Member(UUID.randomUUID().toString(), "invalid-ip")) + val defaultCluster = DefaultCluster(Cluster.Member(UUID.randomUUID().toString(), "invalid-ip", "invalid-deployment")) bind().toInstance(config) bind().toInstance(defaultCluster) bind().toInstance(defaultCluster) diff --git a/misk-clustering-dynamodb/src/main/kotlin/misk/clustering/dynamo/DynamoClusterWatcherTask.kt b/misk-clustering-dynamodb/src/main/kotlin/misk/clustering/dynamo/DynamoClusterWatcherTask.kt index 6a826a25880..ddecf487d9b 100644 --- a/misk-clustering-dynamodb/src/main/kotlin/misk/clustering/dynamo/DynamoClusterWatcherTask.kt +++ b/misk-clustering-dynamodb/src/main/kotlin/misk/clustering/dynamo/DynamoClusterWatcherTask.kt @@ -86,7 +86,7 @@ internal class DynamoClusterWatcherTask @Inject constructor( .build() for (page in table.scan(request).stream()) { for (item in page.items()) { - members.add(Member(item.name!!, "invalid-ip")) + members.add(Member(item.name!!, "invalid-ip", "invalid-deployment")) } } cluster.clusterChanged(membersBecomingReady = members, membersBecomingNotReady = prevMembers - members) diff --git a/misk-clustering/src/main/kotlin/misk/clustering/Cluster.kt b/misk-clustering/src/main/kotlin/misk/clustering/Cluster.kt index a041104107d..179be13a78a 100644 --- a/misk-clustering/src/main/kotlin/misk/clustering/Cluster.kt +++ b/misk-clustering/src/main/kotlin/misk/clustering/Cluster.kt @@ -8,7 +8,7 @@ typealias ClusterWatch = (Cluster.Changes) -> Unit * to monitor the state of its peers */ interface Cluster { - data class Member(val name: String, val ipAddress: String) + data class Member(val name: String, val ipAddress: String, val deploymentVersion: String) data class Changes @JvmOverloads constructor( val snapshot: Snapshot, diff --git a/misk-clustering/src/main/kotlin/misk/clustering/fake/FakeCluster.kt b/misk-clustering/src/main/kotlin/misk/clustering/fake/FakeCluster.kt index c28c8c38093..4e082057927 100644 --- a/misk-clustering/src/main/kotlin/misk/clustering/fake/FakeCluster.kt +++ b/misk-clustering/src/main/kotlin/misk/clustering/fake/FakeCluster.kt @@ -61,7 +61,8 @@ class FakeCluster internal constructor( companion object { const val SELF_NAME = "fake-self-node" const val SELF_IP = "10.0.0.1" - @JvmStatic val self = Cluster.Member(name = SELF_NAME, ipAddress = SELF_IP) + const val SELF_DEPLOYMENT = "fake-deployment" + @JvmStatic val self = Cluster.Member(name = SELF_NAME, ipAddress = SELF_IP, deploymentVersion = SELF_DEPLOYMENT) private val log = getLogger() } diff --git a/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesClusterProvider.kt b/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesClusterProvider.kt index ebf57af0429..42f75b97072 100644 --- a/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesClusterProvider.kt +++ b/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesClusterProvider.kt @@ -8,5 +8,9 @@ import com.google.inject.Provider internal class KubernetesClusterProvider @Inject internal constructor( private val config: KubernetesConfig ) : Provider { - override fun get() = DefaultCluster(Cluster.Member(config.my_pod_name, config.my_pod_ip)) + override fun get() = DefaultCluster(Cluster.Member( + config.my_pod_name, + config.my_pod_ip, + config.my_deployment_version, + )) } diff --git a/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesClusterWatcher.kt b/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesClusterWatcher.kt index 74e9fbfe57f..15bcd15d004 100644 --- a/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesClusterWatcher.kt +++ b/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesClusterWatcher.kt @@ -128,7 +128,14 @@ internal class KubernetesClusterWatcher @Inject internal constructor( } } -private val V1Pod.asClusterMember get() = Cluster.Member(metadata!!.name!!, status!!.podIP ?: "") +private val V1Pod.asClusterMember: Cluster.Member + get() { + return Cluster.Member( + name = metadata!!.name!!, + ipAddress = status!!.podIP ?: "", + deploymentVersion = metadata?.labels?.get("tags.datadoghq.com/version") ?: "" + ) +} private val V1Pod.isReady: Boolean get() { diff --git a/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesConfig.kt b/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesConfig.kt index b3062b787e3..7fb9431ef85 100644 --- a/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesConfig.kt +++ b/misk-clustering/src/main/kotlin/misk/clustering/kubernetes/KubernetesConfig.kt @@ -12,6 +12,7 @@ data class KubernetesConfig @JvmOverloads constructor( val my_pod_namespace: String = System.getenv("MY_POD_NAMESPACE") ?: "", val my_pod_name: String = System.getenv("MY_POD_NAME") ?: "", val my_pod_ip: String = System.getenv("MY_POD_IP") ?: "", + val my_deployment_version: String = System.getenv("MY_POD_DEPLOYMENT") ?: "", val clustering_pod_label_selector: String? = null, // NB(mmihic): kubernetes_watch_read_timeout needs to be long to avoid timeouts during watch val kubernetes_watch_read_timeout: Long = 60, diff --git a/misk-clustering/src/test/kotlin/misk/clustering/ClusterHashRingTest.kt b/misk-clustering/src/test/kotlin/misk/clustering/ClusterHashRingTest.kt index 1dd23fbdb0a..399afe78845 100644 --- a/misk-clustering/src/test/kotlin/misk/clustering/ClusterHashRingTest.kt +++ b/misk-clustering/src/test/kotlin/misk/clustering/ClusterHashRingTest.kt @@ -9,16 +9,16 @@ import org.junit.jupiter.api.assertThrows internal class ClusterHashRingTest { @Test fun singleNode() { - val zork = Cluster.Member("zork", "192.49.168.23") + val zork = Cluster.Member("zork", "192.49.168.23", "fakeDeployment") val hashRing = ClusterHashRing(members = setOf(zork), hashFn = Hashing.murmur3_32(0)) assertThat(listOf("foo", "bar", "zed").map { hashRing[it] }).containsExactly(zork, zork, zork) } @Test fun multipleNodes() { - val zork = Cluster.Member("zork", "192.49.168.23") - val mork = Cluster.Member("mork", "192.49.168.24") - val quark = Cluster.Member("quark", "192.49.168.25") + val zork = Cluster.Member("zork", "192.49.168.23", "fakeDeployment") + val mork = Cluster.Member("mork", "192.49.168.24", "fakeDeployment") + val quark = Cluster.Member("quark", "192.49.168.25", "fakeDeployment") // First version of hash ring val hashRing1 = ClusterHashRing( @@ -40,7 +40,7 @@ internal class ClusterHashRingTest { .containsExactly(zork, quark, zork, zork) // Add a new member, should not remap resources unnecessarily - val bork = Cluster.Member("bork", "192.49.168.26") + val bork = Cluster.Member("bork", "192.49.168.26", "fakeDeployment") val hashRing3 = ClusterHashRing( members = setOf(zork, quark, bork), hashFn = Hashing.murmur3_32(0) @@ -67,9 +67,9 @@ internal class ClusterHashRingTest { (c, INT_MAX] => a This test ensures that each range ends up mapping to the expected vnode. */ - val a = Cluster.Member("a", "192.49.168.23") - val b = Cluster.Member("b", "192.49.168.24") - val c = Cluster.Member("c", "192.49.168.25") + val a = Cluster.Member("a", "192.49.168.23", "fakeDeployment") + val b = Cluster.Member("b", "192.49.168.24", "fakeDeployment") + val c = Cluster.Member("c", "192.49.168.25", "fakeDeployment") // First version of hash ring val hashRing = ClusterHashRing( diff --git a/misk-clustering/src/test/kotlin/misk/clustering/fake/FakeClusterTest.kt b/misk-clustering/src/test/kotlin/misk/clustering/fake/FakeClusterTest.kt index 2771a017efc..01dfdcd5645 100644 --- a/misk-clustering/src/test/kotlin/misk/clustering/fake/FakeClusterTest.kt +++ b/misk-clustering/src/test/kotlin/misk/clustering/fake/FakeClusterTest.kt @@ -19,10 +19,10 @@ internal class FakeClusterTest { @Inject lateinit var cluster: FakeCluster @Test fun clusterRespondsToChanges() { - cluster.clusterChanged(membersBecomingReady = setOf(Cluster.Member("blerp", "192.168.12.3"))) + cluster.clusterChanged(membersBecomingReady = setOf(Cluster.Member("blerp", "192.168.12.3", "fakeDeployment"))) assertThat(cluster.snapshot.readyMembers) - .containsExactlyInAnyOrder(Cluster.Member("blerp", "192.168.12.3")) - cluster.clusterChanged(membersBecomingNotReady = setOf(Cluster.Member("blerp", "192.168.12.3"))) + .containsExactlyInAnyOrder(Cluster.Member("blerp", "192.168.12.3", "fakeDeployment")) + cluster.clusterChanged(membersBecomingNotReady = setOf(Cluster.Member("blerp", "192.168.12.3", "fakeDeployment"))) assertThat(cluster.snapshot.readyMembers).isEmpty() } @@ -32,14 +32,14 @@ internal class FakeClusterTest { FakeCluster.self ) - cluster.resourceMapper.setDefaultMapping(Cluster.Member("zork", "192.168.12.0")) - cluster.resourceMapper.addMapping("my-object", Cluster.Member("bork", "192.168.12.1")) + cluster.resourceMapper.setDefaultMapping(Cluster.Member("zork", "192.168.12.0", "fakeDeployment")) + cluster.resourceMapper.addMapping("my-object", Cluster.Member("bork", "192.168.12.1", "fakeDeployment")) assertThat(cluster.snapshot.resourceMapper["my-object"].name).isEqualTo("bork") assertThat(cluster.snapshot.resourceMapper["other-object"].name).isEqualTo("zork") // Ensure resource mapper remains the same even through cluster changes - cluster.clusterChanged(membersBecomingReady = setOf(Cluster.Member("blerp", "192.168.12.3"))) + cluster.clusterChanged(membersBecomingReady = setOf(Cluster.Member("blerp", "192.168.12.3", "fakeDeployment"))) assertThat(cluster.snapshot.resourceMapper["my-object"].name).isEqualTo("bork") assertThat(cluster.snapshot.resourceMapper["other-object"].name).isEqualTo("zork") diff --git a/misk-clustering/src/test/kotlin/misk/clustering/kubernetes/KubernetesClusterTest.kt b/misk-clustering/src/test/kotlin/misk/clustering/kubernetes/KubernetesClusterTest.kt index 4ff7b97201b..74d83fda754 100644 --- a/misk-clustering/src/test/kotlin/misk/clustering/kubernetes/KubernetesClusterTest.kt +++ b/misk-clustering/src/test/kotlin/misk/clustering/kubernetes/KubernetesClusterTest.kt @@ -34,7 +34,8 @@ internal class KubernetesClusterTest { KubernetesConfig( my_pod_namespace = TEST_NAMESPACE, my_pod_name = TEST_SELF_NAME, - my_pod_ip = TEST_SELF_IP + my_pod_ip = TEST_SELF_IP, + my_deployment_version = TEST_DEPLOYMENT ) ) ) @@ -60,8 +61,8 @@ internal class KubernetesClusterTest { val changes = mutableListOf() cluster.watch { changes.add(it) } - handleWatch(CHANGE_TYPE_ADDED, newPod(TEST_SELF_NAME, true, TEST_SELF_IP)) - handleWatch(CHANGE_TYPE_MODIFIED, newPod(TEST_SELF_NAME, false, TEST_SELF_IP)) + handleWatch(CHANGE_TYPE_ADDED, newPod(TEST_SELF_NAME, true, TEST_SELF_IP, TEST_DEPLOYMENT)) + handleWatch(CHANGE_TYPE_MODIFIED, newPod(TEST_SELF_NAME, false, TEST_SELF_IP, TEST_DEPLOYMENT)) cluster.syncPoint { ready.countDown() } assertThat(ready.await(5, TimeUnit.SECONDS)).isTrue() @@ -101,8 +102,8 @@ internal class KubernetesClusterTest { val changes = mutableListOf() cluster.watch { changes.add(it) } - handleWatch("ADDED", newPod("larry-blerp", true, "10.0.0.3")) - handleWatch("ADDED", newPod("larry-blerp2", true, "10.0.0.4")) + handleWatch("ADDED", newPod("larry-blerp", true, "10.0.0.3", deployment = "currentDeployment")) + handleWatch("ADDED", newPod("larry-blerp2", true, "10.0.0.4", deployment = "currentDeployment")) cluster.syncPoint { ready.countDown() } assertThat(ready.await(5, TimeUnit.SECONDS)).isTrue() @@ -119,27 +120,27 @@ internal class KubernetesClusterTest { snapshot = Cluster.Snapshot( self = expectedSelf, selfReady = false, - readyMembers = setOf(Cluster.Member("larry-blerp", "10.0.0.3")), - resourceMapper = ClusterHashRing(setOf(Cluster.Member("larry-blerp", "10.0.0.3"))) + readyMembers = setOf(Cluster.Member("larry-blerp", "10.0.0.3", "currentDeployment")), + resourceMapper = ClusterHashRing(setOf(Cluster.Member("larry-blerp", "10.0.0.3", "currentDeployment"))) ), - added = setOf(Cluster.Member("larry-blerp", "10.0.0.3")) + added = setOf(Cluster.Member("larry-blerp", "10.0.0.3", "currentDeployment")) ), Cluster.Changes( snapshot = Cluster.Snapshot( self = expectedSelf, selfReady = false, readyMembers = setOf( - Cluster.Member("larry-blerp", "10.0.0.3"), - Cluster.Member("larry-blerp2", "10.0.0.4") + Cluster.Member("larry-blerp", "10.0.0.3", "currentDeployment"), + Cluster.Member("larry-blerp2", "10.0.0.4", "currentDeployment") ), resourceMapper = ClusterHashRing( setOf( - Cluster.Member("larry-blerp", "10.0.0.3"), - Cluster.Member("larry-blerp2", "10.0.0.4") + Cluster.Member("larry-blerp", "10.0.0.3", "currentDeployment"), + Cluster.Member("larry-blerp2", "10.0.0.4", "currentDeployment") ) ) ), - added = setOf(Cluster.Member("larry-blerp2", "10.0.0.4")) + added = setOf(Cluster.Member("larry-blerp2", "10.0.0.4", "currentDeployment")) ) ) } @@ -149,8 +150,8 @@ internal class KubernetesClusterTest { val ready = CountDownLatch(1) // Start with members - handleWatch(CHANGE_TYPE_ADDED, newPod("larry-blerp", true, "10.0.0.3")) - handleWatch(CHANGE_TYPE_ADDED, newPod("larry-blerp2", true, "10.0.0.4")) + handleWatch(CHANGE_TYPE_ADDED, newPod("larry-blerp", true, "10.0.0.3", "currentDeployment")) + handleWatch(CHANGE_TYPE_ADDED, newPod("larry-blerp2", true, "10.0.0.4", "currentDeployment")) // Explicitly remove a member cluster.watch { changes.add(it) } @@ -164,13 +165,13 @@ internal class KubernetesClusterTest { self = expectedSelf, selfReady = false, readyMembers = setOf( - Cluster.Member("larry-blerp", "10.0.0.3"), - Cluster.Member("larry-blerp2", "10.0.0.4") + Cluster.Member("larry-blerp", "10.0.0.3", "currentDeployment"), + Cluster.Member("larry-blerp2", "10.0.0.4", "currentDeployment") ), resourceMapper = ClusterHashRing( setOf( - Cluster.Member("larry-blerp", "10.0.0.3"), - Cluster.Member("larry-blerp2", "10.0.0.4") + Cluster.Member("larry-blerp", "10.0.0.3", "currentDeployment"), + Cluster.Member("larry-blerp2", "10.0.0.4", "currentDeployment") ) ) ) @@ -179,14 +180,14 @@ internal class KubernetesClusterTest { snapshot = Cluster.Snapshot( self = expectedSelf, selfReady = false, - readyMembers = setOf(Cluster.Member("larry-blerp2", "10.0.0.4")), + readyMembers = setOf(Cluster.Member("larry-blerp2", "10.0.0.4", "currentDeployment")), resourceMapper = ClusterHashRing( setOf( - Cluster.Member("larry-blerp2", "10.0.0.4") + Cluster.Member("larry-blerp2", "10.0.0.4", "currentDeployment") ) ) ), - removed = setOf(Cluster.Member("larry-blerp", "")) + removed = setOf(Cluster.Member("larry-blerp", "", "")) ) ) } @@ -240,12 +241,12 @@ internal class KubernetesClusterTest { val changes = mutableListOf() // Start as an existing member - handleWatch(CHANGE_TYPE_ADDED, newPod("larry-blerp", true, "10.0.0.3")) - handleWatch(CHANGE_TYPE_ADDED, newPod("larry-blerp2", true, "10.0.0.4")) + handleWatch(CHANGE_TYPE_ADDED, newPod("larry-blerp", true, "10.0.0.3", "currentDeployment")) + handleWatch(CHANGE_TYPE_ADDED, newPod("larry-blerp2", true, "10.0.0.4", "currentDeployment")) // Transition to not ready - should remove from the list cluster.watch { changes.add(it) } - handleWatch(CHANGE_TYPE_MODIFIED, newPod("larry-blerp", false, "10.0.0.3")) + handleWatch(CHANGE_TYPE_MODIFIED, newPod("larry-blerp", false, "10.0.0.3", "currentDeployment")) cluster.syncPoint { ready.countDown() } assertThat(ready.await(5, TimeUnit.SECONDS)).isTrue() @@ -255,13 +256,13 @@ internal class KubernetesClusterTest { self = expectedSelf, selfReady = false, readyMembers = setOf( - Cluster.Member("larry-blerp", "10.0.0.3"), - Cluster.Member("larry-blerp2", "10.0.0.4") + Cluster.Member("larry-blerp", "10.0.0.3", "currentDeployment"), + Cluster.Member("larry-blerp2", "10.0.0.4", "currentDeployment") ), resourceMapper = ClusterHashRing( setOf( - Cluster.Member("larry-blerp", "10.0.0.3"), - Cluster.Member("larry-blerp2", "10.0.0.4") + Cluster.Member("larry-blerp", "10.0.0.3", "currentDeployment"), + Cluster.Member("larry-blerp2", "10.0.0.4", "currentDeployment") ) ) ) @@ -270,14 +271,14 @@ internal class KubernetesClusterTest { snapshot = Cluster.Snapshot( self = expectedSelf, selfReady = false, - readyMembers = setOf(Cluster.Member("larry-blerp2", "10.0.0.4")), + readyMembers = setOf(Cluster.Member("larry-blerp2", "10.0.0.4", "currentDeployment")), resourceMapper = ClusterHashRing( setOf( - Cluster.Member("larry-blerp2", "10.0.0.4") + Cluster.Member("larry-blerp2", "10.0.0.4", "currentDeployment") ) ) ), - removed = setOf(Cluster.Member("larry-blerp", "10.0.0.3")) + removed = setOf(Cluster.Member("larry-blerp", "10.0.0.3", "currentDeployment")) ) ) } @@ -302,13 +303,13 @@ internal class KubernetesClusterTest { self = expectedSelf, selfReady = false, readyMembers = setOf( - Cluster.Member("larry-blerp", "10.0.0.3"), - Cluster.Member("larry-blerp2", "10.0.0.4") + Cluster.Member("larry-blerp", "10.0.0.3", ""), + Cluster.Member("larry-blerp2", "10.0.0.4", "") ), resourceMapper = ClusterHashRing( setOf( - Cluster.Member("larry-blerp", "10.0.0.3"), - Cluster.Member("larry-blerp2", "10.0.0.4") + Cluster.Member("larry-blerp", "10.0.0.3", ""), + Cluster.Member("larry-blerp2", "10.0.0.4", "") ) ) ) @@ -317,23 +318,127 @@ internal class KubernetesClusterTest { snapshot = Cluster.Snapshot( self = expectedSelf, selfReady = false, - readyMembers = setOf(Cluster.Member("larry-blerp2", "10.0.0.4")), + readyMembers = setOf(Cluster.Member("larry-blerp2", "10.0.0.4", "")), resourceMapper = ClusterHashRing( setOf( - Cluster.Member("larry-blerp2", "10.0.0.4") + Cluster.Member("larry-blerp2", "10.0.0.4", "") ) ) ), - removed = setOf(Cluster.Member("larry-blerp", "")) + removed = setOf(Cluster.Member("larry-blerp", "", "")) ) ) } + @Test fun deploymentChangesWatcherUpdated() { + val ready = CountDownLatch(1) + val changes = mutableListOf() + + // Start as an existing member + handleWatch(CHANGE_TYPE_ADDED, newPod("larry-blerp", true, "10.0.0.3", "oldDeployment")) + handleWatch(CHANGE_TYPE_ADDED, newPod("larry-blerp2", true, "10.0.0.4", "oldDeployment")) + + cluster.watch { changes.add(it) } + handleWatch(CHANGE_TYPE_DELETED, newPod("larry-blerp", true, "10.0.0.3", "oldDeployment")) + handleWatch(CHANGE_TYPE_DELETED, newPod("larry-blerp2", true, "10.0.0.4", "oldDeployment")) + + handleWatch(CHANGE_TYPE_ADDED, newPod("harry-blerp", true, "10.0.0.3", "newDeployment")) + handleWatch(CHANGE_TYPE_ADDED, newPod("harry-blerp2", true, "10.0.0.4", "newDeployment")) + + cluster.syncPoint { ready.countDown() } + assertThat(ready.await(5, TimeUnit.SECONDS)).isTrue() + + assertThat(changes).containsExactly( + Cluster.Changes( + snapshot = Cluster.Snapshot( + self = expectedSelf, + selfReady = false, + readyMembers = setOf( + Cluster.Member("larry-blerp", "10.0.0.3", "oldDeployment"), + Cluster.Member("larry-blerp2", "10.0.0.4", "oldDeployment") + ), + resourceMapper = ClusterHashRing( + setOf( + Cluster.Member("larry-blerp", "10.0.0.3", "oldDeployment"), + Cluster.Member("larry-blerp2", "10.0.0.4", "oldDeployment") + ) + ) + ) + ), + Cluster.Changes( + snapshot = Cluster.Snapshot( + self = expectedSelf, + selfReady = false, + readyMembers = setOf( + Cluster.Member("larry-blerp2", "10.0.0.4", "oldDeployment") + ), + resourceMapper = ClusterHashRing( + setOf(Cluster.Member("larry-blerp2", "10.0.0.4", "oldDeployment")) + ) + ), + removed = setOf( + Cluster.Member("larry-blerp", "10.0.0.3", "oldDeployment") + ) + ), + Cluster.Changes( + snapshot = Cluster.Snapshot( + self = expectedSelf, + selfReady = false, + readyMembers = setOf(), + resourceMapper = ClusterHashRing(setOf()) + ), + removed = setOf( + Cluster.Member("larry-blerp2", "10.0.0.4", "oldDeployment") + ) + ), + Cluster.Changes( + snapshot = Cluster.Snapshot( + self = expectedSelf, + selfReady = false, + readyMembers = setOf( + Cluster.Member("harry-blerp", "10.0.0.3", "newDeployment") + ), + resourceMapper = ClusterHashRing( + setOf(Cluster.Member("harry-blerp", "10.0.0.3", "newDeployment")) + ) + ), + added = setOf( + Cluster.Member("harry-blerp", "10.0.0.3", "newDeployment") + ) + ), + Cluster.Changes( + snapshot = Cluster.Snapshot( + self = expectedSelf, + selfReady = false, + readyMembers = setOf( + Cluster.Member("harry-blerp", "10.0.0.3", "newDeployment"), + Cluster.Member("harry-blerp2", "10.0.0.4", "newDeployment") + ), + resourceMapper = ClusterHashRing( + setOf( + Cluster.Member("harry-blerp", "10.0.0.3", "newDeployment"), + Cluster.Member("harry-blerp2", "10.0.0.4", "newDeployment") + ) + ) + ), + added = setOf( + Cluster.Member("harry-blerp2", "10.0.0.4", "newDeployment") + ) + ) + ) + } + + private fun handleWatch(type: String, pod: V1Pod) { Watches.newResponse(type, pod).applyTo(cluster) } - private fun newPod(name: String, isReady: Boolean = false, ipAddress: String? = null): V1Pod { + private fun newPod( + name: String, + isReady: Boolean = false, + ipAddress: String? = null, + deployment: String = "", + ): V1Pod { val containerStatus = V1ContainerStatus() containerStatus.ready = isReady containerStatus.name = TEST_NAMESPACE @@ -342,6 +447,9 @@ internal class KubernetesClusterTest { pod.metadata = V1ObjectMeta() pod.metadata!!.namespace = TEST_NAMESPACE pod.metadata!!.name = name + pod.metadata!!.labels = mutableMapOf() + pod.metadata!!.labels!!["tags.datadoghq.com/version"] = deployment + pod.status = V1PodStatus() pod.status!!.containerStatuses = listOf(containerStatus) pod.status!!.podIP = ipAddress @@ -352,7 +460,8 @@ internal class KubernetesClusterTest { const val TEST_NAMESPACE = "larry" const val TEST_SELF_NAME = "larry-76485b7568-l5rmm" const val TEST_SELF_IP = "10.133.66.206" + const val TEST_DEPLOYMENT = "deployment-test" - val expectedSelf = Cluster.Member(TEST_SELF_NAME, TEST_SELF_IP) + val expectedSelf = Cluster.Member(TEST_SELF_NAME, TEST_SELF_IP, TEST_DEPLOYMENT) } }