Skip to content
This repository has been archived by the owner on Apr 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #63 from jvfullam/issue#18
Browse files Browse the repository at this point in the history
Added test for Issue #18 that verifies each partition gets its own PUD
  • Loading branch information
scottkurz authored Sep 29, 2016
2 parents 4608372 + 752a27d commit dbed125
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2016 International Business Machines Corp.
*
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership. Licensed under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ibm.jbatch.tck.artifacts.specialized;

import java.io.Serializable;

import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.AbstractItemReader;
import javax.batch.runtime.context.JobContext;
import javax.batch.runtime.context.StepContext;
import javax.inject.Inject;

import com.ibm.jbatch.tck.artifacts.basicchunk.BasicItem;

/*NOTE: Code for this class is taken substantially from basicchunk.BasicReader*/
@javax.inject.Named("PUDPartitionReader")
public class PUDPartitionReader extends AbstractItemReader {

@Inject
JobContext jobCtx;

@Inject
StepContext stepCtx;

@Inject @BatchProperty(name="partition.number")
String partitionNumber;

@Inject @BatchProperty(name="execution.number")
String executionNumber;

@Inject
@BatchProperty(name = "number.of.items.to.be.read")
String injectedNumberOfItemsToBeRead;
//Default: read 10 items
private int numberOfItemsToBeRead = 10;

@Inject
@BatchProperty(name = "throw.reader.exception.for.these.items")
String injectedThrowReaderExceptionForTheseItems;
//Default: don't throw any exceptions
private int[] throwReaderExceptionForTheseItems = {};

private String PUDString;
private int currentItemId = -1;
private BasicItem currentItem = null;

@Override
public void open(Serializable checkpoint) {

PUDString = "PUD for Partition: " +partitionNumber;

// Set on the first execution; on later executions it will need to be obtained
// from the job repository's persistent store.
if (checkpoint == null) {
stepCtx.setPersistentUserData(PUDString);
}

if (injectedNumberOfItemsToBeRead != null) {
numberOfItemsToBeRead = Integer.parseInt(injectedNumberOfItemsToBeRead);
}

if (injectedThrowReaderExceptionForTheseItems != null) {
String[] exceptionsStringArray = injectedThrowReaderExceptionForTheseItems.split(",");
throwReaderExceptionForTheseItems = new int[exceptionsStringArray.length];
for (int i = 0; i < exceptionsStringArray.length; i++) {
throwReaderExceptionForTheseItems[i] = Integer.parseInt(exceptionsStringArray[i]);
}
}
}

@Override
public BasicItem readItem() throws Exception {

if (executionNumber.equals("2")) {
if (!stepCtx.getPersistentUserData().equals(PUDString)) {
throw new Exception("BadPersistentUserData: PUD for partition "+partitionNumber+" is expected to be "+PUDString+", but found "+stepCtx.getPersistentUserData());
}
}

//Code below is take from BasicReader

/* Note that BasicReader has no concept of rolling back after a retryable exception is thrown.
* Example: chunk size is 2, we plan to read 10 items (#0-#9), but a retryable exception is thrown while reading item #1
* In this case, the reader goes on to read #2 when it should be rolling back to #0, and so the writer will never receive
* #0, even though it was previously read successfully */

currentItemId++;

if (currentItemId < numberOfItemsToBeRead) {
currentItem = new BasicItem(currentItemId);
if (readerExceptionShouldBeThrownForCurrentItem()) {
//set the job exit status so we can determine which exception was last thrown
jobCtx.setExitStatus("Exception:Item#" + currentItem.getId());
throw new Exception("Exception thrown for item " + currentItem.getId());
}
currentItem.setRead(true);
return currentItem;
}

return null;
}

private boolean readerExceptionShouldBeThrownForCurrentItem() {

for (int i: throwReaderExceptionForTheseItems) {
if (currentItem.getId()==i) { return true; }
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright 2016 International Business Machines Corp.
*
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership. Licensed under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ibm.jbatch.tck.artifacts.specialized;

import javax.batch.api.partition.AbstractPartitionReducer;
import javax.batch.runtime.context.StepContext;
import javax.inject.Inject;

@javax.inject.Named("PUDPartitionReducer")
public class PUDPartitionReducer extends AbstractPartitionReducer {

@Inject
StepContext stepCtx;

public final String TOP_LEVEL_PUD = "This is the Persistent User Data for the top-level stepCtx of the partitioned step!";

@Override
public void beginPartitionedStep() throws Exception {
stepCtx.setPersistentUserData(TOP_LEVEL_PUD);
}

@Override
public void afterPartitionedStepCompletion(PartitionStatus status) throws Exception {
if (!stepCtx.getPersistentUserData().equals(TOP_LEVEL_PUD)) {
throw new Exception("Unexpected PUD at the top level of the Partitioned Step!");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.ibm.jbatch.tck.ann.*;
import com.ibm.jbatch.tck.utils.JobOperatorBridge;
import com.ibm.jbatch.tck.utils.TCKJobExecutionWrapper;

Expand Down Expand Up @@ -598,6 +599,72 @@ public void testPartitionedMapperOverrideTrueSamePartitionNumOnRestart() throws
}
}

@TCKTest(
versions = {"1.1.WORKING"},
assertions = {"Each partition of a partitioned step has its own unique Persistent User Data."},
specRefs = {
@SpecRef(
version = "1.0RevA", section = "9.4.1.1",
citations = "For a partitioned step, there is one StepContext for the parent step/thread; there is a distinct StepContext for each sub-thread "
+ "and each StepContext has its own distinct persistent user data for each sub-thread.",
notes = "See 2. StepContext"
),
@SpecRef(
version = "1.0", section = "10.9.2",
citations = "The setPersistentUserData method stores a persistent data object into the current step. [...] This data is saved as part of a step's "
+ "checkpoint. [...] It is available upon restart.",
notes = "APIRef for StepContext"
)
},
apiRefs = { @APIRef(className="javax.batch.runtime.context.StepContext", methodNames={"setPersistentUserData", "getPersistentUserData"}) },
issueRefs = {"https://github.com/WASdev/standards.jsr352.tck/issues/18"},
strategy = "See comments in the code for this test"
)
@Test
@org.junit.Test
public void testPartitionedStepPersistentUserData() throws Exception {
String METHOD = "testPartitionedStepPersistentUserData";
begin(METHOD);

try {
/* Test Strategy:
* - Job is made up of one chunk step, split into two partitions
* - Each partition will process 3 items
* - The item-count for the step is set to 1, so each partition should checkpoint after each
* item it reads.
*
* - During Job Execution #1, set the PUD for each partition while reading item 1
* fail on purpose while reading item 2 (now that the partitions have already check-pointed)
* - During Job Execution #2, check that the PUD has been persisted from previous job execution
* - We also check that the partition level PUDs do not bubble up into the top-level of the step
* by setting and verifying a top-level PUD with the PartitionReducer
*/

Properties jobParams = new Properties();

//Job Execution #1
jobParams.setProperty("execution.number", "1");
jobParams.setProperty("number.of.items.to.be.read", "3");
jobParams.setProperty("throw.reader.exception.for.these.items", "1"); //The second item

Reporter.log("Locate job XML file: partitioned_step_persistent_user_data.xml<p>");
Reporter.log("Invoke startJobAndWaitForResult<p>");
JobExecution jobExec1 = jobOp.startJobAndWaitForResult("partitioned_step_persistent_user_data", jobParams);
assertWithMessage("Expected job execution 1 to fail", BatchStatus.FAILED, jobExec1.getBatchStatus());

//Job Execution #2
jobParams.setProperty("execution.number", "2");
jobParams.setProperty("throw.reader.exception.for.these.items", "");

Reporter.log("Invoke restartJobAndWaitForResult<p>");
JobExecution jobExec2 = jobOp.restartJobAndWaitForResult(jobExec1.getExecutionId(), jobParams);
assertWithMessage("Expected job execution 2 to complete", BatchStatus.COMPLETED, jobExec2.getBatchStatus());

} catch (Exception e) {
handleException(METHOD, e);
}
}

private static void handleException(String methodName, Exception e) throws Exception {
Reporter.log("Caught exception: " + e.getMessage()+"<p>");
Reporter.log(methodName + " failed<p>");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright 2016 International Business Machines Corp. See the NOTICE file distributed with this work
for additional information regarding copyright ownership. Licensed under the Apache License, Version
2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law
or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
governing permissions and limitations under the License. -->
<job id="partitioned_step_persistent_user_data" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<step id="step1">
<chunk item-count="1">
<reader ref="PUDPartitionReader">
<properties>
<property name="execution.number" value="#{jobParameters['execution.number']}"/>
<property name="number.of.items.to.be.read" value="#{jobParameters['number.of.items.to.be.read']}"/>
<property name="throw.reader.exception.for.these.items" value="#{jobParameters['throw.reader.exception.for.these.items']}"/>
<property name="partition.number" value="#{partitionPlan['partition.number']}"/>
</properties>
</reader>
<processor ref="basicProcessor"/>
<writer ref="basicWriter"/>
</chunk>
<partition>
<plan partitions="2">
<properties partition="0">
<property name="partition.number" value="0"/>
</properties>
<properties partition="1">
<property name="partition.number" value="1"/>
</properties>
</plan>
<reducer ref="PUDPartitionReducer"/>
</partition>
</step>
</job>
2 changes: 2 additions & 0 deletions com.ibm.jbatch.tck/src/main/resources/META-INF/batch.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@
<ref id="numbersSkipWriteListener" class="com.ibm.jbatch.tck.artifacts.specialized.NumbersSkipWriteListener" />
<ref id="overrideOnAttributeValuesUponRestartBatchlet" class="com.ibm.jbatch.tck.artifacts.specialized.OverrideOnAttributeValuesUponRestartBatchlet" />
<ref id="parsingPartitionAnalyzer" class="com.ibm.jbatch.tck.artifacts.specialized.ParsingPartitionAnalyzer" />
<ref id="PUDPartitionReader" class="com.ibm.jbatch.tck.artifacts.specialized.PUDPartitionReader"/>
<ref id="PUDPartitionReducer" class="com.ibm.jbatch.tck.artifacts.specialized.PUDPartitionReducer"/>
<ref id="skipProcessor" class="com.ibm.jbatch.tck.artifacts.specialized.SkipProcessor" />
<ref id="skipReader" class="com.ibm.jbatch.tck.artifacts.specialized.SkipReader" />
<ref id="skipReaderMultipleExceptions" class="com.ibm.jbatch.tck.artifacts.specialized.SkipReaderMultipleExceptions" />
Expand Down

0 comments on commit dbed125

Please sign in to comment.