Skip to content

Commit

Permalink
Merge pull request #110 from jmxtrans/better-shut-down
Browse files Browse the repository at this point in the history
Better shut down
  • Loading branch information
Cyrille Le Clerc committed Nov 27, 2015
2 parents e767ede + 6022fea commit f70885f
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 127 deletions.
196 changes: 97 additions & 99 deletions src/main/java/org/jmxtrans/embedded/EmbeddedJmxTrans.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.MBeanServer;
Expand All @@ -40,8 +41,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -89,89 +88,55 @@ public EmbeddedJmxTrans(MBeanServer mbeanServer) {
*/
private class EmbeddedJmxTransShutdownHook extends Thread {

private final AtomicBoolean alreadyExecuted = new AtomicBoolean(false);
private boolean removed = false;

private final Logger logger = LoggerFactory.getLogger(getClass());

public EmbeddedJmxTransShutdownHook() {
setName(getClass().getSimpleName() + "-" + getName());
}

@Override
public void run() {
// this method will be executed by the Runtime as a Shutdown Hook
execute("EmbeddedJmxTransShutdownHook");
}

private void execute(String invokerName) {
if (alreadyExecuted.compareAndSet(false, true)) {
try {
collectMetrics();
exportCollectedMetrics();
logger.info("{} successfully collected and exported metrics one last time", invokerName);
} catch (RuntimeException e) {
logger.warn("{} failed to collect and export metrics one last time", invokerName);
}

try {
for (Query query : queries) {
query.stop();
}
} catch (Exception e) {
logger.warn("Failure while stopping queries", e);
}

try {
for (OutputWriter outputWriter : outputWriters) {
outputWriter.stop();
}
} catch (Exception e) {
logger.warn("Failure while stopping outputWriters", e);
}
try {
EmbeddedJmxTrans.this.stop();
} catch(Exception e) {
logger.warn("Exception shutting down", e);
}
}

public boolean isAlreadyExecuted() {
return alreadyExecuted.get();
}

public void registerToRuntime() {
Runtime.getRuntime().addShutdownHook(this);
}

public void unregisterFromRuntime() {
if (removed) {
logger.debug("Shutdown hook already removed");
}
try {
boolean shutdownHookRemoved = Runtime.getRuntime().removeShutdownHook(this);
if (shutdownHookRemoved) {
removed = true;
logger.debug("ShutdownHook successfully removed");
} else {
logger.warn("Failure to remove ShutdownHook");
}
} catch (IllegalStateException e) {
logger.debug("Failure to remove ShutdownHook, probably 'Shutdown in progress'", e);
} catch (RuntimeException e) {
logger.warn("Failure to remove ShutdownHook", e);
}
}

public void onStop() {
// if the shutdown hook was already executed by the Runtime
// then we should not try to remove it from the Runtime otherwise we would get an IllegalStateException: Shutdown in progress

if (!isAlreadyExecuted()) {
unregisterFromRuntime();

// as the shutdownHook was not already executed
// we want to execute it in order to trigger the last collection and export
execute("EmbeddedJmxTrans Stop Handler");
}
}

}

private final Logger logger = LoggerFactory.getLogger(getClass());

enum State {STOPPED, STARTING, STARTED, STOPPING}
enum State {STOPPED, STARTED, ERROR}

private AtomicReference<State> state = new AtomicReference(State.STOPPED);
private State state = State.STOPPED;

private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final ReadWriteLock lifecycleLock = new ReentrantReadWriteLock();

private ScheduledExecutorService collectScheduledExecutor;

Expand All @@ -184,7 +149,7 @@ enum State {STOPPED, STARTING, STARTED, STOPPING}
private final List<Query> queries = new ArrayList<Query>();

/**
* Use to {@linkplain Set} to deduplicate during configuration merger
* Use a {@linkplain Set} to deduplicate during configuration merger
*/
private Set<OutputWriter> outputWriters = new HashSet<OutputWriter>();

Expand All @@ -204,15 +169,14 @@ enum State {STOPPED, STARTING, STARTED, STOPPING}
* Start the exporter: initialize underlying queries, start scheduled executors, register shutdown hook
*/
@PostConstruct
public synchronized void start() throws Exception {
readWriteLock.writeLock().lock();
public void start() throws Exception {
lifecycleLock.writeLock().lock();
try {
State state = this.state.get();
if (!State.STOPPED.equals(state)) {
logger.warn("Ignore start() command for {} instance", state);
return;
}
this.state.set(State.STARTING);
logger.info("Start...");

for (Query query : queries) {
query.start();
Expand All @@ -224,20 +188,20 @@ public synchronized void start() throws Exception {
collectScheduledExecutor = Executors.newScheduledThreadPool(getNumQueryThreads(), new NamedThreadFactory("jmxtrans-collect-", true));
exportScheduledExecutor = Executors.newScheduledThreadPool(getNumExportThreads(), new NamedThreadFactory("jmxtrans-export-", true));

logger.info("Start queries and output writers...");
for (final Query query : getQueries()) {
collectScheduledExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
readWriteLock.readLock().lock();
lifecycleLock.readLock().lock();
try {
State state = EmbeddedJmxTrans.this.state.get();
if (!State.STARTED.equals(state)) {
logger.debug("Ignore query.collectMetrics() command for {} instance", state);
return;
}
query.collectMetrics();
} finally {
readWriteLock.readLock().unlock();
lifecycleLock.readLock().unlock();
}
}

Expand All @@ -251,17 +215,16 @@ public String toString() {
exportScheduledExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
readWriteLock.readLock().lock();
lifecycleLock.readLock().lock();
try {
State state = EmbeddedJmxTrans.this.state.get();
if (!State.STARTED.equals(state)) {
logger.debug("Ignore query.exportCollectedMetrics() command for {} instance", state);
return;
}

query.exportCollectedMetrics();
} finally {
readWriteLock.readLock().unlock();
lifecycleLock.readLock().unlock();
}
}

Expand All @@ -274,56 +237,82 @@ public String toString() {

shutdownHook = new EmbeddedJmxTransShutdownHook();
shutdownHook.registerToRuntime();
this.state.set(State.STARTED);
state = State.STARTED;
logger.info("EmbeddedJmxTrans started");
} catch (RuntimeException e) {
this.state = State.ERROR;
if (logger.isDebugEnabled()) {
// to troubleshoot JMX call errors or equivalent, it may be useful to log and rethrow
logger.warn("Exception starting EmbeddedJmxTrans", e);
}
throw e;
} finally {
readWriteLock.writeLock().unlock();
lifecycleLock.writeLock().unlock();
}
}


/**
* Stop scheduled executors and collect-and-export metrics one last time.
*/
@PreDestroy
public synchronized void stop() throws Exception {
readWriteLock.writeLock().lock();
public void stop() {
logger.info("Stop...");
lifecycleLock.writeLock().lock();
try {
State state = this.state.get();
if (!State.STARTED.equals(state)) {
logger.debug("Ignore stop() command for " + state + " instance");
return;
}
this.state.set(State.STOPPING);
logger.info("Unregister shutdown hook");
this.shutdownHook.unregisterFromRuntime();

logger.info("Shutdown collectScheduledExecutor and exportScheduledExecutor...");
// no need to `shutdown()` and `awaitTermination()` before `shutdownNow()` as we invoke `collectMetrics()` and `exportCollectedMetrics()`
// `shutdownNow()` can be invoked before `collectMetrics()` and `exportCollectedMetrics()`
collectScheduledExecutor.shutdownNow();
exportScheduledExecutor.shutdownNow();

try {
collectScheduledExecutor.shutdown();
logger.info("Collect metrics...");
collectMetrics();
logger.info("Export metrics...");
exportCollectedMetrics();
} catch (RuntimeException e) {
logger.warn("Ignore failure collecting and exporting metrics during stop", e);
}

// queries and outputwriters can be stopped even if exports threads are running thanks to the lifecycleLock
logger.info("Stop queries...");
for (Query query : queries) {
try {
boolean terminated = collectScheduledExecutor.awaitTermination(getQueryIntervalInSeconds(), TimeUnit.SECONDS);
if (!terminated) {
List<Runnable> tasks = collectScheduledExecutor.shutdownNow();
logger.warn("Collect executor could not shutdown in time. Abort tasks " + tasks);
}
} catch (InterruptedException e) {
logger.warn("Ignore InterruptedException stopping", e);
query.stop();
} catch (Exception e) {
logger.warn("Ignore exception stopping query {}", query, e);
}
exportScheduledExecutor.shutdown();
}

logger.info("Stop output writers...");
for (OutputWriter outputWriter : outputWriters) {
try {
boolean terminated = exportScheduledExecutor.awaitTermination(getExportIntervalInSeconds(), TimeUnit.SECONDS);
if (!terminated) {
List<Runnable> tasks = exportScheduledExecutor.shutdownNow();
logger.warn("Export executor could not shutdown in time. Abort tasks " + tasks);
}
} catch (InterruptedException e) {
logger.warn("Ignore InterruptedException stopping", e);
outputWriter.stop();
} catch (Exception e) {
logger.warn("Ignore exception stopping outputWriters", e);
}
} catch (RuntimeException e) {
logger.warn("Failure while shutting down ExecutorServices", e);
}
shutdownHook.onStop();
this.state.set(State.STOPPED);

state = State.STOPPED;
logger.info("Set state to {}", state);
} catch (RuntimeException e) {
state = State.ERROR;
if (logger.isDebugEnabled()) {
// to troubleshoot JMX call errors or equivalent, it may be useful to log and rethrow
logger.warn("Exception stopping EmbeddedJmxTrans", e);
}
throw e;
} finally {
readWriteLock.writeLock().unlock();
lifecycleLock.writeLock().unlock();
}
logger.info("Stopped");
}


Expand All @@ -332,9 +321,8 @@ public synchronized void stop() throws Exception {
*/
@Override
public void collectMetrics() {
readWriteLock.readLock().lock();
lifecycleLock.readLock().lock();
try {
State state = this.state.get();
if (!State.STARTED.equals(state)) {
logger.debug("Ignore collectMetrics() command for " + state + " instance");
return;
Expand All @@ -343,7 +331,7 @@ public void collectMetrics() {
query.collectMetrics();
}
} finally {
readWriteLock.readLock().unlock();
lifecycleLock.readLock().unlock();
}
}

Expand All @@ -352,18 +340,17 @@ public void collectMetrics() {
*/
@Override
public void exportCollectedMetrics() {
readWriteLock.readLock().lock();
lifecycleLock.readLock().lock();
try {
State state = this.state.get();
if (!State.STARTED.equals(state)) {
logger.debug("Ignore collectMetrics() command for not running instance");
logger.debug("Ignore collectMetrics() command for " + state + " instance");
return;
}
for (Query query : getQueries()) {
query.exportCollectedMetrics();
}
} finally {
readWriteLock.readLock().unlock();
lifecycleLock.readLock().unlock();
}
}

Expand All @@ -380,7 +367,7 @@ public void addQuery(@Nonnull Query query) {
@Override
public String toString() {
return "EmbeddedJmxTrans{" +
"state=" + state +
"state=" + getState() +
", queries=" + queries +
", outputWriters=" + outputWriters +
", numQueryThreads=" + numQueryThreads +
Expand Down Expand Up @@ -522,4 +509,15 @@ public int getDiscardedResultsCount() {
}
return result;
}

// return a String and not an embedded-jmxtrans class/enum to be portable and usable in JMX tools such as VisualVM
@Nullable
public String getState() {
lifecycleLock.readLock().lock();
try {
return state == null ? null : state.toString();
} finally {
lifecycleLock.readLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,10 @@ public interface EmbeddedJmxTransMBean {
int getExportCount();

int getDiscardedResultsCount();

String getState();

void stop() throws Exception;

void start() throws Exception;
}
Loading

0 comments on commit f70885f

Please sign in to comment.