First pass at flush-logs command (See #4).
lorrin committed Jul 23, 2013
1 parent 1522ec2 commit e9156de
Showing 5 changed files with 257 additions and 107 deletions.
FlushLogsCommand.java (new file)
@@ -0,0 +1,141 @@
package io.ifar.skidroad.dropwizard.cli;

import com.google.common.collect.ImmutableSet;
import com.yammer.dropwizard.cli.ConfiguredCommand;
import com.yammer.dropwizard.config.Bootstrap;
import com.yammer.dropwizard.config.Configuration;
import com.yammer.dropwizard.config.Environment;
import io.ifar.goodies.CliConveniences;
import io.ifar.skidroad.LogFile;
import io.ifar.skidroad.dropwizard.ManagedPrepWorkerManager;
import io.ifar.skidroad.dropwizard.ManagedUploadWorkerManager;
import io.ifar.skidroad.tracking.LogFileState;
import io.ifar.skidroad.tracking.LogFileStateListener;
import io.ifar.skidroad.tracking.LogFileTracker;
import io.ifar.skidroad.writing.WritingWorkerManager;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import static io.ifar.skidroad.tracking.LogFileState.*;

/**
* Prepares and uploads any in-progress log files on this host. Service must not be currently running! Used to
* facilitate decommissioning a host without leaving behind any data.
*/
@SuppressWarnings("UnusedDeclaration")
public abstract class FlushLogsCommand<T extends Configuration> extends ConfiguredCommand<T>
{
private final static Logger LOG = LoggerFactory.getLogger(FlushLogsCommand.class);

public FlushLogsCommand() {
super("flush-logs","Prepare and upload any in-progress log files on this host. SERVICE MUST NOT BE RUNNING.");
}

@Override
public void configure(Subparser subparser) {
super.configure(subparser);
}

/**
* Subclasses should override this and create the singleton instances returned by the other abstract methods. Manager objects
* created should be registered with the Environment for proper lifecycle handling.
* @param configuration
* @param environment
*/
protected abstract void init(T configuration, Environment environment) throws Exception;
protected abstract LogFileTracker getLogTracker();
protected abstract WritingWorkerManager getWritingWorkerManager();
protected abstract ManagedPrepWorkerManager getPrepWorkerManager();
protected abstract ManagedUploadWorkerManager getUploadWorkerManager();


@Override
protected void run(Bootstrap<T> bootstrap, Namespace namespace, T configuration) throws Exception {

Environment env = CliConveniences.fabricateEnvironment(getName(), configuration);
init(configuration,env);

LogFileTracker logFileTracker = getLogTracker();
//Get a WritingWorkerManager because, on start-up, it progresses any stale WRITING files to WRITTEN or WRITE_ERROR status
WritingWorkerManager writingWorkerManager = getWritingWorkerManager();
//Get prep and upload worker managers to execute the flushing
ManagedPrepWorkerManager managedPrepWorkerManager = getPrepWorkerManager();
ManagedUploadWorkerManager managedUploadWorkerManager = getUploadWorkerManager();

try {

env.start();
//immediately kick off an initial retry, then continue at the regular configured interval
managedPrepWorkerManager.retryOneThenRetryAll();
managedUploadWorkerManager.retryOneThenRetryAll();

ExecutorService executor = Executors.newFixedThreadPool(1);

//TODO: Configurable timeout. DropWizard will System.exit(1) if we throw an exception from our run method.
executor.submit(new StateWaiter(logFileTracker)).get();

System.out.println("[DONE]");

} finally {
env.stop();
}
}

private static class StateWaiter implements Runnable, LogFileStateListener {
private final static Set<LogFileState> STATES_TO_WAIT_ON = ImmutableSet.<LogFileState>of(
WRITING,
WRITTEN,
//don't wait on WRITE_ERROR; probably no data to recover
PREPARING,
PREPARED,
PREP_ERROR,
UPLOADING,
UPLOAD_ERROR
);
private final LogFileTracker tracker;
private final SynchronousQueue<LogFile> stateChanges;

private StateWaiter(LogFileTracker tracker) {
this.tracker = tracker;
this.stateChanges = new SynchronousQueue<>();
}

@Override
public void run() {
while (true) {
int pendingFiles = tracker.getCount(STATES_TO_WAIT_ON);
if (pendingFiles == 0) {
System.out.println("No pending files. Done.");
break;
} else {
System.out.println(pendingFiles + " files pending upload. Waiting...");
try {
//wait until some file's state has changed or until the timeout is reached
stateChanges.poll(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.out.println("Interrupted!");
break;
}
}
}
}

@Override
public void stateChanged(LogFile logFile) {
//try for a little while to notify the run() method that there's a new file, then give up and drop it on the floor.
try {
stateChanges.offer(logFile, 1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//ignore
}
}
}
}
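
The init() contract described in the Javadoc above is easiest to see in a concrete subclass. The sketch below is a hypothetical illustration and not part of this commit: the class name and the buildXxx(...) factory hooks are assumptions standing in for whatever service-specific code constructs the skid-road collaborators.

package io.ifar.skidroad.dropwizard.cli; //hypothetical placement

import com.yammer.dropwizard.config.Configuration;
import com.yammer.dropwizard.config.Environment;
import io.ifar.skidroad.dropwizard.ManagedPrepWorkerManager;
import io.ifar.skidroad.dropwizard.ManagedUploadWorkerManager;
import io.ifar.skidroad.tracking.LogFileTracker;
import io.ifar.skidroad.writing.WritingWorkerManager;

//Hypothetical subclass sketch, not part of this commit.
public abstract class ExampleFlushLogsCommand<T extends Configuration> extends FlushLogsCommand<T> {

    private LogFileTracker tracker;
    private WritingWorkerManager writingWorkerManager;
    private ManagedPrepWorkerManager prepWorkerManager;
    private ManagedUploadWorkerManager uploadWorkerManager;

    @Override
    protected void init(T configuration, Environment environment) throws Exception {
        //Create each collaborator exactly once and register it with the Environment,
        //so that env.start()/env.stop() in run() drive its lifecycle.
        tracker = buildTracker(configuration, environment);
        writingWorkerManager = buildWritingWorkerManager(configuration, environment, tracker);
        prepWorkerManager = buildPrepWorkerManager(configuration, environment, tracker);
        uploadWorkerManager = buildUploadWorkerManager(configuration, environment, tracker);
    }

    //Assumed service-specific factory hooks; construction details are not shown in this commit.
    protected abstract LogFileTracker buildTracker(T configuration, Environment environment) throws Exception;
    protected abstract WritingWorkerManager buildWritingWorkerManager(T configuration, Environment environment, LogFileTracker tracker) throws Exception;
    protected abstract ManagedPrepWorkerManager buildPrepWorkerManager(T configuration, Environment environment, LogFileTracker tracker) throws Exception;
    protected abstract ManagedUploadWorkerManager buildUploadWorkerManager(T configuration, Environment environment, LogFileTracker tracker) throws Exception;

    @Override protected LogFileTracker getLogTracker() { return tracker; }
    @Override protected WritingWorkerManager getWritingWorkerManager() { return writingWorkerManager; }
    @Override protected ManagedPrepWorkerManager getPrepWorkerManager() { return prepWorkerManager; }
    @Override protected ManagedUploadWorkerManager getUploadWorkerManager() { return uploadWorkerManager; }
}

A concrete subclass would typically be registered in the service's initialize() via bootstrap.addCommand(...) and invoked as something like java -jar service.jar flush-logs config.yml; the jar and configuration file names here are placeholders.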
PrepWorkerManager.java
@@ -216,48 +216,28 @@ private boolean isClaimed(LogFile f) {
}
}

@DisallowConcurrentExecution
public static class RetryJob implements Job
{
public static final String PREP_WORKER_MANAGER = "prep_worker_manager";
private static final Logger LOG = LoggerFactory.getLogger(RetryJob.class);

public void execute(JobExecutionContext context) throws JobExecutionException {
JobDataMap m = context.getMergedJobDataMap();
PrepWorkerManager mgr = (PrepWorkerManager) m.get(PREP_WORKER_MANAGER);

try {
try (AutoCloseableIterator<LogFile> iterator = mgr.tracker.findMine(ImmutableSet.of(PREPARING,WRITTEN,PREP_ERROR))) {
tryOneThenRetryAll(iterator,mgr);
}
} catch (Exception e) {
throw new JobExecutionException("Failure retrying prep jobs.", e);
}
}

/**
* Guards against the situation where there are many files to retry and retries are failing. Attempt one and,
* if it succeeds, retry the others. Otherwise wait. Selection of one to try is random within the first
* PEEK_DEPTH records returned by the database. This avoids iterating the whole result set.
* @param iterator
* @param mgr
*/
private void tryOneThenRetryAll(Iterator<LogFile> iterator, PrepWorkerManager mgr) {
/**
* Guards against the situation where there are many files to retry and retries are failing. Attempt one and,
* if it succeeds, retry the others. Otherwise wait. Selection of one to try is random within the first
* PEEK_DEPTH records returned by the database. This avoids iterating the whole result set.
*/
public void retryOneThenRetryAll() throws Exception {
try (AutoCloseableIterator<LogFile> iterator = tracker.findMine(ImmutableSet.of(PREPARING,WRITTEN,PREP_ERROR))) {
Pair<LogFile,IterableIterator<LogFile>> oneSelected = Iterators.takeOneFromTopN(iterator, PEEK_DEPTH);
if (oneSelected.left != null) {
//claim check not required for thread safety, but it avoids spurious WARNs about retrying items while they are in flight for the first time
LogFile trialLogFile = oneSelected.left;
if (!mgr.isClaimed(trialLogFile)) {
if (!isClaimed(trialLogFile)) {
try {
logRetryMessageForState(trialLogFile);
if (mgr.processSync(trialLogFile)) {
if (processSync(trialLogFile)) {
if (oneSelected.right.hasNext()) {
LOG.info("First retry succeeded. Will queue others.");
for (LogFile logFile : oneSelected.right) {
//claim check not required for thread safety, but it avoids spurious WARNs about retrying items while they are in flight for the first time
if (!mgr.isClaimed(logFile)) {
if (!isClaimed(logFile)) {
logRetryMessageForState(logFile);
mgr.processAsync(logFile);
processAsync(logFile);
}
}
} else {
@@ -276,27 +256,44 @@ private void tryOneThenRetryAll(Iterator<LogFile> iterator, PrepWorkerManager mgr) {
}
}
}
}


private void logRetryMessageForState(LogFile logFile) {
switch (logFile.getState()) {
case WRITTEN:
LOG.info("Found stale {} record for {}. Perhaps server was previously terminated before preparing it.",
logFile.getState(),
logFile);
break;
case PREPARING:
LOG.info("Found stale {} record for {}. Perhaps server was previously terminated while preparing it.",
logFile.getState(),
logFile);
break;
case PREP_ERROR:
LOG.info("Found {} record for {}. Perhaps a transient error occurred while preparing it.",
logFile.getState(),
logFile);
break;
default:
throw new IllegalStateException(String.format("Did not expect to be processing %s record for %s. Bug!", logFile.getState(), logFile));
private void logRetryMessageForState(LogFile logFile) {
switch (logFile.getState()) {
case WRITTEN:
LOG.info("Found stale {} record for {}. Perhaps server was previously terminated before preparing it.",
logFile.getState(),
logFile);
break;
case PREPARING:
LOG.info("Found stale {} record for {}. Perhaps server was previously terminated while preparing it.",
logFile.getState(),
logFile);
break;
case PREP_ERROR:
LOG.info("Found {} record for {}. Perhaps a transient error occurred while preparing it.",
logFile.getState(),
logFile);
break;
default:
throw new IllegalStateException(String.format("Did not expect to be processing %s record for %s. Bug!", logFile.getState(), logFile));
}
}


@DisallowConcurrentExecution
public static class RetryJob implements Job
{
public static final String PREP_WORKER_MANAGER = "prep_worker_manager";

public void execute(JobExecutionContext context) throws JobExecutionException {
JobDataMap m = context.getMergedJobDataMap();
PrepWorkerManager mgr = (PrepWorkerManager) m.get(PREP_WORKER_MANAGER);
try {
mgr.retryOneThenRetryAll();
} catch (Exception e) {
throw new JobExecutionException("Failure retrying prep jobs.", e);
}
}
}
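
For context, the retry guard above (and the identical one in UploadWorkerManager below) leans on Iterators.takeOneFromTopN from io.ifar.goodies, whose source is not part of this diff. The sketch below is a self-contained approximation of the semantics the call site implies, buffering at most PEEK_DEPTH records, picking one of them at random, and exposing the remainder lazily; it is not the actual goodies implementation.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;

//Sketch only: approximates the behaviour implied by the call site
//Iterators.takeOneFromTopN(iterator, PEEK_DEPTH), which yields one picked record plus the rest.
final class TakeOneFromTopNSketch {

    static final class Pick<T> {
        final T picked;         //randomly chosen from the first peekDepth items, or null if the source was empty
        final Iterator<T> rest; //everything else, in original order
        Pick(T picked, Iterator<T> rest) { this.picked = picked; this.rest = rest; }
    }

    static <T> Pick<T> takeOneFromTopN(final Iterator<T> source, int peekDepth) {
        //Buffer at most peekDepth items so the full result set is never materialized.
        final List<T> head = new ArrayList<>(peekDepth);
        while (head.size() < peekDepth && source.hasNext()) {
            head.add(source.next());
        }
        if (head.isEmpty()) {
            return new Pick<>(null, source);
        }
        //Random choice within the buffered head avoids always retrying the same
        //(possibly poisoned) record first.
        final T picked = head.remove(new Random().nextInt(head.size()));
        Iterator<T> rest = new Iterator<T>() {
            private final Iterator<T> headRest = head.iterator();
            @Override public boolean hasNext() { return headRest.hasNext() || source.hasNext(); }
            @Override public T next() {
                if (headRest.hasNext()) { return headRest.next(); }
                if (source.hasNext()) { return source.next(); }
                throw new NoSuchElementException();
            }
            @Override public void remove() { throw new UnsupportedOperationException(); }
        };
        return new Pick<>(picked, rest);
    }
}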
UploadWorkerManager.java
@@ -225,51 +225,28 @@ private boolean isClaimed(LogFile f) {
}
}

@DisallowConcurrentExecution
public static class RetryJob implements Job
{
public static final String UPLOAD_WORKER_MANAGER = "upload_worker_manager";
private static final Logger LOG = LoggerFactory.getLogger(RetryJob.class);

public void execute(JobExecutionContext context) throws JobExecutionException {
JobDataMap m = context.getMergedJobDataMap();
UploadWorkerManager mgr = (UploadWorkerManager) m.get(UPLOAD_WORKER_MANAGER);

try {
try (AutoCloseableIterator<LogFile> iterator = mgr.tracker.findMine(ImmutableSet.of(UPLOADING, PREPARED, UPLOAD_ERROR))){
tryOneThenRetryAll(iterator, mgr);
}
} catch (Exception e) {
//Observed causes:
// findMine throws org.skife.jdbi.v2.exceptions.UnableToCreateStatementException: org.postgresql.util.PSQLException: This connection has been closed.
// findMine throws org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException: org.postgresql.util.PSQLException: An I/O error occured while sending to the backend.
throw new JobExecutionException("Failure retrying upload jobs.", e);
}
}

/**
* Guards against the situation where there are many files to retry and retries are failing. Attempt one and,
* if it succeeds, retry the others. Otherwise wait. Selection of one to try is random within the first
* PEEK_DEPTH records returned by the database. This avoids iterating the whole result set.
* @param iterator
* @param mgr
*/
private void tryOneThenRetryAll(Iterator<LogFile> iterator, UploadWorkerManager mgr) {
/**
* Guards against the situation where there are many files to retry and retries are failing. Attempt one and,
* if it succeeds, retry the others. Otherwise wait. Selection of one to try is random within the first
* PEEK_DEPTH records returned by the database. This avoids iterating the whole result set.
*/
public void retryOneThenRetryAll() throws Exception {
try (AutoCloseableIterator<LogFile> iterator = tracker.findMine(ImmutableSet.of(UPLOADING, PREPARED, UPLOAD_ERROR))){
Pair<LogFile,IterableIterator<LogFile>> oneSelected = Iterators.takeOneFromTopN(iterator, PEEK_DEPTH);
if (oneSelected.left != null) {
//claim check not required for thread safety, but it avoids spurious WARNs about retrying items while they are in flight for the first time
LogFile trialLogFile = oneSelected.left;
if (!mgr.isClaimed(trialLogFile)) {
if (!isClaimed(trialLogFile)) {
try {
logRetryMessageForState(trialLogFile);
if (mgr.processSync(trialLogFile)) {
if (processSync(trialLogFile)) {
if (oneSelected.right.hasNext()) {
LOG.info("First retry succeeded. Will queue others.");
for (LogFile logFile : oneSelected.right) {
//claim check not required for thread safety, but it avoids spurious WARNs about retrying items while they are in flight for the first time
if (!mgr.isClaimed(logFile)) {
if (!isClaimed(logFile)) {
logRetryMessageForState(logFile);
mgr.processAsync(logFile);
processAsync(logFile);
}
}
} else {
@@ -288,26 +265,47 @@ private void tryOneThenRetryAll(Iterator<LogFile> iterator, UploadWorkerManager mgr) {
}
}
}
}

private void logRetryMessageForState(LogFile logFile) {
switch (logFile.getState()) {
case PREPARED:
LOG.info("Found stale {} record for {}. Perhaps server was previously terminated before uploading it.",
logFile.getState(),
logFile);
break;
case UPLOADING:
LOG.info("Found stale {} record for {}. Perhaps server was previously terminated while uploading it.",
logFile.getState(),
logFile);
break;
case UPLOAD_ERROR:
LOG.info("Found {} record for {}. Perhaps a transient error occurred while uploading it.",
logFile.getState(),
logFile);
break;
default:
throw new IllegalStateException(String.format("Did not expect to be processing %s record for %s. Bug!", logFile.getState(), logFile));
private void logRetryMessageForState(LogFile logFile) {
switch (logFile.getState()) {
case PREPARED:
LOG.info("Found stale {} record for {}. Perhaps server was previously terminated before uploading it.",
logFile.getState(),
logFile);
break;
case UPLOADING:
LOG.info("Found stale {} record for {}. Perhaps server was previously terminated while uploading it.",
logFile.getState(),
logFile);
break;
case UPLOAD_ERROR:
LOG.info("Found {} record for {}. Perhaps a transient error occurred while uploading it.",
logFile.getState(),
logFile);
break;
default:
throw new IllegalStateException(String.format("Did not expect to be processing %s record for %s. Bug!", logFile.getState(), logFile));
}
}

@DisallowConcurrentExecution
public static class RetryJob implements Job
{
public static final String UPLOAD_WORKER_MANAGER = "upload_worker_manager";
private static final Logger LOG = LoggerFactory.getLogger(RetryJob.class);

public void execute(JobExecutionContext context) throws JobExecutionException {
JobDataMap m = context.getMergedJobDataMap();
UploadWorkerManager mgr = (UploadWorkerManager) m.get(UPLOAD_WORKER_MANAGER);

try {
mgr.retryOneThenRetryAll();
} catch (Exception e) {
//Observed causes:
// findMine throws org.skife.jdbi.v2.exceptions.UnableToCreateStatementException: org.postgresql.util.PSQLException: This connection has been closed.
// findMine throws org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException: org.postgresql.util.PSQLException: An I/O error occured while sending to the backend.
throw new JobExecutionException("Failure retrying upload jobs.", e);
}
}
}
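
RetryJob is a Quartz job that fetches its UploadWorkerManager from the merged JobDataMap under the UPLOAD_WORKER_MANAGER key. The scheduling side of that contract is not part of this diff; assuming plain Quartz 2.x, a minimal wiring might look like the sketch below. The identities and interval are placeholders, and the import for UploadWorkerManager is omitted because its package does not appear in this diff.

import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.Trigger;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

//Assumed wiring, not shown in this commit.
public class UploadRetrySchedulingSketch {

    public static void scheduleRetries(Scheduler scheduler, UploadWorkerManager mgr, int intervalMinutes) throws Exception {
        //RetryJob.execute() looks its manager up under this key in the merged JobDataMap.
        JobDataMap data = new JobDataMap();
        data.put(UploadWorkerManager.RetryJob.UPLOAD_WORKER_MANAGER, mgr);

        JobDetail job = newJob(UploadWorkerManager.RetryJob.class)
                .withIdentity("upload-retry") //placeholder identity
                .usingJobData(data)
                .build();

        //@DisallowConcurrentExecution on RetryJob prevents overlapping runs even if one
        //retry pass takes longer than the interval.
        Trigger trigger = newTrigger()
                .withIdentity("upload-retry-trigger") //placeholder identity
                .startNow()
                .withSchedule(simpleSchedule()
                        .withIntervalInMinutes(intervalMinutes)
                        .repeatForever())
                .build();

        scheduler.scheduleJob(job, trigger);
    }
}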
(diff for the remaining two changed files is not shown here)
