Skip to content

Commit

Permalink
fix: always flush the batch handler for CompleteDataSet import [DHIS2…
Browse files Browse the repository at this point in the history
…-15362] (#15618)

* fix: always flush the batch handler for CompleteDataSet import [DHIS2-15362]

* chore: removed unused parameter and method overloads [DHIS2-15362]
  • Loading branch information
jbee authored Nov 8, 2023
1 parent 367fde4 commit 53cda5d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.hisp.dhis.common.IdSchemes;
import org.hisp.dhis.dxf2.common.ImportOptions;
import org.hisp.dhis.dxf2.importsummary.ImportSummary;
import org.hisp.dhis.scheduling.JobConfiguration;

/**
* Import/export service for {@link CompleteDataSetRegistration data set completion registrations}.
Expand Down Expand Up @@ -109,17 +108,6 @@ void writeCompleteDataSetRegistrationsJson(
*/
ImportSummary saveCompleteDataSetRegistrationsXml(InputStream in, ImportOptions importOptions);

/**
* Imports {@link CompleteDataSetRegistrations} from an XML payload.
*
* @param in the stream providing the XML payload.
* @param importOptions the options for the import.
* @param jobId the task (optional).
* @return a summary of the import process.
*/
ImportSummary saveCompleteDataSetRegistrationsXml(
InputStream in, ImportOptions importOptions, JobConfiguration jobId);

/**
* Imports {@link CompleteDataSetRegistrations} from a JSON payload.
*
Expand All @@ -129,22 +117,10 @@ ImportSummary saveCompleteDataSetRegistrationsXml(
*/
ImportSummary saveCompleteDataSetRegistrationsJson(InputStream in, ImportOptions importOptions);

/**
* Imports {@link CompleteDataSetRegistrations} from a JSON payload.
*
* @param in the stream providing the XML payload.
* @param importOptions the options for the import.
* @param jobId the task (optional).
* @return a summary of the import process.
*/
ImportSummary saveCompleteDataSetRegistrationsJson(
InputStream in, ImportOptions importOptions, JobConfiguration jobId);

/**
* Validates the given {@link ExportParams}.
*
* @param params the export parameters.
* @throws IllegalQueryException if validation failed.
*/
void validate(ExportParams params);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
package org.hisp.dhis.dxf2.dataset;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -76,7 +78,6 @@
import org.hisp.dhis.period.Period;
import org.hisp.dhis.period.PeriodService;
import org.hisp.dhis.period.PeriodType;
import org.hisp.dhis.scheduling.JobConfiguration;
import org.hisp.dhis.setting.SystemSettingManager;
import org.hisp.dhis.system.util.Clock;
import org.hisp.dhis.system.util.ValidationUtils;
Expand Down Expand Up @@ -223,47 +224,50 @@ public void writeCompleteDataSetRegistrationsJson(
@Transactional
public ImportSummary saveCompleteDataSetRegistrationsXml(
InputStream in, ImportOptions importOptions) {
return saveCompleteDataSetRegistrationsXml(in, importOptions, null);
return saveCompleteDataSetRegistrations(importOptions, () -> readRegistrationsFromXml(in));
}

@Override
@Transactional
public ImportSummary saveCompleteDataSetRegistrationsXml(
InputStream in, ImportOptions importOptions, JobConfiguration jobId) {
try {
in = StreamUtils.wrapAndCheckCompressionFormat(in);
CompleteDataSetRegistrations completeDataSetRegistrations =
new StreamingXmlCompleteDataSetRegistrations(XMLFactory.getXMLReader(in));

return saveCompleteDataSetRegistrations(importOptions, completeDataSetRegistrations);
} catch (Exception ex) {
return handleImportError(ex);
}
@Nonnull
private static CompleteDataSetRegistrations readRegistrationsFromXml(InputStream in)
throws IOException {
in = StreamUtils.wrapAndCheckCompressionFormat(in);
return new StreamingXmlCompleteDataSetRegistrations(XMLFactory.getXMLReader(in));
}

@Override
@Transactional
public ImportSummary saveCompleteDataSetRegistrationsJson(
InputStream in, ImportOptions importOptions) {
return saveCompleteDataSetRegistrationsJson(in, importOptions, null);
return saveCompleteDataSetRegistrations(importOptions, () -> readRegistrationsFromJson(in));
}

@Override
@Transactional
public ImportSummary saveCompleteDataSetRegistrationsJson(
InputStream in, ImportOptions importOptions, JobConfiguration jobId) {
private ImportSummary saveCompleteDataSetRegistrations(
ImportOptions importOptions,
Callable<CompleteDataSetRegistrations> deserializeRegistrations) {
BatchHandler<CompleteDataSetRegistration> batchHandler =
batchHandlerFactory.createBatchHandler(CompleteDataSetRegistrationBatchHandler.class);
try {
in = StreamUtils.wrapAndCheckCompressionFormat(in);
CompleteDataSetRegistrations completeDataSetRegistrations = deserializeRegistrations.call();
ImportSummary summary =
saveCompleteDataSetRegistrations(
importOptions, completeDataSetRegistrations, batchHandler);

CompleteDataSetRegistrations completeDataSetRegistrations =
jsonMapper.readValue(in, CompleteDataSetRegistrations.class);
batchHandler.flush();

return saveCompleteDataSetRegistrations(importOptions, completeDataSetRegistrations);
return summary;
} catch (Exception ex) {
batchHandler.flush();
return handleImportError(ex);
}
}

@Nonnull
private CompleteDataSetRegistrations readRegistrationsFromJson(InputStream in)
throws IOException {
in = StreamUtils.wrapAndCheckCompressionFormat(in);
return jsonMapper.readValue(in, CompleteDataSetRegistrations.class);
}

@Override
public void validate(ExportParams params) throws IllegalQueryException {
ErrorMessage error = null;
Expand Down Expand Up @@ -367,7 +371,9 @@ private ImportSummary handleImportError(Throwable ex) {
}

private ImportSummary saveCompleteDataSetRegistrations(
ImportOptions importOptions, CompleteDataSetRegistrations completeRegistrations) {
ImportOptions importOptions,
CompleteDataSetRegistrations completeRegistrations,
BatchHandler<CompleteDataSetRegistration> batchHandler) {
Clock clock =
new Clock(log).startClock().logTime("Starting complete data set registration import");

Expand Down Expand Up @@ -406,7 +412,8 @@ private ImportSummary saveCompleteDataSetRegistrations(
// ---------------------------------------------------------------------

int totalCount =
batchImport(completeRegistrations, cfg, importSummary, metaDataCallables, caches);
batchImport(
completeRegistrations, cfg, importSummary, metaDataCallables, caches, batchHandler);

ImportCount count = importSummary.getImportCount();

Expand All @@ -428,16 +435,14 @@ private int batchImport(
ImportConfig config,
ImportSummary summary,
MetadataCallables mdCallables,
MetadataCaches mdCaches) {
MetadataCaches mdCaches,
BatchHandler<CompleteDataSetRegistration> batchHandler) {
final User currentUser = currentUserService.getCurrentUser();
final String currentUserName = currentUser.getUsername();
final Set<OrganisationUnit> userOrgUnits = currentUserService.getCurrentUserOrganisationUnits();
final I18n i18n = i18nManager.getI18n();

BatchHandler<CompleteDataSetRegistration> batchHandler =
batchHandlerFactory
.createBatchHandler(CompleteDataSetRegistrationBatchHandler.class)
.init();
batchHandler.init();

int importCount = 0, updateCount = 0, deleteCount = 0, totalCount = 0;

Expand Down Expand Up @@ -607,8 +612,6 @@ private int batchImport(
}
}

batchHandler.flush();

finalizeSummary(summary, totalCount, importCount, updateCount, deleteCount);

return totalCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,9 @@ public void execute(JobConfiguration jobConfig, JobProgress progress) {
ImportSummary summary =
switch (contentType) {
case "application/json" -> progress.runStage(
() ->
registrationService.saveCompleteDataSetRegistrationsJson(
input, options, jobConfig));
() -> registrationService.saveCompleteDataSetRegistrationsJson(input, options));
case "application/xml" -> progress.runStage(
() ->
registrationService.saveCompleteDataSetRegistrationsXml(
input, options, jobConfig));
() -> registrationService.saveCompleteDataSetRegistrationsXml(input, options));
default -> {
progress.failedStage("Unknown format: " + contentType);
yield null;
Expand Down

0 comments on commit 53cda5d

Please sign in to comment.