From 01afe389981fb9bd86c6ad32adc22a3b0dda4395 Mon Sep 17 00:00:00 2001 From: blake_bauman Date: Mon, 21 Apr 2025 14:53:05 -0700 Subject: [PATCH] Allow keys other than _id for primary key in MongoItemWriter When moving data between different data store types, such as between MongoDB and a SQL database, not all systems have a "_id" as their pimary key. Most often it has a different name or can be multiple keys/columns used as a composite key. This change allows the app to specify an alternate key or a set of keys which, together, can be used as the primary key for a MongoDB insert/update/upsert. Signed-off-by: blake_bauman --- .../batch/item/data/MongoItemWriter.java | 50 +++++++++++++++++-- .../data/builder/MongoItemWriterBuilder.java | 34 +++++++++++++ 2 files changed, 79 insertions(+), 5 deletions(-) diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java index b7aa27f375..27f9507e73 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java @@ -16,6 +16,10 @@ package org.springframework.batch.item.data; +import static java.util.stream.Collectors.*; + +import java.util.List; + import org.bson.Document; import org.bson.types.ObjectId; @@ -92,6 +96,8 @@ public enum Mode { private Mode mode = Mode.UPSERT; + private List primaryKeys = List.of(ID_KEY); + public MongoItemWriter() { super(); this.bufferKey = new Object(); @@ -163,6 +169,22 @@ public String getCollection() { return collection; } + /** + * Set the primary keys to associate with the document being written. These fields + * should uniquely identify a single object. + * @param primaryKeys The primary keys to use. + * @since 5.2 + */ + public void setPrimaryKeys(List primaryKeys) { + Assert.notEmpty(primaryKeys, "The primaryKeys list must have one or more keys."); + + this.primaryKeys = primaryKeys; + } + + public List getPrimaryKeys() { + return primaryKeys; + } + /** * If a transaction is active, buffer items to be written just before commit. * Otherwise write items using the provided template. @@ -213,9 +235,14 @@ private void remove(Chunk chunk) { for (Object item : chunk) { Document document = new Document(); mongoConverter.write(item, document); - Object objectId = document.get(ID_KEY); - if (objectId != null) { - Query query = new Query().addCriteria(Criteria.where(ID_KEY).is(objectId)); + + List criteriaList = primaryKeys.stream() + .filter(document::containsKey) + .map(key -> Criteria.where(key).is(document.get(key))) + .collect(toList()); + if (!criteriaList.isEmpty()) { + Query query = new Query(); + criteriaList.forEach(query::addCriteria); bulkOperations.remove(query); } } @@ -229,8 +256,21 @@ private void upsert(Chunk chunk) { for (Object item : chunk) { Document document = new Document(); mongoConverter.write(item, document); - Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId(); - Query query = new Query().addCriteria(Criteria.where(ID_KEY).is(objectId)); + + Query query = new Query(); + List criteriaList = primaryKeys.stream() + .filter(document::containsKey) + .map(key -> Criteria.where(key).is(document.get(key))) + .collect(toList()); + + if (criteriaList.isEmpty()) { + Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId(); + query.addCriteria(Criteria.where(ID_KEY).is(objectId)); + } + else { + criteriaList.forEach(query::addCriteria); + } + bulkOperations.replaceOne(query, document, upsert); } bulkOperations.execute(); diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoItemWriterBuilder.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoItemWriterBuilder.java index 4df60a7d4c..3fb0305c25 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoItemWriterBuilder.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoItemWriterBuilder.java @@ -16,6 +16,8 @@ package org.springframework.batch.item.data.builder; +import java.util.List; + import org.springframework.batch.item.data.MongoItemWriter; import org.springframework.batch.item.data.MongoItemWriter.Mode; import org.springframework.data.mongodb.core.MongoOperations; @@ -37,6 +39,8 @@ public class MongoItemWriterBuilder { private Mode mode = Mode.UPSERT; + private List primaryKeys = List.of(); + /** * Indicates if the items being passed to the writer are to be saved or removed from * the data store. If set to false (default), the items will be saved. If set to true, @@ -93,6 +97,32 @@ public MongoItemWriterBuilder collection(String collection) { return this; } + /** + * Set the primary keys to associate with the document being written. These fields + * should uniquely identify a single object. + * @param primaryKeys The keys to use. + * @see MongoItemWriter#setPrimaryKeys(List) + * @since 5.2 + */ + public MongoItemWriterBuilder primaryKeys(List primaryKeys) { + this.primaryKeys = List.copyOf(primaryKeys); + + return this; + } + + /** + * Set the primary keys to associate with the document being written. These fields + * should uniquely identify a single object. + * @param primaryKeys The keys to use. + * @see MongoItemWriter#setPrimaryKeys(List) + * @since 5.2 + */ + public MongoItemWriterBuilder primaryKeys(String... primaryKeys) { + this.primaryKeys = List.of(primaryKeys); + + return this; + } + /** * Validates and builds a {@link MongoItemWriter}. * @return a {@link MongoItemWriter} @@ -105,6 +135,10 @@ public MongoItemWriter build() { writer.setMode(this.mode); writer.setCollection(this.collection); + if (!this.primaryKeys.isEmpty()) { + writer.setPrimaryKeys(this.primaryKeys); + } + return writer; }