Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into ld-20240919-evaluat…
Browse files Browse the repository at this point in the history
…e-measure-timezone
  • Loading branch information
lukedegruchy committed Oct 2, 2024
2 parents 26a8cd0 + 5fe92bd commit 3e30423
Show file tree
Hide file tree
Showing 50 changed files with 1,622 additions and 400 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public abstract class BaseRuntimeElementDefinition<T extends IBase> {

private static final Class<Void> VOID_CLASS = Void.class;
private final Class<? extends T> myImplementingClass;
private final String myName;
private final boolean myStandardType;
private Map<Class<?>, Constructor<T>> myConstructors = Collections.synchronizedMap(new HashMap<>());
private final Map<Class<?>, Constructor<T>> myConstructors = new ConcurrentHashMap<>();
private List<RuntimeChildDeclaredExtensionDefinition> myExtensions = new ArrayList<>();
private List<RuntimeChildDeclaredExtensionDefinition> myExtensionsModifier = new ArrayList<>();
private List<RuntimeChildDeclaredExtensionDefinition> myExtensionsNonModifier = new ArrayList<>();
Expand Down Expand Up @@ -84,27 +85,24 @@ private Constructor<T> getConstructor(@Nullable Object theArgument) {
argumentType = theArgument.getClass();
}

Constructor<T> retVal = myConstructors.get(argumentType);
if (retVal == null) {
Constructor<T> retVal = myConstructors.computeIfAbsent(argumentType, type -> {
for (Constructor<?> next : getImplementingClass().getConstructors()) {
if (argumentType == VOID_CLASS) {
if (type == VOID_CLASS) {
if (next.getParameterTypes().length == 0) {
retVal = (Constructor<T>) next;
break;
}
} else if (next.getParameterTypes().length == 1) {
if (next.getParameterTypes()[0].isAssignableFrom(argumentType)) {
retVal = (Constructor<T>) next;
break;
return (Constructor<T>) next;
}
} else if (next.getParameterTypes().length == 1 && next.getParameterTypes()[0].isAssignableFrom(type)) {
return (Constructor<T>) next;
}
}
if (retVal == null) {
throw new ConfigurationException(Msg.code(1695) + "Class " + getImplementingClass()
+ " has no constructor with a single argument of type " + argumentType);
}
myConstructors.put(argumentType, retVal);
return null;
});

if (retVal == null) {
throw new ConfigurationException(Msg.code(1695) + "Class " + getImplementingClass()
+ " has no constructor with a single argument of type " + argumentType);
}

return retVal;
}

Expand Down
8 changes: 8 additions & 0 deletions hapi-fhir-checkstyle/src/checkstyle/hapi-base-checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,12 @@
<property name="format" value="^(Base|Abstract).+$"/>
</module>
</module>

<!-- for suppression of rules; to use, surround code to exclude with comments: -->
<!-- CHECKSTYLE.OFF RuleToDisable AND CHECKSTYLE.ON RuleToDisable -->
<module name="SuppressWithPlainTextCommentFilter">
<property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)" />
<property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)" />
</module>

</module>
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
type: fix
issue: 6285
title: "Updated the Reindex Batch2 job to allow
for an additional step that will check to ensure
that no pending 'reindex' work is needed.
This was done to prevent a bug in which
value set expansion would not return all
the existing CodeSystem Concepts after
a reindex call, due to some of the concepts
being deferred to future job runs.
As such, `$reindex` operations on CodeSystems
will no longer result in incorrect value set
expansion when such an expansion is called
'too soon' after a $reindex operation.
"
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
type: perf
issue: 6323
title: "A synchronization choke point was removed from the model object initialization code, reducing the risk of
multi-thread contention."
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirVersionEnum;
Expand Down Expand Up @@ -158,6 +157,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
import static java.util.Objects.isNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
Expand Down Expand Up @@ -1315,7 +1315,7 @@ protected void requestReindexForRelatedResources(
myJobPartitionProvider.getPartitionedUrls(theRequestDetails, urls).forEach(params::addPartitionedUrl);

JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
request.setJobDefinitionId(JOB_REINDEX);
request.setParameters(params);
myJobCoordinator.startInstance(theRequestDetails, request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
import ca.uhn.fhir.context.support.LookupCodeRequest;
import ca.uhn.fhir.context.support.ValidationSupportContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem;
import ca.uhn.fhir.jpa.api.dao.ReindexOutcome;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.api.model.ReindexJobStatus;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
Expand Down Expand Up @@ -176,6 +180,47 @@ protected void preDelete(T theResourceToDelete, ResourceTable theEntityToDelete,
myTermDeferredStorageSvc.deleteCodeSystemForResource(theEntityToDelete);
}

/**
* If there are more code systems to process
* than {@link JpaStorageSettings#getDeferIndexingForCodesystemsOfSize()},
* then these codes will have their processing deferred (for a later time).
*
* This can result in future reindex steps *skipping* these code systems (if
* they're still deferred) and thus incorrect expansions resulting.
*
* So we override the reindex method for CodeSystems specifically to
* force reindex batch jobs to wait until all code systems are processed before
* moving on.
*/
@SuppressWarnings("rawtypes")
@Override
public ReindexOutcome reindex(
IResourcePersistentId thePid,
ReindexParameters theReindexParameters,
RequestDetails theRequest,
TransactionDetails theTransactionDetails) {
ReindexOutcome outcome = super.reindex(thePid, theReindexParameters, theRequest, theTransactionDetails);

if (outcome.getWarnings().isEmpty()) {
outcome.setHasPendingWork(true);
}
return outcome;
}

@Override
public ReindexJobStatus getReindexJobStatus() {
boolean isQueueEmpty = myTermDeferredStorageSvc.isStorageQueueEmpty(true);

ReindexJobStatus status = new ReindexJobStatus();
status.setHasReindexWorkPending(!isQueueEmpty);
if (status.isHasReindexWorkPending()) {
// force a run
myTermDeferredStorageSvc.saveDeferred();
}

return status;
}

@Override
public ResourceTable updateEntity(
RequestDetails theRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ private void addConceptInHierarchy(
if (theStatisticsTracker.getUpdatedConceptCount() <= myStorageSettings.getDeferIndexingForCodesystemsOfSize()) {
saveConcept(conceptToAdd);
Long nextConceptPid = conceptToAdd.getId();
Validate.notNull(nextConceptPid);
Objects.requireNonNull(nextConceptPid);
} else {
myDeferredStorageSvc.addConceptToStorageQueue(conceptToAdd);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao;
Expand Down Expand Up @@ -79,6 +78,8 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHas
private static final long SAVE_ALL_DEFERRED_WARN_MINUTES = 1;
private static final long SAVE_ALL_DEFERRED_ERROR_MINUTES = 5;
private boolean myAllowDeferredTasksTimeout = true;
private static final List<String> BATCH_JOBS_TO_CARE_ABOUT =
List.of(TERM_CODE_SYSTEM_DELETE_JOB_NAME, TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME);
private final List<TermCodeSystem> myDeferredCodeSystemsDeletions = Collections.synchronizedList(new ArrayList<>());
private final Queue<TermCodeSystemVersion> myDeferredCodeSystemVersionsDeletions = new ConcurrentLinkedQueue<>();
private final List<TermConcept> myDeferredConcepts = Collections.synchronizedList(new ArrayList<>());
Expand Down Expand Up @@ -436,7 +437,7 @@ public boolean isStorageQueueEmpty(boolean theIncludeExecutingJobs) {
return retVal;
}

private boolean isJobsExecuting() {
public boolean isJobsExecuting() {
cleanseEndedJobs();

return !myJobExecutions.isEmpty();
Expand All @@ -448,15 +449,18 @@ private void cleanseEndedJobs() {
* This is mostly a fail-safe
* because "cancelled" jobs are never removed.
*/
List<String> executions = new ArrayList<>(myJobExecutions);
List<String> idsToDelete = new ArrayList<>();
for (String id : executions) {
// TODO - might want to consider a "fetch all instances"
JobInstance instance = myJobCoordinator.getInstance(id);
if (StatusEnum.getEndedStatuses().contains(instance.getStatus())) {
for (String jobId : BATCH_JOBS_TO_CARE_ABOUT) {
List<JobInstance> jobInstanceInEndedState = myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus(
jobId,
true, // ended = true (COMPLETED, FAILED, CANCELLED jobs only)
Math.max(myJobExecutions.size(), 1), // at most this many
0);
for (JobInstance instance : jobInstanceInEndedState) {
idsToDelete.add(instance.getInstanceId());
}
}

for (String id : idsToDelete) {
myJobExecutions.remove(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ default boolean isStorageQueueEmpty() {

void logQueueForUnitTest();

boolean isJobsExecuting();

/**
* Only to be used from tests - Disallow test timeouts on deferred tasks
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package ca.uhn.fhir.jpa.provider.dstu3;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
Expand Down Expand Up @@ -46,9 +43,11 @@
import java.util.List;
import java.util.Map;

import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;


Expand Down Expand Up @@ -202,7 +201,7 @@ public void testCreatingParamMarksCorrectResourcesForReindexing() {
mySearchParameterDao.create(fooSp, mySrd);

runInTransaction(()->{
List<JobInstance> allJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX);
List<JobInstance> allJobs = myBatch2JobHelper.findJobsByDefinition(JOB_REINDEX);
assertEquals(1, allJobs.size());
assertEquals(1, allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().size());
assertEquals("Patient?", allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().get(0).getUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXED;
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -1075,7 +1076,7 @@ private void executeReindex(String... theUrls) {
parameters.addUrl(url);
}
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setJobDefinitionId(JOB_REINDEX);
startRequest.setParameters(parameters);
Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
ourLog.info("Started reindex job with id {}", res.getInstanceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.model.util.SearchParamHash;
import ca.uhn.fhir.jpa.model.util.UcumServiceUtil;
import ca.uhn.fhir.jpa.reindex.ReindexStepTest;
import ca.uhn.fhir.jpa.reindex.ReindexStepV1Test;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.rest.param.BaseParam;
Expand Down Expand Up @@ -57,6 +57,7 @@

import java.util.List;

import static ca.uhn.fhir.batch2.jobs.reindex.ReindexUtils.JOB_REINDEX;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -312,7 +313,7 @@ private void executeReindex(String... theUrls) {
parameters.addUrl(url);
}
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setJobDefinitionId(JOB_REINDEX);
startRequest.setParameters(parameters);
Batch2JobStartResponse res = myJobCoordinator.startInstance(mySrd, startRequest);
ourLog.info("Started reindex job with id {}", res.getInstanceId());
Expand All @@ -321,7 +322,7 @@ private void executeReindex(String... theUrls) {

// Additional existing tests with enabled IndexStorageOptimized
@Nested
public class IndexStorageOptimizedReindexStepTest extends ReindexStepTest {
public class IndexStorageOptimizedReindexStepTestV1 extends ReindexStepV1Test {
@BeforeEach
void setUp() {
myStorageSettings.setIndexStorageOptimized(true);
Expand Down
Loading

0 comments on commit 3e30423

Please sign in to comment.