diff --git a/pom.xml b/pom.xml
index c19f1d2..f529d6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,396 +1,411 @@
4.0.0
org.warcbase
warcbase
jar
0.1.0-SNAPSHOT
warcbase
WARC + HBase
http://warcbase.org/
The Apache Software License, Version 2.0
http://www.apache.org/licenses/LICENSE-2.0.txt
repo
scm:git:git@github.com:lintool/warcbase.git
scm:git:git@github.com:lintool/warcbase.git
git@github.com:lintool/warcbase.git
lintool
Jimmy Lin
jimmylin@umd.edu
milad621
Milad Gholami
mgholami@cs.umd.edu
jeffyRao
Jinfeng Rao
jinfeng@cs.umd.edu
org.sonatype.oss
oss-parent
7
UTF-8
UTF-8
8.1.12.v20130726
2.0.0-cdh4.4.0
0.94.6-cdh4.4.0
3.4.5-cdh4.4.0
1.0.0
maven-assembly-plugin
2.2.1
hadoop-job.xml
org.warcbase.data.ExtractLinks
make-assembly
package
single
org.apache.maven.plugins
maven-shade-plugin
2.1
package
shade
*:*
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
true
fatjar
org.apache.hadoop:*
org.codehaus.mojo
appassembler-maven-plugin
1.3.1
-Xms512M -Xmx1524M
org.warcbase.data.UriMappingBuilder
UriMappingBuilder
org.warcbase.data.ExtractLinks
ExtractLinks
org.warcbase.ingest.IngestFiles
IngestFiles
org.warcbase.ingest.SearchForUri
SearchForUri
org.warcbase.browser.WarcBrowser
WarcBrowser
org.warcbase.analysis.CountRowTypes
CountRowTypes
org.warcbase.analysis.DetectDuplicates
DetectDuplicates
org.warcbase.analysis.PrintAllUris
PrintAllUris
org.warcbase.analysis.ExtractText
ExtractText
internetarchive
Internet Archive Maven Repository
http://builds.archive.org:8080/maven2
cloudera
https://repository.cloudera.com/artifactory/cloudera-repos/
maven
http://repo.maven.apache.org/maven2/
junit
junit
4.11
test
+
+ commons-codec
+ commons-codec
+ 1.8
+
org.jsoup
jsoup
1.7.3
-
+
org.apache.lucene
lucene-core
4.5.1
com.google.guava
guava
14.0.1
tl.lin
lintools-datatypes
0.9.2
+
+
org.apache.hbase
hbase
${hbase.version}
slf4j-api
slf4j-api
org.slf4j
slf4j-api
org.slf4j
slf4j-api-1.4.3
jsp-api
jsp-api
org.mortbay.jetty
jsp-api
org.mortbay.jetty
jsp-api-2.1
org.mortbay.jetty
servlet-api-2.5
org.mortbay.jetty
servlet-api
servlet-api
servlet-api
org.mortbay.jetty
jetty
org.mortbay.jetty
jetty-util
org.mortbay.jetty
jsp-2.1
org.apache.hadoop
hadoop-common
${hadoop.version2}
javax.servlet
servlet-api
org.mortbay.jetty
jetty
org.mortbay.jetty
jetty-util
org.mortbay.jetty
jsp-2.1
org.apache.hadoop
hadoop-client
${hadoop.version2}
javax.servlet
servlet-api
org.mortbay.jetty
jetty
org.mortbay.jetty
jetty-util
org.mortbay.jetty
jsp-2.1
org.apache.zookeeper
zookeeper
${zookeeper.version}
+
+
+ org.netpreserve.openwayback
+ openwayback-core
+ 2.0.0.BETA.1
+
+
+
+
org.eclipse.jetty
jetty-server
${jettyVersion}
org.eclipse.jetty
jetty-webapp
${jettyVersion}
true
org.slf4j
slf4j-log4j12
1.6.4
-
- org.jwat
- jwat-common
- ${jwat.version}
-
-
- org.jwat
- jwat-gzip
- ${jwat.version}
-
-
- org.jwat
- jwat-arc
- ${jwat.version}
-
-
- org.jwat
- jwat-warc
- ${jwat.version}
-
-
- org.jwat
- jwat-tools
- 0.5.6-SNAPSHOT
-
-
- commons-codec
- commons-codec
- 1.8
-
+
+
+
+ org.jwat
+ jwat-common
+ ${jwat.version}
+
+
+ org.jwat
+ jwat-gzip
+ ${jwat.version}
+
+
+ org.jwat
+ jwat-arc
+ ${jwat.version}
+
+
+ org.jwat
+ jwat-warc
+ ${jwat.version}
+
+
+
+
diff --git a/src/main/java/org/warcbase/data/ExtractLinks.java b/src/main/java/org/warcbase/data/ExtractLinks.java
index 6d2a1d7..036b625 100644
--- a/src/main/java/org/warcbase/data/ExtractLinks.java
+++ b/src/main/java/org/warcbase/data/ExtractLinks.java
@@ -1,248 +1,248 @@
package org.warcbase.data;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.jsoup.HttpStatusException;
import org.jsoup.Jsoup;
import org.jsoup.UnsupportedMimeTypeException;
import org.jsoup.helper.Validate;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.jwat.arc.ArcRecordBase;
import org.jwat.common.HttpHeader;
import org.warcbase.mapreduce.ArcInputFormat;
-import cern.colt.Arrays;
+import java.util.Arrays;
/**
* Extract Links demo.
*
* @author Jinfeng Rao
* modified code based on CountTrecDocuments.java by Jimmy Lin
*/
/**
 * MapReduce job that extracts hyperlinks from ARC web-archive records and emits
 * (source-URL-id, list-of-target-URL-ids) pairs, using a prebuilt UriMapping
 * (loaded from the distributed cache) to translate URL strings to integer ids.
 *
 * NOTE(review): this code was recovered from a patch in which angle-bracket
 * spans were stripped, so generic type parameters (on Mapper, List, Set, etc.)
 * are missing from the visible text — confirm against the original file.
 *
 * @author Jinfeng Rao
 * modified code based on CountTrecDocuments.java by Jimmy Lin
 */
public class ExtractLinks extends Configured implements Tool{
private static final Logger LOG = Logger.getLogger(ExtractLinks.class);
// Hadoop counters: TOTAL = ARC records seen; LINK_COUNT = link entries counted.
private static enum Records { TOTAL, LINK_COUNT };
// Mapper: one ARC record in, at most one (url id, list of link ids) pair out.
// NOTE(review): Mapper's generic parameters were stripped by the patch
// extraction; presumably Mapper<LongWritable, ArcRecordBase, IntWritable, ...> — confirm.
public static class ExtractLinksMapper extends Mapper{
private IntWritable urlNode = new IntWritable(); // id of the page currently being processed
private List linkNodes; // ids of this page's outgoing links (reassigned per record)
// URL -> id mapping, loaded once per task in setup(); static so it is
// shared across map() invocations within the same JVM.
private static UriMapping fst;
@Override
public void setup(Context context){
try{
Configuration conf = context.getConfiguration();
Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
// Instantiate the mapping implementation named in the job configuration
// (the class name is set by run() under the key "UriMappingClass").
fst = (UriMapping) Class.forName(conf.get("UriMappingClass")).newInstance();
fst.loadMapping(localFiles[0].toString());// simply assume only one file in distributed cache
}catch(Exception e){
e.printStackTrace();
// Fail the task outright: the job cannot proceed without the URL-id mapping.
throw new RuntimeException("Error Initializing UriMapping");
}
}
@Override
public void map(LongWritable key, ArcRecordBase record, Context context)
throws IOException, InterruptedException {
context.getCounter(Records.TOTAL).increment(1);
String url = record.getUrlStr();
String type = record.getContentTypeStr();
InputStream content = record.getPayloadContent();
// Only HTML pages can contain anchor links; skip every other content type.
if(!type.equals("text/html"))
return;
Document doc = Jsoup.parse(content, "ISO-8859-1", url); //parse inputstream content in 'ISO-8859-1' charset
Elements links = doc.select("a[href]"); //empty if none match
if (fst.getID(url) != -1){ //the url is already indexed in UriMapping
urlNode.set(fst.getID(url));
linkNodes = new ArrayList();
Set linkUrlSet = new HashSet(); //use set to remove duplicate links
if(links != null){
for (Element link : links) {
// "abs:href" asks jsoup to resolve the href against the page's base URL.
String linkUrl = link.attr("abs:href");
if (fst.getID(linkUrl) != -1){ //linkUrl is already indexed
linkUrlSet.add(new IntWritable(fst.getID(linkUrl)));
}
}
boolean emitFlag = false;
for (IntWritable linkID: linkUrlSet){
linkNodes.add(linkID);
emitFlag = true;
context.getCounter(Records.LINK_COUNT).increment(1);
}
// A page with no indexed out-links still contributes one LINK_COUNT tick,
// so the counter also reflects pages processed, not only edges.
if(emitFlag==false){ //contain no links which are indexed in UriMapping
context.getCounter(Records.LINK_COUNT).increment(1);
}
}else{ // webpage without outgoing links
context.getCounter(Records.LINK_COUNT).increment(1);
}
// NOTE(review): run() declares the map output value class as List.class,
// but java.util.List is not a Writable — verify this serializes at runtime.
context.write(urlNode, linkNodes);
}
}
}
// NOTE(review): reducer kept commented out — the job runs map-only (see
// setNumReduceTasks(0) below). Consider deleting this dead code upstream.
/*public static class ExtractLinksReducer extends Reducer {
private Text links = new Text();
@Override
public void reduce(IntWritable key, Iterable values, Context context)
throws IOException, InterruptedException {
String linkIds = "";
for (Text link : values) {
linkIds += link.toString()+" ";
context.getCounter(Records.LINK_COUNT).increment(1);
}
links.set(linkIds);
context.write(key, links);
}
}*/
/**
 * Creates an instance of this tool.
 */
public ExtractLinks() {}
// Command-line option names.
private static final String INPUT = "input";
private static final String OUTPUT = "output";
private static final String URI_MAPPING = "uriMapping";
private static final String NUM_REDUCERS = "numReducers";
/**
 * Runs this tool.
 *
 * Required options: -input, -output, -uriMapping. Optional: -numReducers
 * (parsed and logged, but the job is forced to zero reduce tasks below).
 * Returns 0 on completion, -1 on bad arguments.
 */
@SuppressWarnings({ "static-access" })
public int run(String[] args) throws Exception {
Options options = new Options();
options.addOption(OptionBuilder.withArgName("path").hasArg()
.withDescription("input path").create(INPUT));
options.addOption(OptionBuilder.withArgName("path").hasArg()
.withDescription("output path").create(OUTPUT));
options.addOption(OptionBuilder.withArgName("path").hasArg()
.withDescription("uri mapping file path").create(URI_MAPPING));
options.addOption(OptionBuilder.withArgName("num").hasArg()
.withDescription("number of reducers").create(NUM_REDUCERS));
CommandLine cmdline;
CommandLineParser parser = new GnuParser();
try {
cmdline = parser.parse(options, args);
} catch (ParseException exp) {
System.err.println("Error parsing command line: "
+ exp.getMessage());
return -1;
}
if (!cmdline.hasOption(INPUT) || !cmdline.hasOption(OUTPUT)
|| !cmdline.hasOption(URI_MAPPING)) {
System.out.println("args: " + Arrays.toString(args));
HelpFormatter formatter = new HelpFormatter();
formatter.setWidth(120);
formatter.printHelp(this.getClass().getName(), options);
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
String inputPath = cmdline.getOptionValue(INPUT);
String outputPath = cmdline.getOptionValue(OUTPUT);
String mappingPath = cmdline.getOptionValue(URI_MAPPING);
// NOTE(review): reduceTasks is only logged; setNumReduceTasks(0) below
// overrides it — confirm whether -numReducers is meant to have any effect.
int reduceTasks = cmdline.hasOption(NUM_REDUCERS) ? Integer
.parseInt(cmdline.getOptionValue(NUM_REDUCERS)) : 1;
LOG.info("Tool: " + ExtractLinks.class.getSimpleName());
LOG.info(" - input path: " + inputPath);
LOG.info(" - output path: " + outputPath);
LOG.info(" - mapping file path:" + mappingPath);
LOG.info(" - number of reducers: " + reduceTasks);
Job job = new Job(getConf(),ExtractLinks.class.getSimpleName());
job.setJarByClass(ExtractLinks.class);
// Pass in the class name as a String; this is makes the mapper general in being able to load
// any collection of Indexable objects that has url_id/url mapping specified by a UriMapping
// object.
job.getConfiguration().set("UriMappingClass", UriMapping.class.getCanonicalName());
// Put the mapping file in the distributed cache so each map worker will have it.
DistributedCache.addCacheFile(new URI(mappingPath), job.getConfiguration());
job.setNumReduceTasks(0); // no reducers
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
// Custom input format that splits ARC archive files into ArcRecordBase records.
job.setInputFormatClass(ArcInputFormat.class);
//set map (key,value) output format
job.setMapOutputKeyClass(IntWritable.class);
// NOTE(review): List is not a Writable; the mapper writes a raw List value —
// verify this key/value class pairing actually works with the output format.
job.setMapOutputValueClass(List.class);
job.setMapperClass(ExtractLinksMapper.class);
// Delete the output directory if it exists already.
Path outputDir = new Path(outputPath);
FileSystem.get(job.getConfiguration()).delete(outputDir, true);
long startTime = System.currentTimeMillis();
job.waitForCompletion(true);
LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime)
/ 1000.0 + " seconds");
// Pull the counters back from the finished job for the summary log lines.
Counters counters = job.getCounters();
int numRecords = (int) counters.findCounter(Records.TOTAL).getValue();
int numLinks = (int) counters.findCounter(Records.LINK_COUNT).getValue();
LOG.info("Read " + numRecords +" records.");
LOG.info("Extracts "+ numLinks +" links.");
return 0;
}
/**
 * Dispatches command-line arguments to the tool via the {@code ToolRunner}.
 */
public static void main(String[] args) throws Exception {
ToolRunner.run(new ExtractLinks(), args);
}
}
\ No newline at end of file