Skip to content

Commit

Permalink
WARCSpout loads inputs using HDFS (#1122)
Browse files Browse the repository at this point in the history
* WARCSpout read inputs from HDFS FS, fix #1120

Signed-off-by: Julien Nioche <[email protected]>

* Added config info to README

Signed-off-by: Julien Nioche <[email protected]>

---------

Signed-off-by: Julien Nioche <[email protected]>
  • Loading branch information
jnioche authored Nov 17, 2023
1 parent 2eaa33d commit 0a8afbf
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 11 deletions.
27 changes: 26 additions & 1 deletion external/warc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,32 @@ The WARCSpout is configured similar as FileSpout:
- input files are defined by
- read from the configured folder (available as local file system path)
- a pattern matching valid file names
- every line in the input files specifies one input WARC file as file path or URL
- every line in the input files specifies one input WARC file as file path or URL.

The non-http input paths are loaded via HDFS, which can be configured using the key `hdfs` e.g.

```
hdfs:
fs.s3a.access.key: ${awsAccessKeyId}
fs.s3a.secret.key: ${awsSecretAccessKey}
```

Please note that in order to access WARC files on AWS S3, you will need to add the following dependency to your project

```
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.10.1</version>
</dependency>
```

where the version should match the one used by Apache Storm. In doubt, you can check with

```
mvn dependency:tree | grep "org.apache.hadoop:hadoop-hdfs:jar"
```


To use the WARCSpout reading `*.paths` or `*.txt` files from the folder `input/`, you simply start to build your topology as

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
Expand Down Expand Up @@ -75,6 +74,8 @@ public class WARCSpout extends FileSpout {

private MultiCountMetric eventCounter;

protected transient Configuration hdfsConfig;

public WARCSpout(String... files) {
super(false, files);
}
Expand Down Expand Up @@ -140,13 +141,13 @@ private void openWARC() {
}
}

private static ReadableByteChannel openChannel(String path) throws IOException {
private ReadableByteChannel openChannel(String path) throws IOException {
if (path.matches("^https?://.*")) {
URL warcUrl = new URL(path);
return Channels.newChannel(warcUrl.openStream());
} else {
return FileChannel.open(Paths.get(path));
}
org.apache.hadoop.fs.Path hdfsPath = new org.apache.hadoop.fs.Path(path);
return Channels.newChannel(hdfsPath.getFileSystem(hdfsConfig).open(hdfsPath));
}

private void closeWARC() {
Expand Down Expand Up @@ -289,9 +290,8 @@ private byte[] getContent(WarcResponse record, TruncationStatus isTruncated)
} catch (ParsingException e) {
LOG.error("Failed to read chunked content of {}: {}", record.target(), e);
/*
* caused by an invalid Transfer-Encoding or a HTTP header
* `Transfer-Encoding: chunked` removed although the
* Transfer-Encoding was removed in the WARC file
* caused by an invalid Transfer-Encoding or a HTTP header `Transfer-Encoding:
* chunked` removed although the Transfer-Encoding was removed in the WARC file
*/
// TODO: should retry without chunked Transfer-Encoding
break;
Expand Down Expand Up @@ -396,6 +396,17 @@ record = Optional.empty();
eventCounter =
context.registerMetric(
"warc_spout_counter", new MultiCountMetric(), metricsTimeBucketSecs);

hdfsConfig = new Configuration();

String configKey = ConfUtils.getString(conf, "hdfs.config.key", "hdfs");

Map<String, Object> map = (Map<String, Object>) conf.get(configKey);
if (map != null) {
for (String key : map.keySet()) {
this.hdfsConfig.set(key, String.valueOf(map.get(key)));
}
}
}

@Override
Expand Down Expand Up @@ -501,8 +512,8 @@ public void nextTuple() {
long offset = warcReader.position();
metadata.addValue("warc.record.offset", Long.toString(offset));
/*
* note: warc.record.length must be calculated after WARC record has
* been entirely processed
* note: warc.record.length must be calculated after WARC record has been
* entirely processed
*/

if (status == Status.FETCHED && http.status() != 304) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership.
* DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy of the
* License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.digitalpebble.stormcrawler.warc;

import com.digitalpebble.stormcrawler.TestOutputCollector;
import com.digitalpebble.stormcrawler.TestUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class WARCSpoutTest {

private TestOutputCollector output;
private WARCSpout spout;
private Map<String, Object> conf;

@Before
public void setup() throws IOException {
output = new TestOutputCollector();

// pass it as input to the spout
java.io.File refInputFile = new java.io.File("src/test/resources/warc.inputs");

Map<String, Object> hdfsConf = new HashMap<>();
hdfsConf.put("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
conf = new HashMap<String, Object>();
conf.put("hdfs", hdfsConf);

spout = new WARCSpout(refInputFile.getAbsolutePath());
spout.open(conf, TestUtil.getMockedTopologyContext(), new SpoutOutputCollector(output));
spout.activate();
}

@After
public void cleanup() {
spout.close();
output = null;
}

@Test
public void test() throws IOException {
int expected = 17;
while (output.getEmitted().size() < expected) {
spout.nextTuple();
}
}
}
Binary file added external/warc/src/test/resources/test.warc
Binary file not shown.
1 change: 1 addition & 0 deletions external/warc/src/test/resources/warc.inputs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
src/test/resources/test.warc

0 comments on commit 0a8afbf

Please sign in to comment.