Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: always flush the batch handler for CompleteDataSet import [DHIS2-15362] #15618

Merged
merged 2 commits into from
Nov 8, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
package org.hisp.dhis.dxf2.dataset;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -230,15 +232,15 @@
@Transactional
public ImportSummary saveCompleteDataSetRegistrationsXml(
InputStream in, ImportOptions importOptions, JobConfiguration jobId) {
try {
in = StreamUtils.wrapAndCheckCompressionFormat(in);
CompleteDataSetRegistrations completeDataSetRegistrations =
new StreamingXmlCompleteDataSetRegistrations(XMLFactory.getXMLReader(in));
return saveCompleteDataSetRegistrations(
importOptions, jobId, () -> readRegistrationsFromXml(in));
}

return saveCompleteDataSetRegistrations(importOptions, completeDataSetRegistrations);
} catch (Exception ex) {
return handleImportError(ex);
}
@Nonnull
private static CompleteDataSetRegistrations readRegistrationsFromXml(InputStream in)
throws IOException {
in = StreamUtils.wrapAndCheckCompressionFormat(in);
return new StreamingXmlCompleteDataSetRegistrations(XMLFactory.getXMLReader(in));
}

@Override
Expand All @@ -252,18 +254,38 @@
@Transactional
public ImportSummary saveCompleteDataSetRegistrationsJson(
InputStream in, ImportOptions importOptions, JobConfiguration jobId) {
return saveCompleteDataSetRegistrations(
importOptions, jobId, () -> readRegistrationsFromJson(in));
}

private ImportSummary saveCompleteDataSetRegistrations(
ImportOptions importOptions,
JobConfiguration jobId,
Fixed Show fixed Hide fixed
Callable<CompleteDataSetRegistrations> deserializeRegistrations) {
BatchHandler<CompleteDataSetRegistration> batchHandler =
batchHandlerFactory.createBatchHandler(CompleteDataSetRegistrationBatchHandler.class);
try {
in = StreamUtils.wrapAndCheckCompressionFormat(in);
CompleteDataSetRegistrations completeDataSetRegistrations = deserializeRegistrations.call();
ImportSummary summary =
saveCompleteDataSetRegistrations(
importOptions, completeDataSetRegistrations, batchHandler);

CompleteDataSetRegistrations completeDataSetRegistrations =
jsonMapper.readValue(in, CompleteDataSetRegistrations.class);
batchHandler.flush();

return saveCompleteDataSetRegistrations(importOptions, completeDataSetRegistrations);
return summary;
} catch (Exception ex) {
batchHandler.flush();
return handleImportError(ex);
}
}

@Nonnull
private CompleteDataSetRegistrations readRegistrationsFromJson(InputStream in)
throws IOException {
in = StreamUtils.wrapAndCheckCompressionFormat(in);
return jsonMapper.readValue(in, CompleteDataSetRegistrations.class);
}

@Override
public void validate(ExportParams params) throws IllegalQueryException {
ErrorMessage error = null;
Expand Down Expand Up @@ -367,7 +389,9 @@
}

private ImportSummary saveCompleteDataSetRegistrations(
ImportOptions importOptions, CompleteDataSetRegistrations completeRegistrations) {
ImportOptions importOptions,
CompleteDataSetRegistrations completeRegistrations,
BatchHandler<CompleteDataSetRegistration> batchHandler) {
Clock clock =
new Clock(log).startClock().logTime("Starting complete data set registration import");

Expand Down Expand Up @@ -406,7 +430,8 @@
// ---------------------------------------------------------------------

int totalCount =
batchImport(completeRegistrations, cfg, importSummary, metaDataCallables, caches);
batchImport(
completeRegistrations, cfg, importSummary, metaDataCallables, caches, batchHandler);

ImportCount count = importSummary.getImportCount();

Expand All @@ -428,16 +453,14 @@
ImportConfig config,
ImportSummary summary,
MetadataCallables mdCallables,
MetadataCaches mdCaches) {
MetadataCaches mdCaches,
BatchHandler<CompleteDataSetRegistration> batchHandler) {
final User currentUser = currentUserService.getCurrentUser();
final String currentUserName = currentUser.getUsername();
final Set<OrganisationUnit> userOrgUnits = currentUserService.getCurrentUserOrganisationUnits();
final I18n i18n = i18nManager.getI18n();

BatchHandler<CompleteDataSetRegistration> batchHandler =
batchHandlerFactory
.createBatchHandler(CompleteDataSetRegistrationBatchHandler.class)
.init();
batchHandler.init();

int importCount = 0, updateCount = 0, deleteCount = 0, totalCount = 0;

Expand Down Expand Up @@ -607,8 +630,6 @@
}
}

batchHandler.flush();

finalizeSummary(summary, totalCount, importCount, updateCount, deleteCount);

return totalCount;
Expand Down
Loading