diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java index b7aa27f375..27f9507e73 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java @@ -16,6 +16,10 @@ package org.springframework.batch.item.data; +import static java.util.stream.Collectors.*; + +import java.util.List; + import org.bson.Document; import org.bson.types.ObjectId; @@ -92,6 +96,8 @@ public enum Mode { private Mode mode = Mode.UPSERT; + private List primaryKeys = List.of(ID_KEY); + public MongoItemWriter() { super(); this.bufferKey = new Object(); @@ -163,6 +169,22 @@ public String getCollection() { return collection; } + /** + * Set the primary keys to associate with the document being written. These fields + * should uniquely identify a single object. + * @param primaryKeys The primary keys to use. + * @since 5.2 + */ + public void setPrimaryKeys(List primaryKeys) { + Assert.notEmpty(primaryKeys, "The primaryKeys list must have one or more keys."); + + this.primaryKeys = primaryKeys; + } + + public List getPrimaryKeys() { + return primaryKeys; + } + /** * If a transaction is active, buffer items to be written just before commit. * Otherwise write items using the provided template. @@ -213,9 +235,14 @@ private void remove(Chunk chunk) { for (Object item : chunk) { Document document = new Document(); mongoConverter.write(item, document); - Object objectId = document.get(ID_KEY); - if (objectId != null) { - Query query = new Query().addCriteria(Criteria.where(ID_KEY).is(objectId)); + + List criteriaList = primaryKeys.stream() + .filter(document::containsKey) + .map(key -> Criteria.where(key).is(document.get(key))) + .collect(toList()); + if (!criteriaList.isEmpty()) { + Query query = new Query(); + criteriaList.forEach(query::addCriteria); bulkOperations.remove(query); } } @@ -229,8 +256,21 @@ private void upsert(Chunk chunk) { for (Object item : chunk) { Document document = new Document(); mongoConverter.write(item, document); - Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId(); - Query query = new Query().addCriteria(Criteria.where(ID_KEY).is(objectId)); + + Query query = new Query(); + List criteriaList = primaryKeys.stream() + .filter(document::containsKey) + .map(key -> Criteria.where(key).is(document.get(key))) + .collect(toList()); + + if (criteriaList.isEmpty()) { + Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId(); + query.addCriteria(Criteria.where(ID_KEY).is(objectId)); + } + else { + criteriaList.forEach(query::addCriteria); + } + bulkOperations.replaceOne(query, document, upsert); } bulkOperations.execute(); diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoItemWriterBuilder.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoItemWriterBuilder.java index 4df60a7d4c..3fb0305c25 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoItemWriterBuilder.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoItemWriterBuilder.java @@ -16,6 +16,8 @@ package org.springframework.batch.item.data.builder; +import java.util.List; + import org.springframework.batch.item.data.MongoItemWriter; import org.springframework.batch.item.data.MongoItemWriter.Mode; import org.springframework.data.mongodb.core.MongoOperations; @@ -37,6 +39,8 @@ public class MongoItemWriterBuilder { private Mode mode = Mode.UPSERT; + private List primaryKeys = List.of(); + /** * Indicates if the items being passed to the writer are to be saved or removed from * the data store. If set to false (default), the items will be saved. If set to true, @@ -93,6 +97,32 @@ public MongoItemWriterBuilder collection(String collection) { return this; } + /** + * Set the primary keys to associate with the document being written. These fields + * should uniquely identify a single object. + * @param primaryKeys The keys to use. + * @see MongoItemWriter#setPrimaryKeys(List) + * @since 5.2 + */ + public MongoItemWriterBuilder primaryKeys(List primaryKeys) { + this.primaryKeys = List.copyOf(primaryKeys); + + return this; + } + + /** + * Set the primary keys to associate with the document being written. These fields + * should uniquely identify a single object. + * @param primaryKeys The keys to use. + * @see MongoItemWriter#setPrimaryKeys(List) + * @since 5.2 + */ + public MongoItemWriterBuilder primaryKeys(String... primaryKeys) { + this.primaryKeys = List.of(primaryKeys); + + return this; + } + /** * Validates and builds a {@link MongoItemWriter}. * @return a {@link MongoItemWriter} @@ -105,6 +135,10 @@ public MongoItemWriter build() { writer.setMode(this.mode); writer.setCollection(this.collection); + if (!this.primaryKeys.isEmpty()) { + writer.setPrimaryKeys(this.primaryKeys); + } + return writer; }