Skip to content

Commit

Permalink
enable ftp resume downloading using restart offset
Browse files Browse the repository at this point in the history
  • Loading branch information
pgdurand committed Oct 30, 2020
1 parent dda00a4 commit 9e92600
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 15 deletions.
28 changes: 18 additions & 10 deletions src/bzh/plealog/dbmirror/fetcher/MyCopyStreamListener.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (C) 2007-2017 Patrick G. Durand
/* Copyright (C) 2007-2020 Patrick G. Durand
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
Expand Down Expand Up @@ -48,6 +48,7 @@ public class MyCopyStreamListener implements CopyStreamListener {
private long stepSize;
private long step = 1;
private long stepPlastRunner = 1;
private long restartOffset = 0;
private Date start = null;
private int waitDelay = 3; // unit:
// seconds
Expand All @@ -56,11 +57,17 @@ public class MyCopyStreamListener implements CopyStreamListener {
public MyCopyStreamListener(String workerID,
UserProcessingMonitor userMonitor, String dbConfName, String fName,
long streamSize) {
this(workerID, userMonitor, dbConfName, fName, streamSize, 0l);
}
public MyCopyStreamListener(String workerID,
UserProcessingMonitor userMonitor, String dbConfName, String fName,
long streamSize, long restartOffset) {
_userMonitor = userMonitor;
this.dbConfName = dbConfName;
this.fName = fName;
this.workerID = workerID;
this.stepSize = streamSize / 100l;// 20l;
this.restartOffset = restartOffset;
if (this.stepSize < Util.DEFAULT_COPY_BUFFER_SIZE)
this.stepSize = Util.DEFAULT_COPY_BUFFER_SIZE;
// set the wait delay depending on the streamSize
Expand All @@ -87,21 +94,22 @@ public void bytesTransferred(CopyStreamEvent event) {
public void bytesTransferred(long totalBytesTransferred,
int bytesTransferred, long streamSize) {

long tbr = restartOffset + totalBytesTransferred;
// keep this log for Plastrunner
if (totalBytesTransferred >= (stepPlastRunner * stepSize * 10)
|| totalBytesTransferred == streamSize) {
if (tbr >= (stepPlastRunner * stepSize * 10)
|| tbr == streamSize) {
LoggerCentral.info(LOGGER, this.fName + " - download in progress: "
+ (totalBytesTransferred * 100 / streamSize) + " %");
+ (tbr * 100 / streamSize) + " %");
stepPlastRunner++;
}

// avoid too many messages
if (totalBytesTransferred >= (step * stepSize)
|| totalBytesTransferred == streamSize) {
if (tbr >= (step * stepSize)
|| tbr == streamSize) {
if (_userMonitor != null) {
_userMonitor.processingFile(workerID, dbConfName,
UserProcessingMonitor.PROCESS_TYPE.FTP_LOADING, fName,
totalBytesTransferred, streamSize);
tbr, streamSize);
}

step++;
Expand All @@ -111,15 +119,15 @@ public void bytesTransferred(long totalBytesTransferred,
if (_userMonitor != null) {
// msg to display in the label
StringBuffer msg = new StringBuffer(
bzh.plealog.dbmirror.util.Utils.getBytes(totalBytesTransferred)
bzh.plealog.dbmirror.util.Utils.getBytes(tbr)
+ "/"
+ bzh.plealog.dbmirror.util.Utils.getBytes(streamSize));
// debit calcul
long seconds = (new Date().getTime() - this.start.getTime()) / 1000;
long koPerSeconds = (totalBytesTransferred / 1024) / seconds;
long koPerSeconds = (tbr / 1024) / seconds;
// msg += " - " + koPerSeconds + " Ko/s";
// estimate end
long totalSecondsToEnd = ((streamSize - totalBytesTransferred) / 1024)
long totalSecondsToEnd = ((streamSize - tbr) / 1024)
/ koPerSeconds;
long hoursToEnd = TimeUnit.SECONDS.toHours(totalSecondsToEnd);
long minutesToEnd = TimeUnit.SECONDS.toMinutes(totalSecondsToEnd)
Expand Down
18 changes: 13 additions & 5 deletions src/bzh/plealog/dbmirror/fetcher/PFTPLoader.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (C) 2007-2017 Patrick G. Durand
/* Copyright (C) 2007-2020 Patrick G. Durand
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
Expand Down Expand Up @@ -175,7 +175,7 @@ protected boolean configureFtpClient(FTPClient ftp, DBServerConfig fsc) {
* @return 1 if success, 0 if failure, 2 if skip (file already loaded ; when
* resuming from a previous work) and 3 if aborted.
* */
protected int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile, File file) {
protected int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile, File file, long lclFSize) {
FileOutputStream fos = null;
InputStream ftpIS = null;
String remoteFName;
Expand All @@ -191,13 +191,21 @@ protected int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile,
if (ftp.changeWorkingDirectory(rFile.getRemoteDir())) {
// download file
LoggerCentral.info(LOGGER, " " + getLoaderId() + ": download: " + rFile.getRemoteDir() + remoteFName);
fos = new FileOutputStream(file);

if (lclFSize!=0) {
fos = new FileOutputStream(file, true);
ftp.setRestartOffset(lclFSize);
}
else {
fos = new FileOutputStream(file);
ftp.setRestartOffset(0l);
}
ftpIS = ftp.retrieveFileStream(remoteFName);
if (ftpIS == null) {
throw new Exception(getLoaderId() + ": unable to open remote input stream: " + ftp.getReplyString());
}
Util.copyStream(ftpIS, fos, Util.DEFAULT_COPY_BUFFER_SIZE, remoteFSize,
new MyCopyStreamListener(getLoaderId(), _userMonitor, fsc.getName(), remoteFName, remoteFSize));
new MyCopyStreamListener(getLoaderId(), _userMonitor, fsc.getName(), remoteFName, remoteFSize, lclFSize));
IOUtils.closeQuietly(ftpIS);
fos.flush();
IOUtils.closeQuietly(fos);
Expand Down Expand Up @@ -307,7 +315,7 @@ public int downloadFile(FTPClient ftp, DBServerConfig fsc, DBMSFtpFile rFile,
UserProcessingMonitor.MSG_TYPE.OK,
msg);
}
iRet = downloadFile(ftp, fsc, rFile, file);
iRet = downloadFile(ftp, fsc, rFile, file, lclFSize<remoteFSize?lclFSize:0);
if (_userMonitor != null) {
_userMonitor.processingMessage(getLoaderId(), fsc.getName(),
UserProcessingMonitor.PROCESS_TYPE.FTP_LOADING,
Expand Down

0 comments on commit 9e92600

Please sign in to comment.