Skip to content
This repository has been archived by the owner on Dec 11, 2019. It is now read-only.

Commit

Permalink
Modified topology to enable proper resuming and a fixed set of IDs to work against
Browse files Browse the repository at this point in the history
  • Loading branch information
ymamakis committed Dec 15, 2015
1 parent 32da106 commit 48d8aa6
Show file tree
Hide file tree
Showing 15 changed files with 332 additions and 192 deletions.
2 changes: 1 addition & 1 deletion reindexing/enrichment-bolt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<dependency>
<groupId>eu.europeana.corelib</groupId>
<artifactId>corelib-storage</artifactId>
<version>2.1-SNAPSHOT</version>
<version>2.2-SNAPSHOT</version>
</dependency>
<dependency>
<version>1.0-SNAPSHOT</version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,14 @@
*/
package eu.europeana.reindexing.enrichment;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.codec.binary.Base64;
import org.codehaus.jackson.map.ObjectMapper;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress;

import eu.europeana.corelib.edm.exceptions.MongoDBException;
import eu.europeana.corelib.mongo.server.impl.EdmMongoServerImpl;
import eu.europeana.corelib.solr.bean.impl.FullBeanImpl;
Expand All @@ -40,6 +23,19 @@
import eu.europeana.enrichment.rest.client.EnrichmentDriver;
import eu.europeana.reindexing.common.ReindexingFields;
import eu.europeana.reindexing.common.ReindexingTuple;
import org.apache.commons.codec.binary.Base64;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;

/**
*
Expand Down Expand Up @@ -70,7 +66,7 @@ public EnrichmentBolt(String path, String[] mongoAddresses, String dbName, Strin

@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
ofd.declare(new Fields(ReindexingFields.TASKID, ReindexingFields.IDENTIFIER, ReindexingFields.NUMFOUND, ReindexingFields.QUERY, ReindexingFields.ENTITYWRAPPER));
ofd.declare(new Fields(ReindexingFields.TASKID, ReindexingFields.BATCHID,ReindexingFields.IDENTIFIER, ReindexingFields.NUMFOUND, ReindexingFields.QUERY, ReindexingFields.ENTITYWRAPPER));
}

@Override
Expand All @@ -93,10 +89,11 @@ public void prepare(Map map, TopologyContext tc, OutputCollector oc) {

}
}
List<MongoCredential> credentialsList = new ArrayList<MongoCredential>();
MongoCredential credential = MongoCredential.createCredential(dbUser, dbName, dbPassword.toCharArray());
credentialsList.add(credential);
MongoClient mongo = new MongoClient(addresses, credentialsList);
// List<MongoCredential> credentialsList = new ArrayList<MongoCredential>();
// MongoCredential credential = MongoCredential.createCredential(dbUser, dbName, dbPassword.toCharArray());
// credentialsList.add(credential);
// MongoClient mongo = new MongoClient(addresses, credentialsList);
Mongo mongo = new Mongo(addresses);
mongoServer = new EdmMongoServerImpl(mongo, dbName, null, null);
} catch (MongoDBException ex) {
Logger.getLogger(EnrichmentBolt.class.getName()).log(Level.SEVERE, null, ex);
Expand Down Expand Up @@ -130,7 +127,7 @@ public void execute(Tuple tuple) {

Logger.getGlobal().log( Level.INFO, "*** Converting to String for enriched " + fBean.getAbout()
+ " took " + (new Date().getTime() - startConvert) + " ms ***");
collector.emit(new ReindexingTuple(task.getTaskId(), task .getIdentifier(), task.getNumFound(), task.getQuery(), enrichments).toTuple());
collector.emit(new ReindexingTuple(task.getTaskId(), task.getBatchId(), task .getIdentifier(), task.getNumFound(), task.getQuery(), enrichments).toTuple());
} catch (IOException ex) {
Logger.getLogger(EnrichmentBolt.class.getName()).log(Level.SEVERE, null, ex);
}
Expand Down
7 changes: 7 additions & 0 deletions reindexing/enrichment-topology/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
<dependencies>
<dependency>
<version>1.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<artifactId>jsr311-api</artifactId>
<groupId>javax.ws.rs</groupId>
</exclusion>
</exclusions>
<groupId>eu.europeana.reindexing</groupId>
<artifactId>enrichment-bolt</artifactId>
</dependency>
Expand All @@ -31,6 +37,7 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,31 @@

### Ingestion Storage - target###
ingestion.solr.host=http://sol1.ingest.eanadev.org:9191/solr,http://sol2.ingest.eanadev.org:9191/solr,http://sol3.ingest.eanadev.org:9191/solr
ingestion.solr.collection=search_1
ingestion.solr.collection=search_test
#ingestion.mongo.host=78.109.62.163,78.109.62.164,78.109.62.165
ingestion.mongo.host=85.10.194.87,85.10.194.178,136.243.13.206
ingestion.mongo.database=europeana
ingestion.mongo.user=mdfsmklmkldsfdsmkl
ingestion.mongo.password=nfjds32kn$%&Njk124sdfnks
ingestion.mongo.host=176.9.7.91,176.9.7.182,148.251.183.82,78.46.60.203
ingestion.mongo.database=europeana_test
ingestion.mongo.user=
ingestion.mongo.password=
ingestion.enrichment.restendpoint=http://144.76.50.251:8080/enrichment-framework-rest-0.1-SNAPSHOT/enrich/
ingestion.zookeeper.host=sol1.ingest.eanadev.org:2181

### Production Storage - source and target ###
production.solr.host = http://sol1.eanadev.org:9191/solr,http://sol2.eanadev.org:9191/solr,http://sol3.eanadev.org:9191/solr,http://sol4.eanadev.org:9191/solr,http://sol4.eanadev.org:9191/solr,http://sol5.eanadev.org:9191/solr,http://sol6.eanadev.org:9191/solr
production.solr.collection=search_1
production.solr.collection=search_test
#production.mongo.host=78.109.62.166, 78.109.62.167, 78.109.62.168
production.mongo.host=136.243.3.92,148.251.133.84,136.243.21.27,213.133.98.198,85.10.194.84,85.10.194.82
production.mongo.database=europeana
production.mongo.user=nvvncmnmewnfjkdnv
production.mongo.password=dsjaklHFNDnfd3912sndc
production.mongo.host=176.9.7.91,176.9.7.182,148.251.183.82,78.46.60.203
production.mongo.database=europeana_test
production.mongo.user=
production.mongo.password=
production.enrichment.restendpoint=http://144.76.50.251:8080/enrichment-framework-rest-0.1-SNAPSHOT/enrich/
production.zookeeper.host=sol1.eanadev.org:2181

### Task Reports Storage ###
taskreport.solr.host=http://sol1.eanadev.org:9191/solr,http://sol2.eanadev.org:9191/solr,http://sol3.eanadev.org:9191/solr,http://sol4.eanadev.org:9191/solr,http://sol4.eanadev.org:9191/solr,http://sol5.eanadev.org:9191/solr,http://sol6.eanadev.org:9191/solr
taskreport.solr.collection=search_1
taskreport.solr.collection=search_test
taskreport.mongo.host=176.9.7.91,176.9.7.182,148.251.183.82,78.46.60.203
taskreport.mongo.database=
taskreport.mongo.database= task_report_test
taskreport.mongo.user=
taskreport.mongo.password=
taskreport.enrichment.restendpoint=http://144.76.50.251:8080/enrichment-framework-rest-0.1-SNAPSHOT/enrich/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private List<ReindexingTuple> prepareTuples() {
long numFound = results.getNumFound();
//task id is mocked
for (SolrDocument solrDoc : results) {
tuples.add(new ReindexingTuple(System.currentTimeMillis(), solrDoc.getFieldValue("europeana_id").toString(), numFound, query, null));
tuples.add(new ReindexingTuple(System.currentTimeMillis(), 0,solrDoc.getFieldValue("europeana_id").toString(), numFound, query, null));
}
} catch (SolrServerException | MalformedURLException | UnknownHostException ex) {
Logger.getLogger(EnrichmentTopologyTest.class.getName()).log(Level.SEVERE, null, ex);
Expand Down
2 changes: 1 addition & 1 deletion reindexing/recordread/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<dependency>
<groupId>eu.europeana.corelib</groupId>
<artifactId>corelib-storage</artifactId>
<version>2.1-SNAPSHOT</version>
<version>2.2-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit 48d8aa6

Please sign in to comment.