diff --git a/README.md b/README.md index e340fb9..fbe6879 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ Welcome to the Solr Mongo Importer project. This project provides MongoDb suppor * Retrive data from a MongoDb collection * Authenticate using MongoDb authentication * Map Mongo fields to Solr fields +* Delta import available ## Classes @@ -17,6 +18,8 @@ Welcome to the Solr Mongo Importer project. This project provides MongoDb suppor * MongoEntityProcessor - Use with the MongoDataSource to query a MongoDb collection * collection (**required**) * query (**required**) + * deltaQuery (*optional*) + * deltaImportQuery (*optional*) * MongoMapperTransformer - Map MongoDb fields to your Solr schema * mongoField (**required**) @@ -46,6 +49,8 @@ Here is a sample data-config.xml showing the use of all components query="{'Active':1}" collection="ProductData" datasource="MyMongo" + deltaQuery="{'UpdateDate':{$gt:{$date:'${dih.last_index_time}'}}}" + deltaImportQuery="{'_id':'${dih.delta._id}'}" transformer="MongoMapperTransformer" > diff --git a/src/main/org/apache/solr/handler/dataimport/MongoEntityProcessor.java b/src/main/org/apache/solr/handler/dataimport/MongoEntityProcessor.java index 91b6bce..5830fd8 100644 --- a/src/main/org/apache/solr/handler/dataimport/MongoEntityProcessor.java +++ b/src/main/org/apache/solr/handler/dataimport/MongoEntityProcessor.java @@ -3,7 +3,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; import java.util.Map; import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE; @@ -49,13 +48,72 @@ protected void initQuery(String q) { @Override public Map nextRow() { - String query = context.getEntityAttribute( QUERY ); if (rowIterator == null) { + String query = this.getQuery(); initQuery(context.replaceTokens(query)); + + } + return getNext(); + } + + @Override + public Map nextModifiedRowKey() { + if (rowIterator == null) { + String deltaQuery = context.getEntityAttribute(DELTA_QUERY); + if(deltaQuery == null) + return null; + initQuery(context.replaceTokens(deltaQuery)); + } return getNext(); } + @Override + public Map nextDeletedRowKey() { + if (rowIterator == null) { + String deletedPkQuery = context.getEntityAttribute(DEL_PK_QUERY); + if(deletedPkQuery == null) + return null; + initQuery(context.replaceTokens(deletedPkQuery)); + } + return getNext(); + } + + @Override + public Map nextModifiedParentRowKey() { + if(this.rowIterator == null) { + String parentDeltaQuery = this.context.getEntityAttribute("parentDeltaQuery"); + if(parentDeltaQuery == null) { + return null; + } + + LOG.info("Running parentDeltaQuery for Entity: " + this.context.getEntityAttribute("name")); + this.initQuery(this.context.replaceTokens(parentDeltaQuery)); + } + + return this.getNext(); + } + + public String getQuery() { + String queryString = this.context.getEntityAttribute(QUERY); + if("FULL_DUMP".equals(this.context.currentProcess())) { + return queryString; + } else if("DELTA_DUMP".equals(this.context.currentProcess())) { + return this.context.getEntityAttribute(DELTA_IMPORT_QUERY); + + } else { + return null; + } + } + + public static final String QUERY = "query"; + + public static final String DELTA_QUERY = "deltaQuery"; + + public static final String DELTA_IMPORT_QUERY = "deltaImportQuery"; + + public static final String DEL_PK_QUERY = "deletedPkQuery"; + public static final String COLLECTION = "collection"; }