Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(discovery): discovery synchronization for stale lost targets #689

Merged
merged 38 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
3a12aa0
fix(discovery): k8s discovery synchronization for stale lost targets
andrewazores Oct 9, 2024
ca9c97f
synchronize on lock while building/pruning owner chains to avoid dupl…
andrewazores Oct 9, 2024
15934f5
do not perform node query in toString
andrewazores Oct 15, 2024
88397f8
return existing node if already discovered
andrewazores Oct 16, 2024
4eee22c
remove unnecessary locking
andrewazores Oct 17, 2024
c066ddb
Revert "remove unnecessary locking"
andrewazores Oct 17, 2024
9bbf6cd
start transaction for active recording list request
andrewazores Oct 17, 2024
e23aa7a
cleanup
andrewazores Oct 17, 2024
e0473db
cleanup
andrewazores Oct 17, 2024
15b7bb0
rename
andrewazores Oct 17, 2024
df5af8d
simplify transaction ordering and locking
andrewazores Oct 17, 2024
a805cc0
refactor
andrewazores Oct 17, 2024
faddfd2
force resync on existing period
andrewazores Oct 17, 2024
9be1431
lower log level
andrewazores Oct 17, 2024
fccdc07
ensure full sync on startup
andrewazores Oct 17, 2024
bd42f0b
periodically retry storage bucket creation
andrewazores Oct 17, 2024
966297e
ensure ordered queue processing of transactions, error handling
andrewazores Oct 22, 2024
facfe29
fixup! ensure ordered queue processing of transactions, error handling
andrewazores Oct 22, 2024
5dc0591
decouple target JVM ID retrieval from persistence, handle in separate…
andrewazores Oct 23, 2024
a182230
ensure JVM ID is nulled if connection fails
andrewazores Oct 23, 2024
4016408
delay MODIFIED events same as FOUND events
andrewazores Oct 23, 2024
75568cd
refactor to use quartz for scheduling delayed connection, and reuse l…
andrewazores Oct 23, 2024
d33e8cc
remove unused case
andrewazores Oct 23, 2024
a8ce783
remove unnecessary job identity
andrewazores Oct 23, 2024
94844e3
handle nullable input data
andrewazores Oct 23, 2024
c2f5fd0
slower initial delay, periodic delay based on connection timeout
andrewazores Oct 23, 2024
95151f5
unwrap exception handling so transactions can be rolled back
andrewazores Oct 23, 2024
2cec483
cleanup
andrewazores Oct 23, 2024
c14f1ab
handle single-target updates within existing transaction
andrewazores Oct 23, 2024
730ddce
reduce delay
andrewazores Oct 23, 2024
35b17bc
skip update if jvmId already known
andrewazores Oct 23, 2024
c1262c3
updates should continue even if JVM ID is already known, so that acti…
andrewazores Oct 24, 2024
9be3035
rename
andrewazores Oct 24, 2024
2699300
rules ignore target discovery when JVM ID is still blank, act later a…
andrewazores Oct 24, 2024
1d372f3
handle updating JVM ID on credential change, null out JVM ID if updat…
andrewazores Oct 24, 2024
9caa89a
handle events in ordered serial fashion
andrewazores Oct 24, 2024
64b2320
use infrastructure pool instead of forkjoin
andrewazores Oct 24, 2024
d9ea05b
Revert "handle events in ordered serial fashion"
andrewazores Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion src/main/java/io/cryostat/StorageBuckets.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@
*/
package io.cryostat;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.cryostat.util.HttpStatusCodeIdentifier;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
Expand All @@ -30,7 +41,17 @@ public class StorageBuckets {
@Inject S3Client storage;
@Inject Logger logger;

@ConfigProperty(name = "storage.buckets.creation-retry.period")
Duration creationRetryPeriod;

private final Set<String> buckets = ConcurrentHashMap.newKeySet();
private final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();

/**
 * Registers a storage bucket that should exist.
 *
 * <p>This method does not contact the storage service itself: it only adds the name to the
 * pending {@code buckets} set. The periodic task scheduled in {@code onStart} later calls
 * {@code tryCreate} for each pending name and removes the names for which it returns true.
 *
 * @param bucket name of the bucket to check for and create if absent
 */
public void createIfNecessary(String bucket) {
buckets.add(bucket);
}

private boolean tryCreate(String bucket) {
boolean exists = false;
logger.debugv("Checking if storage bucket \"{0}\" exists ...", bucket);
try {
Expand All @@ -49,8 +70,27 @@ public void createIfNecessary(String bucket) {
storage.createBucket(CreateBucketRequest.builder().bucket(bucket).build());
logger.debugv("Storage bucket \"{0}\" created", bucket);
} catch (Exception e) {
logger.error(e);
logger.warn(e);
return false;
}
}
return true;
}

/**
 * Starts the periodic bucket-creation task when the application starts.
 *
 * <p>Each pass calls {@code tryCreate} for every pending bucket name and drops the names for
 * which it returns true; the remaining names are retried on the next pass. The first pass runs
 * immediately and later passes follow at the configured {@code creationRetryPeriod}.
 *
 * @param evt the startup event (unused, only triggers the observer)
 */
void onStart(@Observes StartupEvent evt) {
    worker.scheduleAtFixedRate(
            () -> {
                try {
                    buckets.removeIf(this::tryCreate);
                } catch (RuntimeException e) {
                    // scheduleAtFixedRate suppresses all subsequent executions once a run
                    // throws, so an unexpected failure must be contained here or the retry
                    // loop would silently stop for the rest of the application's lifetime.
                    logger.warn(e);
                }
            },
            0,
            creationRetryPeriod.toMillis(),
            TimeUnit.MILLISECONDS);
}

/**
 * Shuts down the bucket-creation worker when the application stops.
 *
 * @param evt the shutdown event (unused, only triggers the observer)
 */
void onStop(@Observes ShutdownEvent evt) {
// shutdown() is non-blocking: it does not wait for a pass that is already running.
worker.shutdown();
}
}
71 changes: 47 additions & 24 deletions src/main/java/io/cryostat/discovery/DiscoveryNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.fasterxml.jackson.annotation.JsonView;
import io.quarkus.hibernate.orm.panache.PanacheEntity;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.panache.common.Parameters;
import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -42,6 +43,8 @@
import jakarta.persistence.FetchType;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
import jakarta.persistence.NamedQueries;
import jakarta.persistence.NamedQuery;
import jakarta.persistence.OneToMany;
import jakarta.persistence.OneToOne;
import jakarta.persistence.PostPersist;
Expand All @@ -56,6 +59,11 @@

@Entity
@EntityListeners(DiscoveryNode.Listener.class)
@NamedQueries({
@NamedQuery(
name = "DiscoveryNode.byTypeWithName",
query = "from DiscoveryNode where nodeType = :nodeType and name = :name")
})
public class DiscoveryNode extends PanacheEntity {

public static final String NODE_TYPE = "nodeType";
Expand Down Expand Up @@ -129,33 +137,48 @@ public static List<DiscoveryNode> findAllByNodeType(NodeType nodeType) {
}

/**
 * Obtains an environment node (a node with no associated target) for the given name and type.
 *
 * <p>NOTE(review): this span looks like a flattened diff - two alternative bodies appear back
 * to back without +/- markers. The first presumably is the removed implementation and the
 * second its replacement; confirm against the real file before relying on either.
 *
 * @param name the node name
 * @param nodeType the node type; its {@code getKind()} value is stored as the node's type
 * @return the node for this name and type
 */
public static DiscoveryNode environment(String name, NodeType nodeType) {
// Variant 1 (presumably pre-change): always builds and persists a brand-new node, so
// repeated calls with the same name and type produce duplicate rows.
return QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = name;
node.nodeType = nodeType.getKind();
node.labels = new HashMap<>();
node.children = new ArrayList<>();
node.target = null;
node.persist();
return node;
});
// Variant 2 (presumably post-change): look the node up first through the
// "DiscoveryNode.byTypeWithName" named query (match on nodeType and name) and only
// build and persist a new node when no match exists.
var kind = nodeType.getKind();
return DiscoveryNode.<DiscoveryNode>find(
"#DiscoveryNode.byTypeWithName",
Parameters.with("nodeType", kind).and("name", name))
.firstResultOptional()
.orElseGet(
() ->
QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = name;
node.nodeType = kind;
// Environment nodes start with no labels, an empty child
// list and no target.
node.labels = new HashMap<>();
node.children = new ArrayList<>();
node.target = null;
node.persist();
return node;
}));
}

/**
 * Obtains the leaf node that represents the given target.
 *
 * <p>The node name is the string form of the target's {@code connectUrl}.
 *
 * <p>NOTE(review): this span looks like a flattened diff - two alternative bodies appear back
 * to back without +/- markers. The first presumably is the removed implementation and the
 * second its replacement; confirm against the real file before relying on either.
 *
 * @param target the target to wrap; its {@code connectUrl} and {@code labels} are read
 * @param nodeType the node type; its {@code getKind()} value is stored as the node's type
 * @return the node for this target
 */
public static DiscoveryNode target(Target target, NodeType nodeType) {
// Variant 1 (presumably pre-change): always builds and persists a brand-new node.
return QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = target.connectUrl.toString();
node.nodeType = nodeType.getKind();
node.labels = new HashMap<>(target.labels);
node.children = null;
node.target = target;
node.persist();
return node;
});
// Variant 2 (presumably post-change): look the node up first through the
// "DiscoveryNode.byTypeWithName" named query, using the connect URL as the name, and
// only build and persist a new node when no match exists.
var kind = nodeType.getKind();
var connectUrl = target.connectUrl.toString();
return DiscoveryNode.<DiscoveryNode>find(
"#DiscoveryNode.byTypeWithName",
Parameters.with("nodeType", kind).and("name", connectUrl))
.firstResultOptional()
.orElseGet(
() ->
QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = connectUrl;
node.nodeType = kind;
// Labels are copied so the node does not share the
// target's map; target nodes have no child list.
node.labels = new HashMap<>(target.labels);
node.children = null;
node.target = target;
node.persist();
return node;
}));
}

@Override
Expand Down
Loading
Loading