diff --git a/core/src/main/java/com/digitalpebble/stormcrawler/spout/FileSpout.java b/core/src/main/java/com/digitalpebble/stormcrawler/spout/FileSpout.java index 3b640a057..3ae49c364 100644 --- a/core/src/main/java/com/digitalpebble/stormcrawler/spout/FileSpout.java +++ b/core/src/main/java/com/digitalpebble/stormcrawler/spout/FileSpout.java @@ -58,6 +58,8 @@ public class FileSpout extends BaseRichSpout { protected LinkedList buffer = new LinkedList<>(); protected boolean active; private boolean withDiscoveredStatus = false; + protected int totalTasks; + protected int taskIndex; /** * @param dir containing the seed files @@ -139,7 +141,17 @@ protected void populateBuffer() throws IOException { while (linesRead < BATCH_SIZE && (line = currentBuffer.readLine()) != null) { if (StringUtils.isBlank(line)) continue; if (line.startsWith("#")) continue; - buffer.add(line.trim().getBytes(StandardCharsets.UTF_8)); + // check whether this entry should be kept or skipped; + // totalTasks can still be 0 if a subclass forgot to + // call this class's open(), in which case nothing is skipped + if (totalTasks == 0 || linesRead % totalTasks == taskIndex) { + LOG.debug( + "Adding to buffer for spout {} -> line ({}) {}", + taskIndex, + linesRead, + line); + buffer.add(line.trim().getBytes(StandardCharsets.UTF_8)); + } linesRead++; } @@ -154,11 +166,11 @@ protected void populateBuffer() throws IOException { public void open( Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; - try { - populateBuffer(); - } catch (IOException e) { - throw new RuntimeException(e); - } + + // if more than one instance is used, each one keeps only the lines whose + // counter modulo the number of tasks equals its own task index + totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); + taskIndex = context.getThisTaskIndex(); } @Override diff --git a/external/warc/src/main/java/com/digitalpebble/stormcrawler/warc/WARCSpout.java b/external/warc/src/main/java/com/digitalpebble/stormcrawler/warc/WARCSpout.java index 
aaff0a835..add70294f 100644 --- a/external/warc/src/main/java/com/digitalpebble/stormcrawler/warc/WARCSpout.java +++ b/external/warc/src/main/java/com/digitalpebble/stormcrawler/warc/WARCSpout.java @@ -375,7 +375,9 @@ private void addVerbatimHttpHeaders( @Override public void open( Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; + + super.open(conf, context, collector); + record = Optional.empty(); maxContentSize = ConfUtils.getInt(conf, "http.content.limit", -1);