diff --git a/skid-road-dropwizard/src/main/java/io/ifar/skidroad/dropwizard/cli/FlushLogsCommand.java b/skid-road-dropwizard/src/main/java/io/ifar/skidroad/dropwizard/cli/FlushLogsCommand.java
new file mode 100644
index 0000000..8b39c29
--- /dev/null
+++ b/skid-road-dropwizard/src/main/java/io/ifar/skidroad/dropwizard/cli/FlushLogsCommand.java
@@ -0,0 +1,141 @@
+package io.ifar.skidroad.dropwizard.cli;
+
+import com.google.common.collect.ImmutableSet;
+import com.yammer.dropwizard.cli.ConfiguredCommand;
+import com.yammer.dropwizard.config.Bootstrap;
+import com.yammer.dropwizard.config.Configuration;
+import com.yammer.dropwizard.config.Environment;
+import io.ifar.goodies.CliConveniences;
+import io.ifar.skidroad.LogFile;
+import io.ifar.skidroad.dropwizard.ManagedPrepWorkerManager;
+import io.ifar.skidroad.dropwizard.ManagedUploadWorkerManager;
+import io.ifar.skidroad.tracking.LogFileState;
+import io.ifar.skidroad.tracking.LogFileStateListener;
+import io.ifar.skidroad.tracking.LogFileTracker;
+import io.ifar.skidroad.writing.WritingWorkerManager;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import static io.ifar.skidroad.tracking.LogFileState.*;
+
+/**
+ * Prepares and uploads any in-progress log files on this host. Service must not be currently running! Used to
+ * facilitate decommissioning a host without leaving behind any data.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public abstract class FlushLogsCommand<T extends Configuration> extends ConfiguredCommand<T>
+{
+    private final static Logger LOG = LoggerFactory.getLogger(FlushLogsCommand.class);
+
+    public FlushLogsCommand() {
+        super("flush-logs","Prepare and upload any in-progress log files on this host. SERVICE MUST NOT BE RUNNING.");
+    }
+
+    @Override
+    public void configure(Subparser subparser) {
+        super.configure(subparser);
+    }
+
+    /**
+     * Subclasses should override and create the singleton instances returned by the other abstract methods. Manager objects
+     * created should be registered with the Environment for proper lifecycle handling.
+     * @param configuration
+     * @param environment
+     */
+    protected abstract void init(T configuration, Environment environment) throws Exception;
+    protected abstract LogFileTracker getLogTracker();
+    protected abstract WritingWorkerManager getWritingWorkerManager();
+    protected abstract ManagedPrepWorkerManager getPrepWorkerManager();
+    protected abstract ManagedUploadWorkerManager getUploadWorkerManager();
+
+
+    @Override
+    protected void run(Bootstrap<T> bootstrap, Namespace namespace, T configuration) throws Exception {
+
+        Environment env = CliConveniences.fabricateEnvironment(getName(), configuration);
+        init(configuration, env);
+
+        LogFileTracker logFileTracker = getLogTracker();
+        //Get a WritingWorkerManager because, on start-up, it progresses any stale WRITING files to WRITTEN or WRITE_ERROR status
+        WritingWorkerManager writingWorkerManager = getWritingWorkerManager();
+        //Get prep and upload worker managers to execute the flushing
+        ManagedPrepWorkerManager managedPrepWorkerManager = getPrepWorkerManager();
+        ManagedUploadWorkerManager managedUploadWorkerManager = getUploadWorkerManager();
+
+        try {
+
+            env.start();
+            //immediately kick off an initial retry, then continue with the regular configured interval
+            managedPrepWorkerManager.retryOneThenRetryAll();
+            managedUploadWorkerManager.retryOneThenRetryAll();
+
+            ExecutorService executor = Executors.newFixedThreadPool(1);
+
+            //TODO: Configurable timeout. DropWizard will System.exit(1) if we throw an exception from our run method.
+            executor.submit(new StateWaiter(logFileTracker)).get();
+
+            System.out.println("[DONE]");
+
+        } finally {
+            env.stop();
+        }
+    }
+
+    private static class StateWaiter implements Runnable, LogFileStateListener {
+        private final static Set<LogFileState> STATES_TO_WAIT_ON = ImmutableSet.of(
+                WRITING,
+                WRITTEN,
+                //don't wait on WRITE_ERROR; probably no data to recover
+                PREPARING,
+                PREPARED,
+                PREP_ERROR,
+                UPLOADING,
+                UPLOAD_ERROR
+        );
+        private final LogFileTracker tracker;
+        private final SynchronousQueue<LogFile> stateChanges;
+
+        private StateWaiter(LogFileTracker tracker) {
+            this.tracker = tracker;
+            this.stateChanges = new SynchronousQueue<>();
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                int pendingFiles = tracker.getCount(STATES_TO_WAIT_ON);
+                if (pendingFiles == 0) {
+                    System.out.println("No pending files. Done.");
+                    break;
+                } else {
+                    System.out.println(pendingFiles + " files pending upload. Waiting...");
+                    try {
+                        //wait until some file's state has changed or until the timeout is reached
+                        stateChanges.poll(30, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                        System.out.println("Interrupted!");
+                        break;
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void stateChanged(LogFile logFile) {
+            //try for a little while to notify the run() method that there's a new file, then give up and drop it on the floor.
+            try {
+                stateChanges.offer(logFile, 1, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                //ignore
+            }
+        }
+    }
+}
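
Since FlushLogsCommand is abstract, each service supplies its own wiring. A minimal sketch of what a concrete subclass might look like (MyServiceConfiguration and the class name are hypothetical, and the actual construction of the tracker and managers is service-specific, so init() is left as a placeholder):

    //Hypothetical subclass sketch; not part of skid-road.
    public class MyServiceFlushLogsCommand extends FlushLogsCommand<MyServiceConfiguration> {
        private LogFileTracker tracker;
        private WritingWorkerManager writingWorkerManager;
        private ManagedPrepWorkerManager prepWorkerManager;
        private ManagedUploadWorkerManager uploadWorkerManager;

        @Override
        protected void init(MyServiceConfiguration configuration, Environment environment) throws Exception {
            //Build the tracker and the three managers the same way the service's main
            //Service class does, assign the fields above, and register each manager with
            //the Environment (e.g. environment.manage(...)) so env.start()/env.stop()
            //drive their lifecycles.
            throw new UnsupportedOperationException("service-specific wiring goes here");
        }

        @Override protected LogFileTracker getLogTracker() { return tracker; }
        @Override protected WritingWorkerManager getWritingWorkerManager() { return writingWorkerManager; }
        @Override protected ManagedPrepWorkerManager getPrepWorkerManager() { return prepWorkerManager; }
        @Override protected ManagedUploadWorkerManager getUploadWorkerManager() { return uploadWorkerManager; }
    }
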
diff --git a/skid-road/src/main/java/io/ifar/skidroad/prepping/PrepWorkerManager.java b/skid-road/src/main/java/io/ifar/skidroad/prepping/PrepWorkerManager.java
index f58d7e3..a9fae63 100644
--- a/skid-road/src/main/java/io/ifar/skidroad/prepping/PrepWorkerManager.java
+++ b/skid-road/src/main/java/io/ifar/skidroad/prepping/PrepWorkerManager.java
@@ -216,48 +216,28 @@ private boolean isClaimed(LogFile f) {
         }
     }
 
-    @DisallowConcurrentExecution
-    public static class RetryJob implements Job
-    {
-        public static final String PREP_WORKER_MANAGER = "prep_worker_manager";
-        private static final Logger LOG = LoggerFactory.getLogger(RetryJob.class);
-
-        public void execute(JobExecutionContext context) throws JobExecutionException {
-            JobDataMap m = context.getMergedJobDataMap();
-            PrepWorkerManager mgr = (PrepWorkerManager) m.get(PREP_WORKER_MANAGER);
-
-            try {
-                try (AutoCloseableIterator<LogFile> iterator = mgr.tracker.findMine(ImmutableSet.of(PREPARING,WRITTEN,PREP_ERROR))) {
-                    tryOneThenRetryAll(iterator,mgr);
-                }
-            } catch (Exception e) {
-                throw new JobExecutionException("Failure retrying prep jobs.", e);
-            }
-        }
-
-        /**
-         * Guards against situation where there are many files to retry and retries are failing. Attempt one and,
-         * if it succeeds, retry the others. Otherwise wait. Selection of one to try is random within the first
-         * PEEK_DEPTH records returned by teh database. This avoids iterating the whole result set.
-         * @param iterator
-         * @param mgr
-         */
-        private void tryOneThenRetryAll(Iterator<LogFile> iterator, PrepWorkerManager mgr) {
+    /**
+     * Guards against the situation where there are many files to retry and retries are failing. Attempt one and,
+     * if it succeeds, retry the others. Otherwise wait. Selection of the one to try is random within the first
+     * PEEK_DEPTH records returned by the database. This avoids iterating the whole result set.
+     */
+    public void retryOneThenRetryAll() throws Exception {
+        try (AutoCloseableIterator<LogFile> iterator = tracker.findMine(ImmutableSet.of(PREPARING,WRITTEN,PREP_ERROR))) {
             Pair<LogFile,AutoCloseableIterator<LogFile>> oneSelected = Iterators.takeOneFromTopN(iterator, PEEK_DEPTH);
             if (oneSelected.left != null) {
                 //claim check not required for thread safety, but avoid spurious WARNs about retrying items while they are in-flight for the first time
                 LogFile trialLogFile = oneSelected.left;
-                if (!mgr.isClaimed(trialLogFile)) {
+                if (!isClaimed(trialLogFile)) {
                     try {
                         logRetryMessageForState(trialLogFile);
-                        if (mgr.processSync(trialLogFile)) {
+                        if (processSync(trialLogFile)) {
                             if (oneSelected.right.hasNext()) {
                                 LOG.info("First retry succeeded. Will queue others.");
                                 for (LogFile logFile : oneSelected.right) {
                                     //claim check not required for thread safety, but avoid spurious WARNs about retrying items while they are in-flight for the first time
-                                    if (!mgr.isClaimed(logFile)) {
+                                    if (!isClaimed(logFile)) {
                                         logRetryMessageForState(logFile);
-                                        mgr.processAsync(logFile);
+                                        processAsync(logFile);
                                     }
                                 }
                             }
                         } else {
@@ -276,27 +256,44 @@ private void tryOneThenRetryAll(Iterator<LogFile> iterator, PrepWorkerManager mg
                 }
             }
         }
+    }
 
-        private void logRetryMessageForState(LogFile logFile) {
-            switch (logFile.getState()) {
-                case WRITTEN:
-                    LOG.info("Found stale {} record for {}. Perhaps server was previously terminated before preparing it.",
-                            logFile.getState(),
-                            logFile);
-                    break;
-                case PREPARING:
-                    LOG.info("Found stale {} record for {}. Perhaps server was previously terminated while preparing it.",
-                            logFile.getState(),
-                            logFile);
-                    break;
-                case PREP_ERROR:
-                    LOG.info("Found {} record for {}. Perhaps a transient error occurred while preparing it.",
-                            logFile.getState(),
-                            logFile);
-                    break;
-                default:
-                    throw new IllegalStateException(String.format("Did not expect to be processing %s record for %s. Bug!", logFile.getState(), logFile));
+    private void logRetryMessageForState(LogFile logFile) {
+        switch (logFile.getState()) {
+            case WRITTEN:
+                LOG.info("Found stale {} record for {}. Perhaps server was previously terminated before preparing it.",
+                        logFile.getState(),
+                        logFile);
+                break;
+            case PREPARING:
+                LOG.info("Found stale {} record for {}. Perhaps server was previously terminated while preparing it.",
+                        logFile.getState(),
+                        logFile);
+                break;
+            case PREP_ERROR:
+                LOG.info("Found {} record for {}. Perhaps a transient error occurred while preparing it.",
+                        logFile.getState(),
+                        logFile);
+                break;
+            default:
+                throw new IllegalStateException(String.format("Did not expect to be processing %s record for %s. Bug!", logFile.getState(), logFile));
+        }
+    }
+
+
+    @DisallowConcurrentExecution
+    public static class RetryJob implements Job
+    {
+        public static final String PREP_WORKER_MANAGER = "prep_worker_manager";
+
+        public void execute(JobExecutionContext context) throws JobExecutionException {
+            JobDataMap m = context.getMergedJobDataMap();
+            PrepWorkerManager mgr = (PrepWorkerManager) m.get(PREP_WORKER_MANAGER);
+            try {
+                mgr.retryOneThenRetryAll();
+            } catch (Exception e) {
+                throw new JobExecutionException("Failure retrying prep jobs.", e);
             }
         }
     }
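
The refactoring above moves the retry logic out of the Quartz RetryJob into a public retryOneThenRetryAll() on the manager itself, so FlushLogsCommand can call it directly while the scheduled job becomes a thin wrapper. For reference, a sketch of how such a job could be wired up with plain Quartz 2.x (the job/trigger names and the 60-second interval are illustrative; skid-road drives this through its own scheduler wrapper in practice, and passing a live object through the JobDataMap relies on the default in-memory job store):

    import org.quartz.*;
    import org.quartz.impl.StdSchedulerFactory;

    static void scheduleRetries(PrepWorkerManager prepWorkerManager) throws SchedulerException {
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

        JobDetail job = JobBuilder.newJob(PrepWorkerManager.RetryJob.class)
                .withIdentity("prep-retry")  //illustrative name
                .build();
        //RetryJob is stateless; it receives the live manager through the JobDataMap and
        //looks it up under the PREP_WORKER_MANAGER key in execute().
        job.getJobDataMap().put(PrepWorkerManager.RetryJob.PREP_WORKER_MANAGER, prepWorkerManager);

        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("prep-retry-trigger")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(60))  //illustrative interval
                .build();

        scheduler.start();
        scheduler.scheduleJob(job, trigger);
    }

The @DisallowConcurrentExecution annotation on RetryJob keeps Quartz from firing a second retry pass while one is still running.
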
Perhaps server was previously terminated while preparing it.",
-                            logFile.getState(),
-                            logFile);
-                    break;
-                case PREP_ERROR:
-                    LOG.info("Found {} record for {}. Perhaps a transient error occurred while preparing it.",
-                            logFile.getState(),
-                            logFile);
-                    break;
-                default:
-                    throw new IllegalStateException(String.format("Did not expect to be processing %s record for %s. Bug!", logFile.getState(), logFile));
diff --git a/skid-road/src/main/java/io/ifar/skidroad/upload/UploadWorkerManager.java b/skid-road/src/main/java/io/ifar/skidroad/upload/UploadWorkerManager.java
index 2d91651..8d69026 100644
--- a/skid-road/src/main/java/io/ifar/skidroad/upload/UploadWorkerManager.java
+++ b/skid-road/src/main/java/io/ifar/skidroad/upload/UploadWorkerManager.java
@@ -225,51 +225,28 @@ private boolean isClaimed(LogFile f) {
         }
     }
 
-    @DisallowConcurrentExecution
-    public static class RetryJob implements Job
-    {
-        public static final String UPLOAD_WORKER_MANAGER = "upload_worker_manager";
-        private static final Logger LOG = LoggerFactory.getLogger(RetryJob.class);
-
-        public void execute(JobExecutionContext context) throws JobExecutionException {
-            JobDataMap m = context.getMergedJobDataMap();
-            UploadWorkerManager mgr = (UploadWorkerManager) m.get(UPLOAD_WORKER_MANAGER);
-
-            try {
-                try (AutoCloseableIterator<LogFile> iterator = mgr.tracker.findMine(ImmutableSet.of(UPLOADING, PREPARED, UPLOAD_ERROR))){
-                    tryOneThenRetryAll(iterator, mgr);
-                }
-            } catch (Exception e) {
-                //Observed causes:
-                //  findMine throws org.skife.jdbi.v2.exceptions.UnableToCreateStatementException: org.postgresql.util.PSQLException: This connection has been closed.
-                //  findMine throws org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException: org.postgresql.util.PSQLException: An I/O error occured while sending to the backend.
-                throw new JobExecutionException("Failure retrying upload jobs.", e);
-            }
-        }
-
-        /**
-         * Guards against situation where there are many files to retry and retries are failing. Attempt one and,
-         * if it succeeds, retry the others. Otherwise wait. Selection of one to try is random within the first
-         * PEEK_DEPTH records returned by the database. This avoids iterating the whole result set.
-         * @param iterator
-         * @param mgr
-         */
-        private void tryOneThenRetryAll(Iterator<LogFile> iterator, UploadWorkerManager mgr) {
+    /**
+     * Guards against the situation where there are many files to retry and retries are failing. Attempt one and,
+     * if it succeeds, retry the others. Otherwise wait. Selection of the one to try is random within the first
+     * PEEK_DEPTH records returned by the database. This avoids iterating the whole result set.
+     */
+    public void retryOneThenRetryAll() throws Exception {
+        try (AutoCloseableIterator<LogFile> iterator = tracker.findMine(ImmutableSet.of(UPLOADING, PREPARED, UPLOAD_ERROR))){
             Pair<LogFile,AutoCloseableIterator<LogFile>> oneSelected = Iterators.takeOneFromTopN(iterator, PEEK_DEPTH);
             if (oneSelected.left != null) {
                 //claim check not required for thread safety, but avoid spurious WARNs about retrying items while they are in-flight for the first time
                 LogFile trialLogFile = oneSelected.left;
-                if (!mgr.isClaimed(trialLogFile)) {
+                if (!isClaimed(trialLogFile)) {
                     try {
                         logRetryMessageForState(trialLogFile);
-                        if (mgr.processSync(trialLogFile)) {
+                        if (processSync(trialLogFile)) {
                             if (oneSelected.right.hasNext()) {
                                 LOG.info("First retry succeeded. Will queue others.");
                                 for (LogFile logFile : oneSelected.right) {
                                     //claim check not required for thread safety, but avoid spurious WARNs about retrying items while they are in-flight for the first time
-                                    if (!mgr.isClaimed(logFile)) {
+                                    if (!isClaimed(logFile)) {
                                         logRetryMessageForState(logFile);
-                                        mgr.processAsync(logFile);
+                                        processAsync(logFile);
                                     }
                                 }
                             }
                         } else {
@@ -288,26 +265,47 @@ private void tryOneThenRetryAll(Iterator<LogFile> iterator, UploadWorkerManager
                 }
             }
         }
+    }
 
-        private void logRetryMessageForState(LogFile logFile) {
-            switch (logFile.getState()) {
-                case PREPARED:
-                    LOG.info("Found stale {} record for {}. Perhaps server was previously terminated before uploading it.",
-                            logFile.getState(),
-                            logFile);
-                    break;
-                case UPLOADING:
-                    LOG.info("Found stale {} record for {}. Perhaps server was previously terminated while uploading it.",
-                            logFile.getState(),
-                            logFile);
-                    break;
-                case UPLOAD_ERROR:
-                    LOG.info("Found {} record for {}. Perhaps a transient error occurred while uploading it.",
-                            logFile.getState(),
-                            logFile);
-                    break;
-                default:
-                    throw new IllegalStateException(String.format("Did not expect to be processing %s record for %s. Bug!", logFile.getState(), logFile));
+    private void logRetryMessageForState(LogFile logFile) {
+        switch (logFile.getState()) {
+            case PREPARED:
+                LOG.info("Found stale {} record for {}. Perhaps server was previously terminated before uploading it.",
+                        logFile.getState(),
+                        logFile);
+                break;
+            case UPLOADING:
+                LOG.info("Found stale {} record for {}. Perhaps server was previously terminated while uploading it.",
+                        logFile.getState(),
+                        logFile);
+                break;
+            case UPLOAD_ERROR:
+                LOG.info("Found {} record for {}. Perhaps a transient error occurred while uploading it.",
+                        logFile.getState(),
+                        logFile);
+                break;
+            default:
+                throw new IllegalStateException(String.format("Did not expect to be processing %s record for %s. Bug!", logFile.getState(), logFile));
+        }
+    }
+
+    @DisallowConcurrentExecution
+    public static class RetryJob implements Job
+    {
+        public static final String UPLOAD_WORKER_MANAGER = "upload_worker_manager";
+        private static final Logger LOG = LoggerFactory.getLogger(RetryJob.class);
+
+        public void execute(JobExecutionContext context) throws JobExecutionException {
+            JobDataMap m = context.getMergedJobDataMap();
+            UploadWorkerManager mgr = (UploadWorkerManager) m.get(UPLOAD_WORKER_MANAGER);
+
+            try {
+                mgr.retryOneThenRetryAll();
+            } catch (Exception e) {
+                //Observed causes:
+                //  findMine throws org.skife.jdbi.v2.exceptions.UnableToCreateStatementException: org.postgresql.util.PSQLException: This connection has been closed.
+                //  findMine throws org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException: org.postgresql.util.PSQLException: An I/O error occured while sending to the backend.
+                throw new JobExecutionException("Failure retrying upload jobs.", e);
             }
         }
     }
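
Both managers lean on Iterators.takeOneFromTopN from io.ifar.goodies. Judging from the call sites, it picks one element at random from the first PEEK_DEPTH records (left, or null when the input is empty) and hands back an iterator over everything not picked (right); randomizing within the top N keeps a permanently failing file at the head of the result set from being the only one ever retried. A self-contained sketch of that inferred contract (with a local Pair stand-in; the real types live in the goodies library and may differ):

    import com.google.common.collect.Iterators;
    import java.util.*;
    import java.util.concurrent.ThreadLocalRandom;

    final class TakeOneFromTopNSketch {
        //Local stand-in for goodies' Pair, which exposes public left/right fields.
        static final class Pair<L, R> {
            final L left;
            final R right;
            Pair(L left, R right) { this.left = left; this.right = right; }
        }

        static <T> Pair<T, Iterator<T>> takeOneFromTopN(Iterator<T> source, int n) {
            List<T> head = new ArrayList<>(n);
            while (head.size() < n && source.hasNext()) {
                head.add(source.next());  //peek at no more than n records
            }
            if (head.isEmpty()) {
                return new Pair<>(null, source);  //nothing available to retry
            }
            T chosen = head.remove(ThreadLocalRandom.current().nextInt(head.size()));
            //Everything not chosen stays retryable: unchosen head elements first, then the tail.
            return new Pair<>(chosen, Iterators.concat(head.iterator(), source));
        }
    }
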
Bug!", logFile.getState(), logFile)); + } + } + + @DisallowConcurrentExecution + public static class RetryJob implements Job + { + public static final String UPLOAD_WORKER_MANAGER = "upload_worker_manager"; + private static final Logger LOG = LoggerFactory.getLogger(RetryJob.class); + + public void execute(JobExecutionContext context) throws JobExecutionException { + JobDataMap m = context.getMergedJobDataMap(); + UploadWorkerManager mgr = (UploadWorkerManager) m.get(UPLOAD_WORKER_MANAGER); + + try { + mgr.retryOneThenRetryAll(); + } catch (Exception e) { + //Observed causes: + // findMine throws org.skife.jdbi.v2.exceptions.UnableToCreateStatementException: org.postgresql.util.PSQLException: This connection has been closed. + // findMine throws org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException: org.postgresql.util.PSQLException: An I/O error occured while sending to the backend. + throw new JobExecutionException("Failure retrying upload jobs.", e); } } } diff --git a/skid-road/src/main/java/io/ifar/skidroad/writing/AbstractWritingWorker.java b/skid-road/src/main/java/io/ifar/skidroad/writing/AbstractWritingWorker.java index 9469169..13572e9 100644 --- a/skid-road/src/main/java/io/ifar/skidroad/writing/AbstractWritingWorker.java +++ b/skid-road/src/main/java/io/ifar/skidroad/writing/AbstractWritingWorker.java @@ -99,6 +99,10 @@ public void run() { LOG.debug("{} caught InterruptedException. Stopping...", this.name); shuttingDown = true; } + //Check for interrupt received while no blocking operation was in progress. + if (Thread.interrupted()) { + shuttingDown = true; + } } flush(writer); dirty = false; diff --git a/skid-road/src/main/java/io/ifar/skidroad/writing/WritingWorkerManager.java b/skid-road/src/main/java/io/ifar/skidroad/writing/WritingWorkerManager.java index 2cf5fdb..863d98f 100644 --- a/skid-road/src/main/java/io/ifar/skidroad/writing/WritingWorkerManager.java +++ b/skid-road/src/main/java/io/ifar/skidroad/writing/WritingWorkerManager.java @@ -278,6 +278,20 @@ public void start() throws Exception { LOG.info("Starting {}.",WritingWorkerManager.class.getSimpleName()); //On startup, look for database records that were left hanging and tidy up. + cleanStaleEntries(); + + Map pruneConfiguration = new HashMap<>(1); + pruneConfiguration.put(PruneJob.FILE_WRITING_WORKER_MANAGER, this); + scheduler.schedule(this.getClass().getSimpleName()+"_prune_"+instanceCounter.incrementAndGet(), PruneJob.class, pruneIntervalSeconds * 1000, pruneConfiguration); + } + + /** + * Find any WRITING entries, presume they are stale, and move them to WRITTEN or WRITE_ERROR state. This method should + * only be called when the server is not running (or just starting up), lest it stomp on a WRITING entry which is + * in-progress. + * @throws Exception + */ + private void cleanStaleEntries() throws Exception { try (AutoCloseableIterator staleEntries = tracker.findMine(WRITING)){ while (staleEntries.hasNext()) { LogFile staleEntry = staleEntries.next(); @@ -290,10 +304,6 @@ public void start() throws Exception { } } } - - Map pruneConfiguration = new HashMap<>(1); - pruneConfiguration.put(PruneJob.FILE_WRITING_WORKER_MANAGER, this); - scheduler.schedule(this.getClass().getSimpleName()+"_prune_"+instanceCounter.incrementAndGet(), PruneJob.class, pruneIntervalSeconds * 1000, pruneConfiguration); } public void stop() throws InterruptedException {