Skip to content

Commit

Permalink
Merge pull request #889 from sheiksalahudeen/release-2.1
Browse files Browse the repository at this point in the history
Release 2.1 OLE-9004 : Fixed the issue with inaccurate record counts when doing Incremental exports to vufind
  • Loading branch information
sheiksalahudeen authored Sep 21, 2016
2 parents e7ecc70 + 09487a4 commit d7fc40c
Show file tree
Hide file tree
Showing 13 changed files with 822 additions and 589 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.kuali.ole.oleng.dao.export;

import org.kuali.ole.oleng.handler.BatchExportHandler;
import org.kuali.ole.oleng.util.BatchExportUtil;

import java.util.Set;
import java.util.concurrent.Callable;

/**
* Created by sheiks on 16/09/16.
*/
public class ExportBibIdFinderCallable implements Callable {

private String query;
private int start;
private int chunkSize;
private BatchExportHandler batchExportHandler;
private BatchExportUtil batchExportUtil;

public ExportBibIdFinderCallable(String query, int start, int chunkSize, BatchExportHandler batchExportHandler) {
this.query = query;
this.start = start;
this.chunkSize = chunkSize;
this.batchExportHandler = batchExportHandler;

}

@Override
public Object call() throws Exception {
Set<String> bibIdentifiers = getBatchExportUtil().getBibIdentifiersForQuery(query, start, chunkSize);
return bibIdentifiers;
}

public BatchExportUtil getBatchExportUtil() {
if(null == batchExportUtil) {
batchExportUtil = new BatchExportUtil();
}
return batchExportUtil;
}

public void setBatchExportUtil(BatchExportUtil batchExportUtil) {
this.batchExportUtil = batchExportUtil;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package org.kuali.ole.oleng.dao.export;

import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.kuali.ole.DocumentUniqueIDPrefix;
import org.kuali.ole.OLEConstants;
import org.kuali.ole.constants.OleNGConstants;
import org.kuali.ole.deliver.service.ParameterValueResolver;
Expand Down Expand Up @@ -53,7 +52,8 @@ private void init() throws Exception {
fetchExtentOfOwnershipType();
}

public void export(BatchExportHandler batchExportHandler, String query, BatchProcessTxObject batchProcessTxObject, OleNGBatchExportResponse oleNGBatchExportResponse) {
public void export(BatchExportHandler batchExportHandler, String query, BatchProcessTxObject batchProcessTxObject,
OleNGBatchExportResponse oleNGBatchExportResponse, boolean isIncremental) {
try {
init();
this.chunkSize = batchExportHandler.getBatchChunkSize();
Expand All @@ -62,20 +62,48 @@ public void export(BatchExportHandler batchExportHandler, String query, BatchPro
int fileCount = 1;
int fileSize = batchProcessTxObject.getBatchJobDetails().getNumOfRecordsInFile();
int numOfRecordsInFile = 0;
SolrDocumentList solrDocumentList = batchExportHandler.getSolrRequestReponseHandler().getSolrDocumentList(query, start, chunkSize, OleNGConstants.BIB_IDENTIFIER);
SolrDocumentList solrDocumentList = batchExportHandler.getSolrRequestReponseHandler().getSolrDocumentList(
query, null, null, OleNGConstants.BIB_IDENTIFIER);
totalCount = solrDocumentList.getNumFound();

List<String> bibIds = new ArrayList<>();

if(isIncremental) {
bibIds = getBibIds(query, totalCount, chunkSize, batchExportHandler, batchProcessTxObject);
totalCount = bibIds.size();
if(fileSize < chunkSize) {
chunkSize = fileSize;
}
}

batchProcessTxObject.getBatchJobDetails().setTotalRecords(String.valueOf(totalCount));
oleNGBatchExportResponse.setTotalNumberOfRecords((int) totalCount);
batchExportHandler.updateBatchJob(batchProcessTxObject.getBatchJobDetails());
do {
futures.add(executorService.submit(new ExportDaoCallableImpl(commonFields, getJdbcTemplate(), start, chunkSize, query, fileCount, batchExportHandler, batchProcessTxObject, oleNGBatchExportResponse)));
numOfRecordsInFile += chunkSize;
if (numOfRecordsInFile == fileSize) {
fileCount++;
numOfRecordsInFile = 0;

if(isIncremental) {
List<List<String>> partition = Lists.partition(bibIds, chunkSize);
for (Iterator<List<String>> iterator = partition.iterator(); iterator.hasNext(); ) {
List<String> bibIdLists = iterator.next();
futures.add(executorService.submit(new IncrementalExportCallableImpl(commonFields, getJdbcTemplate(),bibIdLists,
fileCount, batchExportHandler, batchProcessTxObject)));
numOfRecordsInFile += chunkSize;
if (numOfRecordsInFile >= fileSize) {
fileCount++;
numOfRecordsInFile = 0;
}
}
start += chunkSize;
} while (start <= totalCount);
} else {
do {
futures.add(executorService.submit(new ExportDaoCallableImpl(commonFields, getJdbcTemplate(), query,
start, chunkSize, fileCount, batchExportHandler, batchProcessTxObject)));
numOfRecordsInFile += chunkSize;
if (numOfRecordsInFile == fileSize) {
fileCount++;
numOfRecordsInFile = 0;
}
start += chunkSize;
} while (start <= totalCount);
}
prepareBatchExportResponse(futures, batchExportHandler, batchProcessTxObject, oleNGBatchExportResponse);
executorService.shutdown();
} catch (Exception e) {
Expand All @@ -84,14 +112,41 @@ public void export(BatchExportHandler batchExportHandler, String query, BatchPro
}
}

private void prepareBatchExportResponse(List<Future> futures, BatchExportHandler batchExportHandler, BatchProcessTxObject batchProcessTxObject, OleNGBatchExportResponse oleNGBatchExportResponse) {
private List<String> getBibIds(String query, long totalCount, int chunkSize, BatchExportHandler batchExportHandler, BatchProcessTxObject batchProcessTxObject) {
Set<String> bibIdentifiers = new HashSet<>();
List<Future> futures = new ArrayList<>();
int startIndex = 0;
ExecutorService executorService = Executors.newFixedThreadPool(getMaximumNumberOfThreadForExportService());
do {
futures.add(executorService.submit(new ExportBibIdFinderCallable(query,startIndex, chunkSize, batchExportHandler)));
startIndex += chunkSize;
} while (startIndex <= totalCount);

for (Future future : futures) {
try {
Object response = future.get();
if (null != response) {
Set<String> bibIds = (Set<String>) response;
bibIdentifiers.addAll(bibIds);
}
} catch (Exception e) {
e.printStackTrace();
batchExportHandler.addBatchExportFailureResponseToExchange(e, null, batchProcessTxObject.getExchangeObjectForBatchExport());
}
}
return new ArrayList<String>(bibIdentifiers);
}

private void prepareBatchExportResponse(List<Future> futures, BatchExportHandler batchExportHandler, BatchProcessTxObject batchProcessTxObject, OleNGBatchExportResponse originalResponse) {
if (CollectionUtils.isNotEmpty(futures)) {
for (Future future : futures) {
try {
if (null != future.get()) {
OleNGBatchExportResponse exportResponse = (OleNGBatchExportResponse) future.get();
batchProcessTxObject.getBatchJobDetails().setTotalFailureRecords(String.valueOf(exportResponse.getNoOfFailureRecords()));
batchProcessTxObject.getBatchJobDetails().setTotalRecordsProcessed(String.valueOf(exportResponse.getNoOfSuccessRecords() + exportResponse.getNoOfFailureRecords()));
Object response = future.get();
if (null != response) {
OleNGBatchExportResponse exportResponse = (OleNGBatchExportResponse) response;
mergeResponses(originalResponse, exportResponse);
batchProcessTxObject.getBatchJobDetails().setTotalFailureRecords(String.valueOf(originalResponse.getNoOfFailureRecords()));
batchProcessTxObject.getBatchJobDetails().setTotalRecordsProcessed(String.valueOf(originalResponse.getNoOfSuccessRecords() + originalResponse.getNoOfFailureRecords()));
batchExportHandler.updateBatchJob(batchProcessTxObject.getBatchJobDetails());
}
} catch (InterruptedException e) {
Expand All @@ -103,6 +158,14 @@ private void prepareBatchExportResponse(List<Future> futures, BatchExportHandler
}
}

private void mergeResponses(OleNGBatchExportResponse originalResponse, OleNGBatchExportResponse exportResponse) {
originalResponse.addNoOfSuccessRecords(exportResponse.getNoOfSuccessRecords());
originalResponse.addNoOfFailureRecords(exportResponse.getNoOfFailureRecords());
originalResponse.getBatchExportSuccessResponseList().addAll(exportResponse.getBatchExportSuccessResponseList());
originalResponse.getBatchExportFailureResponseList().addAll(exportResponse.getBatchExportFailureResponseList());
originalResponse.getDeletedBibIds().addAll(exportResponse.getDeletedBibIds());
}


private void fetchCallNumberType() throws SQLException {
SqlRowSet resultSet = getJdbcTemplate().queryForRowSet("SELECT SHVLG_SCHM_ID,SHVLG_SCHM_CD,SHVLG_SCHM_NM from OLE_CAT_SHVLG_SCHM_T");
Expand Down
Loading

0 comments on commit d7fc40c

Please sign in to comment.