Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding deployment version on Cluster.Member #3371

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import java.util.UUID

class DynamoClusterModule @JvmOverloads constructor(private val config: DynamoClusterConfig = DynamoClusterConfig()) : KAbstractModule() {
override fun configure() {
val defaultCluster = DefaultCluster(Cluster.Member(UUID.randomUUID().toString(), "invalid-ip"))
val defaultCluster = DefaultCluster(Cluster.Member(UUID.randomUUID().toString(), "invalid-ip", "invalid-deployment"))
bind<DynamoClusterConfig>().toInstance(config)
bind<Cluster>().toInstance(defaultCluster)
bind<DefaultCluster>().toInstance(defaultCluster)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ internal class DynamoClusterWatcherTask @Inject constructor(
.build()
for (page in table.scan(request).stream()) {
for (item in page.items()) {
members.add(Member(item.name!!, "invalid-ip"))
members.add(Member(item.name!!, "invalid-ip", "invalid-deployment"))
}
}
cluster.clusterChanged(membersBecomingReady = members, membersBecomingNotReady = prevMembers - members)
Expand Down
2 changes: 1 addition & 1 deletion misk-clustering/src/main/kotlin/misk/clustering/Cluster.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ typealias ClusterWatch = (Cluster.Changes) -> Unit
* to monitor the state of its peers
*/
interface Cluster {
data class Member(val name: String, val ipAddress: String)
data class Member(val name: String, val ipAddress: String, val deploymentVersion: String)

data class Changes @JvmOverloads constructor(
val snapshot: Snapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class FakeCluster internal constructor(
companion object {
const val SELF_NAME = "fake-self-node"
const val SELF_IP = "10.0.0.1"
@JvmStatic val self = Cluster.Member(name = SELF_NAME, ipAddress = SELF_IP)
const val SELF_DEPLOYMENT = "fake-deployment"
@JvmStatic val self = Cluster.Member(name = SELF_NAME, ipAddress = SELF_IP, deploymentVersion = SELF_DEPLOYMENT)

private val log = getLogger<FakeCluster>()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,9 @@ import com.google.inject.Provider
internal class KubernetesClusterProvider @Inject internal constructor(
private val config: KubernetesConfig
) : Provider<DefaultCluster> {
override fun get() = DefaultCluster(Cluster.Member(config.my_pod_name, config.my_pod_ip))
override fun get() = DefaultCluster(Cluster.Member(
config.my_pod_name,
config.my_pod_ip,
config.my_deployment_version,
))
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,14 @@ internal class KubernetesClusterWatcher @Inject internal constructor(
}
}

private val V1Pod.asClusterMember get() = Cluster.Member(metadata!!.name!!, status!!.podIP ?: "")
private val V1Pod.asClusterMember: Cluster.Member
get() {
return Cluster.Member(
name = metadata!!.name!!,
ipAddress = status!!.podIP ?: "",
deploymentVersion = metadata?.labels?.get("tags.datadoghq.com/version") ?: ""
)
}

private val V1Pod.isReady: Boolean
get() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ data class KubernetesConfig @JvmOverloads constructor(
val my_pod_namespace: String = System.getenv("MY_POD_NAMESPACE") ?: "<invalid-namespace>",
val my_pod_name: String = System.getenv("MY_POD_NAME") ?: "<invalid-pod-name>",
val my_pod_ip: String = System.getenv("MY_POD_IP") ?: "<invalid-pod-ip>",
val my_deployment_version: String = System.getenv("MY_POD_DEPLOYMENT") ?: "<invalid-pod-deployment>",
val clustering_pod_label_selector: String? = null,
// NB(mmihic): kubernetes_watch_read_timeout needs to be long to avoid timeouts during watch
val kubernetes_watch_read_timeout: Long = 60,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ import org.junit.jupiter.api.assertThrows

internal class ClusterHashRingTest {
@Test fun singleNode() {
val zork = Cluster.Member("zork", "192.49.168.23")
val zork = Cluster.Member("zork", "192.49.168.23", "fakeDeployment")
val hashRing =
ClusterHashRing(members = setOf(zork), hashFn = Hashing.murmur3_32(0))
assertThat(listOf("foo", "bar", "zed").map { hashRing[it] }).containsExactly(zork, zork, zork)
}

@Test fun multipleNodes() {
val zork = Cluster.Member("zork", "192.49.168.23")
val mork = Cluster.Member("mork", "192.49.168.24")
val quark = Cluster.Member("quark", "192.49.168.25")
val zork = Cluster.Member("zork", "192.49.168.23", "fakeDeployment")
val mork = Cluster.Member("mork", "192.49.168.24", "fakeDeployment")
val quark = Cluster.Member("quark", "192.49.168.25", "fakeDeployment")

// First version of hash ring
val hashRing1 = ClusterHashRing(
Expand All @@ -40,7 +40,7 @@ internal class ClusterHashRingTest {
.containsExactly(zork, quark, zork, zork)

// Add a new member, should not remap resources unnecessarily
val bork = Cluster.Member("bork", "192.49.168.26")
val bork = Cluster.Member("bork", "192.49.168.26", "fakeDeployment")
val hashRing3 = ClusterHashRing(
members = setOf(zork, quark, bork),
hashFn = Hashing.murmur3_32(0)
Expand All @@ -67,9 +67,9 @@ internal class ClusterHashRingTest {
(c, INT_MAX] => a
This test ensures that each range ends up mapping to the expected vnode.
*/
val a = Cluster.Member("a", "192.49.168.23")
val b = Cluster.Member("b", "192.49.168.24")
val c = Cluster.Member("c", "192.49.168.25")
val a = Cluster.Member("a", "192.49.168.23", "fakeDeployment")
val b = Cluster.Member("b", "192.49.168.24", "fakeDeployment")
val c = Cluster.Member("c", "192.49.168.25", "fakeDeployment")

// First version of hash ring
val hashRing = ClusterHashRing(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ internal class FakeClusterTest {
@Inject lateinit var cluster: FakeCluster

@Test fun clusterRespondsToChanges() {
cluster.clusterChanged(membersBecomingReady = setOf(Cluster.Member("blerp", "192.168.12.3")))
cluster.clusterChanged(membersBecomingReady = setOf(Cluster.Member("blerp", "192.168.12.3", "fakeDeployment")))
assertThat(cluster.snapshot.readyMembers)
.containsExactlyInAnyOrder(Cluster.Member("blerp", "192.168.12.3"))
cluster.clusterChanged(membersBecomingNotReady = setOf(Cluster.Member("blerp", "192.168.12.3")))
.containsExactlyInAnyOrder(Cluster.Member("blerp", "192.168.12.3", "fakeDeployment"))
cluster.clusterChanged(membersBecomingNotReady = setOf(Cluster.Member("blerp", "192.168.12.3", "fakeDeployment")))
assertThat(cluster.snapshot.readyMembers).isEmpty()
}

Expand All @@ -32,14 +32,14 @@ internal class FakeClusterTest {
FakeCluster.self
)

cluster.resourceMapper.setDefaultMapping(Cluster.Member("zork", "192.168.12.0"))
cluster.resourceMapper.addMapping("my-object", Cluster.Member("bork", "192.168.12.1"))
cluster.resourceMapper.setDefaultMapping(Cluster.Member("zork", "192.168.12.0", "fakeDeployment"))
cluster.resourceMapper.addMapping("my-object", Cluster.Member("bork", "192.168.12.1", "fakeDeployment"))

assertThat(cluster.snapshot.resourceMapper["my-object"].name).isEqualTo("bork")
assertThat(cluster.snapshot.resourceMapper["other-object"].name).isEqualTo("zork")

// Ensure resource mapper remains the same even through cluster changes
cluster.clusterChanged(membersBecomingReady = setOf(Cluster.Member("blerp", "192.168.12.3")))
cluster.clusterChanged(membersBecomingReady = setOf(Cluster.Member("blerp", "192.168.12.3", "fakeDeployment")))
assertThat(cluster.snapshot.resourceMapper["my-object"].name).isEqualTo("bork")
assertThat(cluster.snapshot.resourceMapper["other-object"].name).isEqualTo("zork")

Expand Down
Loading
Loading