diff --git a/src/main/java/org/warcbase/WarcbaseAdmin.java b/src/main/java/org/warcbase/WarcbaseAdmin.java
index 893856d..5e953f0 100644
--- a/src/main/java/org/warcbase/WarcbaseAdmin.java
+++ b/src/main/java/org/warcbase/WarcbaseAdmin.java
@@ -1,139 +1,155 @@
+/*
+ * Warcbase: an open-source platform for managing web archives
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.warcbase;

 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

 public class WarcbaseAdmin {
   private static final Logger LOG = Logger.getLogger(WarcbaseAdmin.class);

   private static final String METATABLE = "warcbase.meta";

   private static final String INITIALIZE_OPTION = "initialize";
   private static final String FORCE_OPTION = "force";
   private static final String HELP_OPTION = "help";
   private static final String ADD_OPTION = "addCollection";
   private static final String DELETE_OPTION = "deleteCollection";
   private static final String DUMP_OPTION = "dump";

   @SuppressWarnings("static-access")
   public static void main(String[] args) throws Exception {
     Options options = new Options();
     options.addOption(new Option(INITIALIZE_OPTION, "initialize metadata table"));
     options.addOption(new Option(FORCE_OPTION, "force initialization of metadata table"));
     options.addOption(new Option(HELP_OPTION, "prints help message"));
     options.addOption(new Option(DUMP_OPTION, "dumps metadata table"));
     options.addOption(OptionBuilder.withArgName("name").hasArg()
         .withDescription("add a collection").create(ADD_OPTION));
     options.addOption(OptionBuilder.withArgName("name").hasArg()
         .withDescription("remove a collection").create(DELETE_OPTION));

     // ZooKeeper is fairly noisy in logging. Normally, not a big deal, but in this case
     // gets in the way.
     Logger.getLogger(org.apache.zookeeper.ZooKeeper.class).setLevel(Level.WARN);
     Logger.getLogger(org.apache.zookeeper.ClientCnxn.class).setLevel(Level.WARN);
     Logger.getLogger(org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.class).setLevel(Level.WARN);
     Logger.getLogger(org.apache.hadoop.hbase.client.HConnectionManager.class).setLevel(Level.WARN);

     CommandLine cmdline = null;
     CommandLineParser parser = new GnuParser();
     try {
       cmdline = parser.parse(options, args);
     } catch (ParseException exp) {
       HelpFormatter formatter = new HelpFormatter();
       formatter.printHelp(WarcbaseAdmin.class.getName(), options);
       System.exit(-1);
     }

     if (cmdline.hasOption(HELP_OPTION) || cmdline.getOptions().length == 0) {
       HelpFormatter formatter = new HelpFormatter();
       formatter.printHelp(WarcbaseAdmin.class.getName(), options);
       System.exit(-1);
     }

     Configuration hbaseConfig = HBaseConfiguration.create();
     HBaseAdmin admin = new HBaseAdmin(hbaseConfig);
     LOG.info("Successfully created connection to HBase.");

     if (cmdline.hasOption(INITIALIZE_OPTION)) {
       if (!admin.tableExists(METATABLE) || cmdline.hasOption(FORCE_OPTION)) {
         if (admin.tableExists(METATABLE)) {
           LOG.info(METATABLE + " exists already. Dropping.");
           admin.disableTable(METATABLE);
           admin.deleteTable(METATABLE);
         }
         HTableDescriptor tableDesc = new HTableDescriptor(METATABLE);
         HColumnDescriptor hColumnDesc = new HColumnDescriptor("m");
         tableDesc.addFamily(hColumnDesc);
         admin.createTable(tableDesc);
         LOG.info("Successfully created " + METATABLE);
       } else {
         LOG.info(METATABLE + " exists already. Doing nothing.");
         LOG.info("To destroy existing " + METATABLE + " and reinitialize, use -" + FORCE_OPTION);
       }
     } else if (cmdline.hasOption(ADD_OPTION)) {
       String name = cmdline.getOptionValue(ADD_OPTION);
       HTable table = new HTable(hbaseConfig, METATABLE);
       Get get = new Get(Bytes.toBytes(name));
       if (table.get(get).isEmpty()) {
         Put put = new Put(Bytes.toBytes(name));
         put.add(Bytes.toBytes("m"), Bytes.toBytes(name),
             Bytes.toBytes(System.currentTimeMillis() + ""));
         table.put(put);
         LOG.info("Adding collection '" + name + "'");
       } else {
         LOG.info("Error, collection '" + name + "' already exists!");
       }
       table.close();
     } else if (cmdline.hasOption(DELETE_OPTION)) {
       String name = cmdline.getOptionValue(DELETE_OPTION);
       HTable table = new HTable(hbaseConfig, METATABLE);
       Get get = new Get(Bytes.toBytes(name));
       if (table.get(get).isEmpty()) {
         LOG.info("Error, collection '" + name + "' doesn't exist!");
       } else {
         Delete delete = new Delete(Bytes.toBytes(name));
         table.delete(delete);
         LOG.info("Deleted collection '" + name + "'");
       }
       table.close();
     } else if (cmdline.hasOption(DUMP_OPTION)) {
       HTable table = new HTable(hbaseConfig, METATABLE);
       ResultScanner scanner = table.getScanner(Bytes.toBytes("m"));
       Result result;
       while ((result = scanner.next()) != null) {
         LOG.info("---> collection: " + new String(result.getRow()));
         for (KeyValue kv : result.list()) {
           LOG.info(new String(kv.getFamily()) + ":" + new String(kv.getQualifier())
               + ", length=" + kv.getValue().length);
         }
       }
       table.close();
     }

     admin.close();
   }
 }
diff --git a/src/main/java/org/warcbase/analysis/FindArcUrls.java b/src/main/java/org/warcbase/analysis/FindArcUrls.java
index 8bb0da2..3677c3b 100644
--- a/src/main/java/org/warcbase/analysis/FindArcUrls.java
+++ b/src/main/java/org/warcbase/analysis/FindArcUrls.java
@@ -1,152 +1,168 @@
+/*
+ * Warcbase: an open-source platform for managing web archives
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.warcbase.analysis;

 import java.io.IOException;
 import java.util.Arrays;

 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 import org.archive.io.arc.ARCRecordMetaData;
 import org.warcbase.io.ArcRecordWritable;
 import org.warcbase.mapreduce.WacArcInputFormat;

 public class FindArcUrls extends Configured implements Tool {
   private static final Logger LOG = Logger.getLogger(FindArcUrls.class);

   private static enum Records { TOTAL };

   private static class MyMapper extends Mapper<LongWritable, ArcRecordWritable, Text, Text> {
     private static final Text KEY = new Text();
     private static final Text VALUE = new Text();

     private String pattern = null;

     @Override
     public void setup(Context context) {
       Configuration conf = context.getConfiguration();
       pattern = conf.get(PATTERN_OPTION);
     }

     @Override
     public void map(LongWritable key, ArcRecordWritable record, Context context)
         throws IOException, InterruptedException {
       context.getCounter(Records.TOTAL).increment(1);
       ARCRecordMetaData meta = record.getRecord().getMetaData();
       String url = meta.getUrl();
       String date = meta.getDate();
       String type = meta.getMimetype();

       if (url.matches(pattern)) {
         String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
         KEY.set(fileName + " " + url + " " + type);
         VALUE.set(date);
         context.write(KEY, VALUE);
       }
     }
   }

   public FindArcUrls() {}

   public static final String INPUT_OPTION = "input";
   public static final String OUTPUT_OPTION = "output";
   public static final String PATTERN_OPTION = "pattern";

   /**
    * Runs this tool.
    */
   @SuppressWarnings("static-access")
   public int run(String[] args) throws Exception {
     Options options = new Options();
     options.addOption(OptionBuilder.withArgName("path").hasArg()
         .withDescription("input path").create(INPUT_OPTION));
     options.addOption(OptionBuilder.withArgName("path").hasArg()
         .withDescription("output path").create(OUTPUT_OPTION));
     options.addOption(OptionBuilder.withArgName("regexp").hasArg()
         .withDescription("URL pattern").create(PATTERN_OPTION));

     CommandLine cmdline;
     CommandLineParser parser = new GnuParser();
     try {
       cmdline = parser.parse(options, args);
     } catch (ParseException exp) {
       HelpFormatter formatter = new HelpFormatter();
       formatter.printHelp(this.getClass().getName(), options);
       ToolRunner.printGenericCommandUsage(System.out);
       System.err.println("Error parsing command line: " + exp.getMessage());
       return -1;
     }

     if (!cmdline.hasOption(INPUT_OPTION) || !cmdline.hasOption(OUTPUT_OPTION)) {
       HelpFormatter formatter = new HelpFormatter();
       formatter.printHelp(this.getClass().getName(), options);
       ToolRunner.printGenericCommandUsage(System.out);
       return -1;
     }

     String input = cmdline.getOptionValue(INPUT_OPTION);
     Path output = new Path(cmdline.getOptionValue(OUTPUT_OPTION));
     String pattern = cmdline.getOptionValue(PATTERN_OPTION);

     LOG.info("Tool name: " + FindArcUrls.class.getSimpleName());
     LOG.info(" - input: " + input);
     LOG.info(" - output: " + output);

     Job job = Job.getInstance(getConf(), FindArcUrls.class.getSimpleName() + ":" + input);
     job.setJarByClass(FindArcUrls.class);
     job.setNumReduceTasks(1);

     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Text.class);

     FileInputFormat.addInputPaths(job, input);
     FileOutputFormat.setOutputPath(job, output);

     job.setInputFormatClass(WacArcInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);
     job.setMapperClass(MyMapper.class);

     job.getConfiguration().set(PATTERN_OPTION, pattern);

     FileSystem fs = FileSystem.get(getConf());
     if (FileSystem.get(getConf()).exists(output)) {
       fs.delete(output, true);
     }

     job.waitForCompletion(true);

     Counters counters = job.getCounters();
     int numDocs = (int) counters.findCounter(Records.TOTAL).getValue();
     LOG.info("Read " + numDocs + " records.");

     return 0;
   }

   /**
    * Dispatches command-line arguments to the tool via the ToolRunner.
    */
   public static void main(String[] args) throws Exception {
     LOG.info("Running " + FindArcUrls.class.getCanonicalName() + " with args " + Arrays.toString(args));
     ToolRunner.run(new FindArcUrls(), args);
   }
 }
diff --git a/src/main/java/org/warcbase/analysis/FindWarcUrls.java b/src/main/java/org/warcbase/analysis/FindWarcUrls.java
index f2563d0..630ccea 100644
--- a/src/main/java/org/warcbase/analysis/FindWarcUrls.java
+++ b/src/main/java/org/warcbase/analysis/FindWarcUrls.java
@@ -1,181 +1,197 @@
+/*
+ * Warcbase: an open-source platform for managing web archives
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + package org.warcbase.analysis; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import org.archive.io.ArchiveRecordHeader; import org.archive.util.ArchiveUtils; import org.warcbase.data.WarcRecordUtils; import org.warcbase.io.WarcRecordWritable; import org.warcbase.mapreduce.WacWarcInputFormat; public class FindWarcUrls extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(FindWarcUrls.class); private static final DateFormat ISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX"); private static enum Records { TOTAL }; private static class MyMapper extends Mapper { private static final Text KEY = new Text(); private static final Text VALUE = new Text(); private String pattern = null; @Override public void setup(Context context) { Configuration conf = context.getConfiguration(); pattern = conf.get(PATTERN_OPTION); } @Override public void map(LongWritable key, WarcRecordWritable record, Context context) throws IOException, InterruptedException { context.getCounter(Records.TOTAL).increment(1); ArchiveRecordHeader header = record.getRecord().getHeader(); // Only consider response records. if (!header.getHeaderValue("WARC-Type").equals("response")) { return; } String url = header.getUrl(); byte[] content = null; String type = null; Date d = null; String date = null; try { content = WarcRecordUtils.getContent(record.getRecord()); type = WarcRecordUtils.getWarcResponseMimeType(content); d = ISO8601.parse(header.getDate()); date = ArchiveUtils.get14DigitDate(d); } catch (OutOfMemoryError e) { // When we get a corrupt record, this will happen... // Try to recover and move on... LOG.error("Encountered OutOfMemoryError ingesting " + url); LOG.error("Attempting to continue..."); } catch (java.text.ParseException e) { LOG.error("Encountered ParseException ingesting " + url); } if ((url != null) && url.matches(pattern)) { String fileName = ((FileSplit) context.getInputSplit()).getPath().getName(); KEY.set(fileName + " " + url + " " + type); VALUE.set(date); context.write(KEY, VALUE); } } } public FindWarcUrls() {} public static final String INPUT_OPTION = "input"; public static final String OUTPUT_OPTION = "output"; public static final String PATTERN_OPTION = "pattern"; /** * Runs this tool. 
*/ @SuppressWarnings("static-access") public int run(String[] args) throws Exception { Options options = new Options(); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("input path").create(INPUT_OPTION)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("output path").create(OUTPUT_OPTION)); options.addOption(OptionBuilder.withArgName("regexp").hasArg() .withDescription("URL pattern").create(PATTERN_OPTION)); CommandLine cmdline; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); System.err.println("Error parsing command line: " + exp.getMessage()); return -1; } if (!cmdline.hasOption(INPUT_OPTION) || !cmdline.hasOption(OUTPUT_OPTION)) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); return -1; } String input = cmdline.getOptionValue(INPUT_OPTION); Path output = new Path(cmdline.getOptionValue(OUTPUT_OPTION)); String pattern = cmdline.getOptionValue(PATTERN_OPTION); LOG.info("Tool name: " + FindWarcUrls.class.getSimpleName()); LOG.info(" - input: " + input); LOG.info(" - output: " + output); Job job = Job.getInstance(getConf(), FindWarcUrls.class.getSimpleName() + ":" + input); job.setJarByClass(FindWarcUrls.class); job.setNumReduceTasks(1); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); FileInputFormat.addInputPaths(job, input); FileOutputFormat.setOutputPath(job, output); job.setInputFormatClass(WacWarcInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(MyMapper.class); job.getConfiguration().set(PATTERN_OPTION, pattern); FileSystem fs = FileSystem.get(getConf()); if ( FileSystem.get(getConf()).exists(output)) { fs.delete(output, true); } job.waitForCompletion(true); Counters counters = job.getCounters(); int numDocs = (int) counters.findCounter(Records.TOTAL).getValue(); LOG.info("Read " + numDocs + " records."); return 0; } /** * Dispatches command-line arguments to the tool via the ToolRunner. */ public static void main(String[] args) throws Exception { LOG.info("Running " + FindWarcUrls.class.getCanonicalName() + " with args " + Arrays.toString(args)); ToolRunner.run(new FindWarcUrls(), args); } } diff --git a/src/main/java/org/warcbase/analysis/graph/ExtractLinksWac.java b/src/main/java/org/warcbase/analysis/graph/ExtractLinksWac.java index 9943fc8..5d508cb 100644 --- a/src/main/java/org/warcbase/analysis/graph/ExtractLinksWac.java +++ b/src/main/java/org/warcbase/analysis/graph/ExtractLinksWac.java @@ -1,477 +1,493 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.analysis.graph; import it.unimi.dsi.fastutil.ints.IntAVLTreeSet; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import org.archive.io.ArchiveRecordHeader; import org.archive.io.arc.ARCRecord; import org.archive.io.arc.ARCRecordMetaData; import org.archive.io.warc.WARCRecord; import org.archive.util.ArchiveUtils; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.jsoup.nodes.Element; import org.jsoup.select.Elements; import org.warcbase.data.ArcRecordUtils; import org.warcbase.data.WarcRecordUtils; import org.warcbase.data.UrlMapping; import org.warcbase.io.ArcRecordWritable; import org.warcbase.io.WarcRecordWritable; import org.warcbase.mapreduce.WacArcInputFormat; import org.warcbase.mapreduce.WacWarcInputFormat; import com.google.common.base.Joiner; /** * Program for extracting links from ARC files. */ public class ExtractLinksWac extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(ExtractLinksWac.class); private static enum Counts { RECORDS, HTML_PAGES, LINKS }; public static class ExtractLinksHdfsArcMapper extends Mapper { private final Joiner joiner = Joiner.on(","); private final IntWritable outKey = new IntWritable(); private final Text outValue = new Text(); private final DateFormat df = new SimpleDateFormat("yyyyMMdd"); private UrlMapping fst; private String beginDate, endDate; @Override public void setup(Context context) { try { Configuration conf = context.getConfiguration(); beginDate = conf.get("beginDate"); endDate = conf.get("endDate"); // There appears to be a bug in getCacheFiles() which returns null, // even though getLocalCacheFiles is deprecated... @SuppressWarnings("deprecation") Path[] localFiles = context.getLocalCacheFiles(); LOG.info("cache contents: " + Arrays.toString(localFiles)); System.out.println("cache contents: " + Arrays.toString(localFiles)); // load FST UriMapping from file fst = (UrlMapping) Class.forName(conf.get("UriMappingClass")).newInstance(); String fstFileName = localFiles[0].toString(); if (fstFileName.startsWith("file:")) { fstFileName = fstFileName.substring(5, fstFileName.length()); } fst.loadMapping(fstFileName); // simply assume only one file in distributed cache. 
} catch (Exception e) { e.printStackTrace(); throw new RuntimeException("Error Initializing UriMapping"); } } @Override public void map(LongWritable k, ArcRecordWritable r, Context context) throws IOException, InterruptedException { context.getCounter(Counts.RECORDS).increment(1); ARCRecord record = r.getRecord(); ARCRecordMetaData meta = record.getMetaData(); String url = meta.getUrl(); String type = meta.getMimetype(); Date date = null; try { date = ArchiveUtils.parse14DigitDate(meta.getDate()); } catch (java.text.ParseException e) { e.printStackTrace(); } if (date == null) { return; } String time = df.format(date); if (beginDate != null && endDate != null) { if (time.compareTo(beginDate) < 0 || time.compareTo(endDate) > 0) { return; } } else if (beginDate == null && endDate != null) { if (time.compareTo(endDate) > 0) { return; } } else if (beginDate != null && endDate == null) { if (time.compareTo(beginDate) < 0) { return; } } if (!type.equals("text/html")) { return; } if (fst.getID(url) == -1) { return; } context.getCounter(Counts.HTML_PAGES).increment(1); byte[] bytes = ArcRecordUtils.getBodyContent(record); Document doc = Jsoup.parse(new String(bytes, "UTF8"), url); Elements links = doc.select("a[href]"); if (links == null) { return; } outKey.set(fst.getID(url)); IntAVLTreeSet linkUrlSet = new IntAVLTreeSet(); for (Element link : links) { String linkUrl = link.attr("abs:href"); if (fst.getID(linkUrl) != -1) { // link already exists linkUrlSet.add(fst.getID(linkUrl)); } } if (linkUrlSet.size() == 0) { // Emit empty entry even if there aren't any outgoing links outValue.set(""); context.write(outKey, outValue); return; } outValue.set(joiner.join(linkUrlSet)); context.getCounter(Counts.LINKS).increment(linkUrlSet.size()); context.write(outKey, outValue); } } public static class ExtractLinksHdfsWarcMapper extends Mapper { private final Joiner joiner = Joiner.on(","); private final IntWritable outKey = new IntWritable(); private final Text outValue = new Text(); private final DateFormat df = new SimpleDateFormat("yyyyMMdd"); private final DateFormat iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX"); private UrlMapping fst; private String beginDate, endDate; @Override public void setup(Context context) { try { Configuration conf = context.getConfiguration(); beginDate = conf.get("beginDate"); endDate = conf.get("endDate"); // There appears to be a bug in getCacheFiles() which returns null, // even though getLocalCacheFiles is deprecated... @SuppressWarnings("deprecation") Path[] localFiles = context.getLocalCacheFiles(); LOG.info("cache contents: " + Arrays.toString(localFiles)); System.out.println("cache contents: " + Arrays.toString(localFiles)); // load FST UriMapping from file fst = (UrlMapping) Class.forName(conf.get("UriMappingClass")).newInstance(); String fstFileName = localFiles[0].toString(); if (fstFileName.startsWith("file:")) { fstFileName = fstFileName.substring(5, fstFileName.length()); } fst.loadMapping(fstFileName); // simply assume only one file in distributed cache. 
} catch (Exception e) { e.printStackTrace(); throw new RuntimeException("Error Initializing UriMapping"); } } @Override public void map(LongWritable k, WarcRecordWritable r, Context context) throws IOException, InterruptedException { context.getCounter(Counts.RECORDS).increment(1); WARCRecord record = r.getRecord(); ArchiveRecordHeader header = record.getHeader(); byte[] recordBytes; byte[] content; String url; String type; // Corrupt records can cause these methods to throw OOM exceptions: catch and ignore record. try { recordBytes = WarcRecordUtils.toBytes(record); content = WarcRecordUtils.getContent(WarcRecordUtils.fromBytes(recordBytes)); url = header.getUrl(); type = WarcRecordUtils.getWarcResponseMimeType(content); } catch (java.lang.OutOfMemoryError e) { LOG.error("Caught OutOfMemoryError, skipping record."); return; } if (type == null) type = ""; Date date = null; try { date = iso8601.parse(header.getDate()); } catch (java.text.ParseException e) { e.printStackTrace(); } if (date == null) { return; } String time = df.format(date); if (beginDate != null && endDate != null) { if (time.compareTo(beginDate) < 0 || time.compareTo(endDate) > 0) { return; } } else if (beginDate == null && endDate != null) { if (time.compareTo(endDate) > 0) { return; } } else if (beginDate != null && endDate == null) { if (time.compareTo(beginDate) < 0) { return; } } if (!type.equals("text/html")) { return; } if (fst.getID(url) == -1) { return; } context.getCounter(Counts.HTML_PAGES).increment(1); byte[] bytes; try { bytes = WarcRecordUtils.getBodyContent(WarcRecordUtils.fromBytes(recordBytes)); } catch (Exception e) { LOG.error(e.getMessage() + ": skipping record."); return; } catch (java.lang.OutOfMemoryError e) { LOG.error("Caught OutOfMemoryError, skipping record."); return; } Document doc = Jsoup.parse(new String(bytes, "UTF8"), url); Elements links = doc.select("a[href]"); if (links == null) { return; } outKey.set(fst.getID(url)); IntAVLTreeSet linkUrlSet = new IntAVLTreeSet(); for (Element link : links) { String linkUrl = link.attr("abs:href"); if (fst.getID(linkUrl) != -1) { // link already exists linkUrlSet.add(fst.getID(linkUrl)); } } if (linkUrlSet.size() == 0) { // Emit empty entry even if there aren't any outgoing links outValue.set(""); context.write(outKey, outValue); return; } outValue.set(joiner.join(linkUrlSet)); context.getCounter(Counts.LINKS).increment(linkUrlSet.size()); context.write(outKey, outValue); } } /** * Creates an instance of this tool. */ public ExtractLinksWac() {} private static final String HDFS = "hdfs"; private static final String HBASE = "hbase"; private static final String OUTPUT = "output"; private static final String URI_MAPPING = "urlMapping"; private static final String BEGIN = "begin"; private static final String END = "end"; private static String beginDate = null, endDate = null; /** * Runs this tool. 
*/ @SuppressWarnings({ "static-access" }) public int run(String[] args) throws Exception { Options options = new Options(); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("HDFS input path").create(HDFS)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("HBASE table name").create(HBASE)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("output path").create(OUTPUT)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("uri mapping file path").create(URI_MAPPING)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("begin date (optional)").create(BEGIN)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("end date (optional)").create(END)); CommandLine cmdline; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { System.err.println("Error parsing command line: " + exp.getMessage()); return -1; } if ( (!cmdline.hasOption(HDFS) && !cmdline.hasOption(HBASE)) // No HDFS and HBase input || !cmdline.hasOption(OUTPUT) || !cmdline.hasOption(URI_MAPPING)) { System.out.println("args: " + Arrays.toString(args)); HelpFormatter formatter = new HelpFormatter(); formatter.setWidth(120); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); return -1; } FileSystem fs = FileSystem.get(getConf()); String HDFSPath = null, HBaseTableName = null; boolean isHDFSInput = true; // set default as HDFS input if (cmdline.hasOption(HDFS)) { HDFSPath = cmdline.getOptionValue(HDFS); } else { HBaseTableName = cmdline.getOptionValue(HBASE); isHDFSInput = false; } String outputPath = cmdline.getOptionValue(OUTPUT); Path mappingPath = new Path(cmdline.getOptionValue(URI_MAPPING)); LOG.info("Tool: " + ExtractLinksWac.class.getSimpleName()); if (isHDFSInput) { LOG.info(" - HDFS input path: " + HDFSPath); } else { LOG.info(" - HBase table name: " + HBaseTableName); } LOG.info(" - output path: " + outputPath); LOG.info(" - mapping file path: " + mappingPath); if (cmdline.hasOption(BEGIN)) { beginDate = cmdline.getOptionValue(BEGIN); LOG.info(" - begin date: " + beginDate); } if (cmdline.hasOption(END)) { endDate = cmdline.getOptionValue(END); LOG.info(" - end date: " + endDate); } if (!fs.exists(mappingPath)) { throw new Exception("mappingPath doesn't exist: " + mappingPath); } Configuration conf; if (isHDFSInput) { conf = getConf(); // passing global variable values to individual nodes if(beginDate != null) { conf.set("beginDate", beginDate); } if(endDate != null) { conf.set("endDate", endDate); } } else { conf = HBaseConfiguration.create(getConf()); conf.set("hbase.zookeeper.quorum", "bespinrm.umiacs.umd.edu"); } Job job = Job.getInstance(conf, ExtractLinksWac.class.getSimpleName()); job.setJarByClass(ExtractLinksWac.class); job.getConfiguration().set("UriMappingClass", UrlMapping.class.getCanonicalName()); // Put the mapping file in the distributed cache so each map worker will have it. 
job.addCacheFile(mappingPath.toUri()); job.setNumReduceTasks(0); // no reducers if (isHDFSInput) { // HDFS input Path path = new Path(HDFSPath); RemoteIterator itr = fs.listFiles(path, true); LocatedFileStatus fileStatus; while (itr.hasNext()) { fileStatus = itr.next(); Path p = fileStatus.getPath(); if ((p.getName().endsWith(".warc.gz")) || (p.getName().endsWith(".warc"))) { // WARC MultipleInputs.addInputPath(job, p, WacWarcInputFormat.class, ExtractLinksHdfsWarcMapper.class); } else { // Assume ARC MultipleInputs.addInputPath(job, p, WacArcInputFormat.class, ExtractLinksHdfsArcMapper.class); } } // set map (key,value) output format job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); } else { // HBase input throw new UnsupportedOperationException("HBase not supported yet!"); } FileOutputFormat.setOutputPath(job, new Path(outputPath)); // Delete the output directory if it exists already. Path outputDir = new Path(outputPath); fs.delete(outputDir, true); long startTime = System.currentTimeMillis(); job.waitForCompletion(true); LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); Counters counters = job.getCounters(); LOG.info("Read " + counters.findCounter(Counts.RECORDS).getValue() + " records."); LOG.info("Processed " + counters.findCounter(Counts.HTML_PAGES).getValue() + " HTML pages."); LOG.info("Extracted " + counters.findCounter(Counts.LINKS).getValue() + " links."); return 0; } /** * Dispatches command-line arguments to the tool via the {@code ToolRunner}. */ public static void main(String[] args) throws Exception { ToolRunner.run(new ExtractLinksWac(), args); } } diff --git a/src/main/java/org/warcbase/analysis/graph/ExtractSiteLinks.java b/src/main/java/org/warcbase/analysis/graph/ExtractSiteLinks.java index 499e6f6..51526ec 100644 --- a/src/main/java/org/warcbase/analysis/graph/ExtractSiteLinks.java +++ b/src/main/java/org/warcbase/analysis/graph/ExtractSiteLinks.java @@ -1,502 +1,518 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.analysis.graph; import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map.Entry; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import org.archive.io.ArchiveRecordHeader; import org.archive.io.arc.ARCRecord; import org.archive.io.arc.ARCRecordMetaData; import org.archive.io.warc.WARCRecord; import org.archive.util.ArchiveUtils; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.jsoup.nodes.Element; import org.jsoup.select.Elements; import org.warcbase.analysis.graph.PrefixMapping.PrefixNode; import org.warcbase.data.ArcRecordUtils; import org.warcbase.data.UrlMapping; import org.warcbase.data.WarcRecordUtils; import org.warcbase.io.ArcRecordWritable; import org.warcbase.io.WarcRecordWritable; import org.warcbase.mapreduce.WacArcInputFormat; import org.warcbase.mapreduce.WacWarcInputFormat; public class ExtractSiteLinks extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(ExtractSiteLinks.class); private static enum Counts { RECORDS, HTML_PAGES, LINKS }; public static class ExtractSiteLinksArcMapper extends Mapper { private static final DateFormat df = new SimpleDateFormat("yyyyMMdd"); private static String beginDate, endDate; private static final IntWritable KEY = new IntWritable(); private static final IntWritable VALUE = new IntWritable(); private static UrlMapping fst; private static PrefixMapping prefixMap; private static ArrayList prefix; @Override public void setup(Context context) { try { Configuration conf = context.getConfiguration(); beginDate = conf.get("beginDate"); endDate = conf.get("endDate"); @SuppressWarnings("deprecation") //Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); Path[] localFiles = context.getLocalCacheFiles(); // load FST UriMapping from file fst = (UrlMapping) Class.forName(conf.get("UriMappingClass")).newInstance(); String fstFileName = localFiles[0].toString(); if (fstFileName.startsWith("file:")) { fstFileName = fstFileName.substring(5, fstFileName.length()); } fst.loadMapping(fstFileName); // load Prefix Mapping from file prefixMap = (PrefixMapping) Class.forName(conf.get("PrefixMappingClass")).newInstance(); String prefixFileName = 
localFiles[1].toString(); if (prefixFileName.startsWith("file:")) { prefixFileName = prefixFileName.substring(5, prefixFileName.length()); } prefix = PrefixMapping.loadPrefix(prefixFileName, fst); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("Error Initializing UriMapping"); } } @Override public void map(LongWritable key, ArcRecordWritable r, Context context) throws IOException, InterruptedException { context.getCounter(Counts.RECORDS).increment(1); ARCRecord record = r.getRecord(); ARCRecordMetaData meta = record.getMetaData(); String url = meta.getUrl(); String type = meta.getMimetype(); Date date = null; try { date = ArchiveUtils.parse14DigitDate(meta.getDate()); } catch (java.text.ParseException e) { e.printStackTrace(); } if (date == null) { return; } String time = df.format(date); if (beginDate != null && endDate != null) { if (time.compareTo(beginDate) < 0 || time.compareTo(endDate) > 0) { return; } } else if (beginDate == null && endDate != null) { if (time.compareTo(endDate) > 0) { return; } } else if (beginDate != null && endDate == null) { if (time.compareTo(beginDate) < 0) { return; } } if (!type.equals("text/html")) { return; } context.getCounter(Counts.HTML_PAGES).increment(1); byte[] bytes = ArcRecordUtils.getBodyContent(record); Document doc = Jsoup.parse(new String(bytes, "UTF8"), url); Elements links = doc.select("a[href]"); // empty if none match if (links == null) { return; } int sourcePrefixId = prefixMap.getPrefixId(fst.getID(url), prefix); // this url is indexed in FST and its prefix is appeared in prefix map (thus declared in // prefix file) if (fst.getID(url) != -1 && sourcePrefixId != -1) { KEY.set(sourcePrefixId); List linkUrlList = new ArrayList(); for (Element link : links) { String linkUrl = link.attr("abs:href"); int targetPrefixId = prefixMap.getPrefixId(fst.getID(linkUrl), prefix); // target url is indexed in FST and its prefix url is found if (fst.getID(linkUrl) != -1 && targetPrefixId != -1) { linkUrlList.add(targetPrefixId); } } for (Integer linkID : linkUrlList) { VALUE.set(linkID); context.write(KEY, VALUE); } } } } public static class ExtractSiteLinksWarcMapper extends Mapper { private static final DateFormat df = new SimpleDateFormat("yyyyMMdd"); private static final DateFormat iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX"); private static String beginDate, endDate; private static final IntWritable KEY = new IntWritable(); private static final IntWritable VALUE = new IntWritable(); private static UrlMapping fst; private static PrefixMapping prefixMap; private static ArrayList prefix; @Override public void setup(Context context) { try { Configuration conf = context.getConfiguration(); beginDate = conf.get("beginDate"); endDate = conf.get("endDate"); @SuppressWarnings("deprecation") //Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); Path[] localFiles = context.getLocalCacheFiles(); // load FST UriMapping from file fst = (UrlMapping) Class.forName(conf.get("UriMappingClass")).newInstance(); String fstFileName = localFiles[0].toString(); if (fstFileName.startsWith("file:")) { fstFileName = fstFileName.substring(5, fstFileName.length()); } fst.loadMapping(fstFileName); // load Prefix Mapping from file prefixMap = (PrefixMapping) Class.forName(conf.get("PrefixMappingClass")).newInstance(); String prefixFileName = localFiles[1].toString(); if (prefixFileName.startsWith("file:")) { prefixFileName = prefixFileName.substring(5, prefixFileName.length()); } prefix = PrefixMapping.loadPrefix(prefixFileName, fst); 
} catch (Exception e) { e.printStackTrace(); throw new RuntimeException("Error Initializing UriMapping"); } } @Override public void map(LongWritable key, WarcRecordWritable r, Context context) throws IOException, InterruptedException { context.getCounter(Counts.RECORDS).increment(1); WARCRecord record = r.getRecord(); ArchiveRecordHeader header = record.getHeader(); byte[] recordBytes = WarcRecordUtils.toBytes(record); byte[] content = WarcRecordUtils.getContent(WarcRecordUtils.fromBytes(recordBytes)); String url = header.getUrl(); String type = WarcRecordUtils.getWarcResponseMimeType(content); if (type == null) type = ""; Date date = null; try { date = iso8601.parse(header.getDate()); } catch (java.text.ParseException e) { e.printStackTrace(); } if (date == null) { return; } String time = df.format(date); if (beginDate != null && endDate != null) { if (time.compareTo(beginDate) < 0 || time.compareTo(endDate) > 0) { return; } } else if (beginDate == null && endDate != null) { if (time.compareTo(endDate) > 0) { return; } } else if (beginDate != null && endDate == null) { if (time.compareTo(beginDate) < 0) { return; } } if (!type.equals("text/html")) { return; } context.getCounter(Counts.HTML_PAGES).increment(1); byte[] bytes = WarcRecordUtils.getBodyContent(WarcRecordUtils.fromBytes(recordBytes)); Document doc = Jsoup.parse(new String(bytes, "UTF8"), url); Elements links = doc.select("a[href]"); // empty if none match if (links == null) { return; } int sourcePrefixId = prefixMap.getPrefixId(fst.getID(url), prefix); // this url is indexed in FST and its prefix is appeared in prefix map (thus declared in // prefix file) if (fst.getID(url) != -1 && sourcePrefixId != -1) { KEY.set(sourcePrefixId); List linkUrlList = new ArrayList(); for (Element link : links) { String linkUrl = link.attr("abs:href"); int targetPrefixId = prefixMap.getPrefixId(fst.getID(linkUrl), prefix); // target url is indexed in FST and its prefix url is found if (fst.getID(linkUrl) != -1 && targetPrefixId != -1) { linkUrlList.add(targetPrefixId); } } for (Integer linkID : linkUrlList) { VALUE.set(linkID); context.write(KEY, VALUE); } } } } private static class ExtractSiteLinksReducer extends Reducer { @Override public void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException { Int2IntAVLTreeMap links = new Int2IntAVLTreeMap(); // remove duplicate links for (IntWritable value : values) { if (links.containsKey(value.get())) { // increment 1 link count links.put(value.get(), links.get(value.get()) + 1); } else { links.put(value.get(), 1); } } context.getCounter(Counts.LINKS).increment(links.entrySet().size()); for (Entry link : links.entrySet()) { String outputValue = String.valueOf(link.getKey()) + "," + String.valueOf(link.getValue()); context.write(key, new Text(outputValue)); } } } /** * Creates an instance of this tool. */ public ExtractSiteLinks() { } private static final String HDFS = "hdfs"; private static final String HBASE = "hbase"; private static final String OUTPUT = "output"; private static final String URI_MAPPING = "urlMapping"; private static final String PREFIX_FILE = "prefixFile"; private static final String NUM_REDUCERS = "numReducers"; private static final String BEGIN = "begin"; private static final String END = "end"; private static String beginDate = null, endDate = null; /** * Runs this tool. 
*/ @SuppressWarnings({ "static-access" }) public int run(String[] args) throws Exception { Options options = new Options(); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("HDFS input path").create(HDFS)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("HBASE table name").create(HBASE)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("output path").create(OUTPUT)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("uri mapping file path").create(URI_MAPPING)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("prefix mapping file path").create(PREFIX_FILE)); options.addOption(OptionBuilder.withArgName("num").hasArg() .withDescription("number of reducers").create(NUM_REDUCERS)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("begin date (optional)").create(BEGIN)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("end date (optional)").create(END)); CommandLine cmdline; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { System.err.println("Error parsing command line: " + exp.getMessage()); return -1; } if ((!cmdline.hasOption(HDFS) && !cmdline.hasOption(HBASE)) // No HDFS and HBase input || !cmdline.hasOption(OUTPUT) || !cmdline.hasOption(URI_MAPPING) || !cmdline.hasOption(PREFIX_FILE)) { System.out.println("args: " + Arrays.toString(args)); HelpFormatter formatter = new HelpFormatter(); formatter.setWidth(120); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); return -1; } FileSystem fs = FileSystem.get(getConf()); String HDFSPath = null, HBaseTableName = null; boolean isHDFSInput = true; // set default as HDFS input if (cmdline.hasOption(HDFS)) { HDFSPath = cmdline.getOptionValue(HDFS); } else { HBaseTableName = cmdline.getOptionValue(HBASE); isHDFSInput = false; } String outputPath = cmdline.getOptionValue(OUTPUT); Path mappingPath = new Path(cmdline.getOptionValue(URI_MAPPING)); Path prefixFilePath = new Path(cmdline.getOptionValue(PREFIX_FILE)); int reduceTasks = cmdline.hasOption(NUM_REDUCERS) ? 
Integer.parseInt(cmdline .getOptionValue(NUM_REDUCERS)) : 1; LOG.info("Tool: " + ExtractSiteLinks.class.getSimpleName()); if (isHDFSInput) { LOG.info(" - HDFS input path: " + HDFSPath); } else { LOG.info(" - HBase table name: " + HBaseTableName); } LOG.info(" - output path: " + outputPath); LOG.info(" - mapping file path:" + mappingPath); LOG.info(" - prefix file path:" + prefixFilePath); LOG.info(" - number of reducers: " + reduceTasks); if (cmdline.hasOption(BEGIN)) { beginDate = cmdline.getOptionValue(BEGIN); LOG.info(" - begin date: " + beginDate); } if (cmdline.hasOption(END)) { endDate = cmdline.getOptionValue(END); LOG.info(" - end date: " + endDate); } if (!fs.exists(mappingPath)) { throw new Exception("mappingPath doesn't exist: " + mappingPath); } if (!fs.exists(prefixFilePath)) { throw new Exception("prefixFilePath doesn't exist: " + prefixFilePath); } Configuration conf; if (isHDFSInput) { conf = getConf(); // passing global variable values to individual nodes if(beginDate != null) { conf.set("beginDate", beginDate); } if(endDate != null) { conf.set("endDate", endDate); } } else { conf = HBaseConfiguration.create(getConf()); conf.set("hbase.zookeeper.quorum", "bespinrm.umiacs.umd.edu"); } Job job = Job.getInstance(conf, ExtractSiteLinks.class.getSimpleName()); job.setJarByClass(ExtractSiteLinks.class); job.getConfiguration().set("UriMappingClass", UrlMapping.class.getCanonicalName()); job.getConfiguration().set("PrefixMappingClass", PrefixMapping.class.getCanonicalName()); // Put the mapping file and prefix file in the distributed cache // so each map worker will have it. job.addCacheFile(mappingPath.toUri()); job.addCacheFile(prefixFilePath.toUri()); job.setNumReduceTasks(reduceTasks); // no reducers if (isHDFSInput) { // HDFS input Path path = new Path(HDFSPath); RemoteIterator itr = fs.listFiles(path, true); LocatedFileStatus fileStatus; while (itr.hasNext()) { fileStatus = itr.next(); Path p = fileStatus.getPath(); if ((p.getName().endsWith(".warc.gz")) || (p.getName().endsWith(".warc"))) { // WARC MultipleInputs.addInputPath(job, p, WacWarcInputFormat.class, ExtractSiteLinksWarcMapper.class); } else { // Assume ARC MultipleInputs.addInputPath(job, p, WacArcInputFormat.class, ExtractSiteLinksArcMapper.class); } } // set map (key,value) output format job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); } else { // HBase input throw new UnsupportedOperationException("HBase not supported yet!"); } FileOutputFormat.setOutputPath(job, new Path(outputPath)); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); job.setReducerClass(ExtractSiteLinksReducer.class); // Delete the output directory if it exists already. Path outputDir = new Path(outputPath); FileSystem.get(job.getConfiguration()).delete(outputDir, true); long startTime = System.currentTimeMillis(); job.waitForCompletion(true); LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); Counters counters = job.getCounters(); LOG.info("Read " + counters.findCounter(Counts.RECORDS).getValue() + " records."); LOG.info("Processed " + counters.findCounter(Counts.HTML_PAGES).getValue() + " HTML pages."); LOG.info("Extracted " + counters.findCounter(Counts.LINKS).getValue() + " links."); return 0; } /** * Dispatches command-line arguments to the tool via the {@code ToolRunner}. 
*/ public static void main(String[] args) throws Exception { ToolRunner.run(new ExtractSiteLinks(), args); } } diff --git a/src/main/java/org/warcbase/analysis/graph/InvertAnchorText.java b/src/main/java/org/warcbase/analysis/graph/InvertAnchorText.java index dcbf014..15a8a76 100644 --- a/src/main/java/org/warcbase/analysis/graph/InvertAnchorText.java +++ b/src/main/java/org/warcbase/analysis/graph/InvertAnchorText.java @@ -1,469 +1,485 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.analysis.graph; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import org.archive.io.ArchiveRecordHeader; import org.archive.io.arc.ARCRecord; import org.archive.io.arc.ARCRecordMetaData; import org.archive.io.warc.WARCRecord; import org.archive.util.ArchiveUtils; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.jsoup.nodes.Element; import org.jsoup.select.Elements; import org.warcbase.data.ArcRecordUtils; import org.warcbase.data.WarcRecordUtils; import org.warcbase.data.UrlMapping; import org.warcbase.io.ArcRecordWritable; import org.warcbase.io.WarcRecordWritable; import org.warcbase.mapreduce.WacArcInputFormat; import org.warcbase.mapreduce.WacWarcInputFormat; import com.google.common.collect.Lists; /** * Program for extracting links from ARC files or HBase. 
*/ public class InvertAnchorText extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(InvertAnchorText.class); private static enum Counts { RECORDS, HTML_PAGES, LINKS }; private static Int2ObjectMap> extractLinks(String content, String url, UrlMapping fst) throws IOException { Document doc = Jsoup.parse(content, url); Elements links = doc.select("a[href]"); // Note that if there are outgoing links to the same destination page, we retain all copies // (and their anchor texts). This behavior is explicitly different from that of ExtractLinks, // which de-duplicates outgoing links to the same destination. Int2ObjectMap> anchors = new Int2ObjectOpenHashMap>(); if (links != null) { for (Element link : links) { String linkUrl = link.attr("abs:href"); int id = fst.getID(linkUrl); if (id != -1) { if (anchors.containsKey(id)) { anchors.get(id).add(link.text()); } else { anchors.put(id, Lists.newArrayList(link.text())); } } } } return anchors; } public static class InvertAnchorTextArcMapper extends Mapper { private final DateFormat df = new SimpleDateFormat("yyyyMMdd"); private final IntWritable key = new IntWritable(); private final Text value = new Text(); private UrlMapping fst; @Override public void setup(Context context) { try { Configuration conf = context.getConfiguration(); // There appears to be a bug in getCacheFiles() which returns null, // even though getLocalCacheFiles is deprecated... @SuppressWarnings("deprecation") Path[] localFiles = context.getLocalCacheFiles(); LOG.info("cache contents: " + Arrays.toString(localFiles)); System.out.println("cache contents: " + Arrays.toString(localFiles)); // load FST UriMapping from file fst = (UrlMapping) Class.forName(conf.get("UriMappingClass")).newInstance(); String fstFileName = localFiles[0].toString(); if (fstFileName.startsWith("file:")) { fstFileName = fstFileName.substring(5, fstFileName.length()); } fst.loadMapping(fstFileName); // simply assume only one file in distributed cache. 
} catch (Exception e) { e.printStackTrace(); throw new RuntimeException("Error Initializing UriMapping"); } } @Override public void map(LongWritable k, ArcRecordWritable r, Context context) throws IOException, InterruptedException { context.getCounter(Counts.RECORDS).increment(1); ARCRecord record = r.getRecord(); ARCRecordMetaData meta = record.getMetaData(); String url = meta.getUrl(); String type = meta.getMimetype(); Date date = null; try { date = ArchiveUtils.parse14DigitDate(meta.getDate()); } catch (java.text.ParseException e) { e.printStackTrace(); } if (date == null) { return; } String time = df.format(date); if (beginDate != null && endDate != null) { if (time.compareTo(beginDate) < 0 || time.compareTo(endDate) > 0) { return; } } else if (beginDate == null && endDate != null) { if (time.compareTo(endDate) > 0) { return; } } else if (beginDate != null && endDate == null) { if (time.compareTo(beginDate) < 0) { return; } } int srcId = -1; try { srcId = fst.getID(url); } catch (Exception e) { LOG.error("Error looking up URL: " + url); e.printStackTrace(); } if (!type.equals("text/html") || srcId == -1) { return; } context.getCounter(Counts.HTML_PAGES).increment(1); byte[] bytes = ArcRecordUtils.getBodyContent(record); Int2ObjectMap> anchors = InvertAnchorText.extractLinks(new String(bytes, "UTF8"), url, fst); for (Int2ObjectMap.Entry> entry : anchors.int2ObjectEntrySet()) { key.set(entry.getIntKey()); for (String s : entry.getValue()) { value.set(srcId + "\t" + s); context.write(key, value); } context.getCounter(Counts.LINKS).increment(entry.getValue().size()); } } } public static class InvertAnchorTextWarcMapper extends Mapper { private final DateFormat df = new SimpleDateFormat("yyyyMMdd"); private final DateFormat iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX"); private final IntWritable key = new IntWritable(); private final Text value = new Text(); private UrlMapping fst; @Override public void setup(Context context) { try { Configuration conf = context.getConfiguration(); // There appears to be a bug in getCacheFiles() which returns null, // even though getLocalCacheFiles is deprecated... @SuppressWarnings("deprecation") Path[] localFiles = context.getLocalCacheFiles(); LOG.info("cache contents: " + Arrays.toString(localFiles)); System.out.println("cache contents: " + Arrays.toString(localFiles)); // load FST UriMapping from file fst = (UrlMapping) Class.forName(conf.get("UriMappingClass")).newInstance(); String fstFileName = localFiles[0].toString(); if (fstFileName.startsWith("file:")) { fstFileName = fstFileName.substring(5, fstFileName.length()); } fst.loadMapping(fstFileName); // simply assume only one file in distributed cache. } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("Error Initializing UriMapping"); } } @Override public void map(LongWritable k, WarcRecordWritable r, Context context) throws IOException, InterruptedException { context.getCounter(Counts.RECORDS).increment(1); WARCRecord record = r.getRecord(); ArchiveRecordHeader header = record.getHeader(); byte[] recordBytes; byte[] content; String url; String type; // Corrupt records can cause these methods to throw OOM exceptions: catch and ignore record. 
try { recordBytes = WarcRecordUtils.toBytes(record); content = WarcRecordUtils.getContent(WarcRecordUtils.fromBytes(recordBytes)); url = header.getUrl(); type = WarcRecordUtils.getWarcResponseMimeType(content); } catch (java.lang.OutOfMemoryError e) { LOG.error("Caught OutOfMemoryError, skipping record."); return; } if (type == null) type = ""; Date date = null; try { date = iso8601.parse(header.getDate()); } catch (java.text.ParseException e) { e.printStackTrace(); } if (date == null) { return; } String time = df.format(date); if (beginDate != null && endDate != null) { if (time.compareTo(beginDate) < 0 || time.compareTo(endDate) > 0) { return; } } else if (beginDate == null && endDate != null) { if (time.compareTo(endDate) > 0) { return; } } else if (beginDate != null && endDate == null) { if (time.compareTo(beginDate) < 0) { return; } } int srcId = -1; try { srcId = fst.getID(url); } catch (Exception e) { LOG.error("Error looking up URL: " + url); e.printStackTrace(); } if (!type.equals("text/html") || srcId == -1) { return; } context.getCounter(Counts.HTML_PAGES).increment(1); byte[] bytes; try { bytes = WarcRecordUtils.getBodyContent(WarcRecordUtils.fromBytes(recordBytes)); } catch (Exception e) { LOG.error(e.getMessage() + ": skipping record."); return; } catch (java.lang.OutOfMemoryError e) { LOG.error("Caught OutOfMemoryError, skipping record."); return; } Int2ObjectMap> anchors = InvertAnchorText.extractLinks(new String(bytes, "UTF8"), url, fst); for (Int2ObjectMap.Entry> entry : anchors.int2ObjectEntrySet()) { key.set(entry.getIntKey()); for (String s : entry.getValue()) { value.set(srcId + "\t" + s); context.write(key, value); } context.getCounter(Counts.LINKS).increment(entry.getValue().size()); } } } /** * Creates an instance of this tool. */ public InvertAnchorText() {} private static final String HDFS = "hdfs"; private static final String HBASE = "hbase"; private static final String OUTPUT = "output"; private static final String URI_MAPPING = "urlMapping"; private static final String BEGIN = "begin"; private static final String END = "end"; private static final String NUM_REDUCERS = "numReducers"; private static String beginDate = null, endDate = null; /** * Runs this tool. 
*/ @SuppressWarnings({ "static-access" }) public int run(String[] args) throws Exception { Options options = new Options(); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("HDFS input path").create(HDFS)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("HBASE table name").create(HBASE)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("output path").create(OUTPUT)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("uri mapping file path").create(URI_MAPPING)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("begin date (optional)").create(BEGIN)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("end date (optional)").create(END)); options.addOption(OptionBuilder.withArgName("num").hasArg() .withDescription("number of reducers").create(NUM_REDUCERS)); CommandLine cmdline; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { System.err.println("Error parsing command line: " + exp.getMessage()); return -1; } if ( (!cmdline.hasOption(HDFS) && !cmdline.hasOption(HBASE)) // No HDFS and HBase input || !cmdline.hasOption(OUTPUT) || !cmdline.hasOption(URI_MAPPING)) { System.out.println("args: " + Arrays.toString(args)); HelpFormatter formatter = new HelpFormatter(); formatter.setWidth(120); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); return -1; } FileSystem fs = FileSystem.get(getConf()); String HDFSPath = null, table = null; boolean isHdfs; if (cmdline.hasOption(HDFS)) { HDFSPath = cmdline.getOptionValue(HDFS); isHdfs = true; } else { table = cmdline.getOptionValue(HBASE); isHdfs = false; } String outputPath = cmdline.getOptionValue(OUTPUT); Path mappingPath = new Path(cmdline.getOptionValue(URI_MAPPING)); LOG.info("Tool: " + InvertAnchorText.class.getSimpleName()); if (isHdfs) { LOG.info(" - HDFS input path: " + HDFSPath); } else { LOG.info(" - HBase table name: " + table); } LOG.info(" - output path: " + outputPath); LOG.info(" - mapping file path: " + mappingPath); if (cmdline.hasOption(BEGIN)) { beginDate = cmdline.getOptionValue(BEGIN); LOG.info(" - begin date: " + beginDate); } if (cmdline.hasOption(END)) { endDate = cmdline.getOptionValue(END); LOG.info(" - end date: " + endDate); } if (!fs.exists(mappingPath)) { throw new Exception("mappingPath doesn't exist: " + mappingPath); } Configuration conf; if (isHdfs) { conf = getConf(); } else { conf = HBaseConfiguration.create(getConf()); conf.set("hbase.zookeeper.quorum", "bespinrm.umiacs.umd.edu"); } Job job = Job.getInstance(conf, InvertAnchorText.class.getSimpleName() + (isHdfs ? ":HDFS:" + HDFSPath : ":HBase:" + table)); job.setJarByClass(InvertAnchorText.class); job.getConfiguration().set("UriMappingClass", UrlMapping.class.getCanonicalName()); // Put the mapping file in the distributed cache so each map worker will have it. job.addCacheFile(mappingPath.toUri()); int numReducers = cmdline.hasOption(NUM_REDUCERS) ? 
Integer.parseInt(cmdline.getOptionValue(NUM_REDUCERS)) : 100; job.setNumReduceTasks(numReducers); if (isHdfs) { // HDFS input Path path = new Path(HDFSPath); RemoteIterator itr = fs.listFiles(path, true); LocatedFileStatus fileStatus; while (itr.hasNext()) { fileStatus = itr.next(); Path p = fileStatus.getPath(); if ((p.getName().endsWith(".warc.gz")) || (p.getName().endsWith(".warc"))) { // WARC MultipleInputs.addInputPath(job, p, WacWarcInputFormat.class, InvertAnchorTextWarcMapper.class); } else { // Assume ARC MultipleInputs.addInputPath(job, p, WacArcInputFormat.class, InvertAnchorTextArcMapper.class); } } // set map (key,value) output format job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); } else { // HBase input throw new UnsupportedOperationException("HBase not supported yet!"); } FileOutputFormat.setOutputPath(job, new Path(outputPath)); // Delete the output directory if it exists already. Path outputDir = new Path(outputPath); fs.delete(outputDir, true); long startTime = System.currentTimeMillis(); job.waitForCompletion(true); LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); Counters counters = job.getCounters(); LOG.info("Read " + counters.findCounter(Counts.RECORDS).getValue() + " records."); LOG.info("Processed " + counters.findCounter(Counts.HTML_PAGES).getValue() + " HTML pages."); LOG.info("Extracted " + counters.findCounter(Counts.LINKS).getValue() + " links."); return 0; } /** * Dispatches command-line arguments to the tool via the {@code ToolRunner}. */ public static void main(String[] args) throws Exception { ToolRunner.run(new InvertAnchorText(), args); } } diff --git a/src/main/java/org/warcbase/analysis/graph/PrefixMapping.java b/src/main/java/org/warcbase/analysis/graph/PrefixMapping.java index aab4a2e..7ba9798 100644 --- a/src/main/java/org/warcbase/analysis/graph/PrefixMapping.java +++ b/src/main/java/org/warcbase/analysis/graph/PrefixMapping.java @@ -1,94 +1,110 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.analysis.graph; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; import org.warcbase.data.UrlMapping; import au.com.bytecode.opencsv.CSVReader; public class PrefixMapping { public class PrefixNode { int id; String url; int startPos; int endPos; public PrefixNode(int id, String url, int startPos, int endPos) { this.id = id; this.url = url; this.startPos = startPos; this.endPos = endPos; } public int getId() { return id; } public String getUrl() { return url; } public int getStartPos() { return startPos; } public int getEndPos() { return endPos; } } public static ArrayList loadPrefix(String prefixFile, UrlMapping map) throws IOException { PrefixMapping instance = new PrefixMapping(); final Comparator comparator = new Comparator() { @Override public int compare(PrefixNode n1, PrefixNode n2) { if (n1.startPos > n2.startPos) { return 1; } else if (n1.startPos == n2.startPos) { return 0; } else { return -1; } } }; ArrayList prefixes = new ArrayList(); CSVReader reader = new CSVReader(new FileReader(prefixFile), ','); reader.readNext(); // Ignore first line of CSV file String[] record = null; while ((record = reader.readNext()) != null) { if (record.length < 2) continue; int id = Integer.valueOf(record[0]); String url = record[1]; List results = map.prefixSearch(url); int[] boundary = map.getIdRange(results.get(0), results.get(results.size() - 1)); PrefixNode node = instance.new PrefixNode(id, url, boundary[0], boundary[1]); prefixes.add(node); } Collections.sort(prefixes, comparator); reader.close(); return prefixes; } public int getPrefixId(int id, ArrayList prefixes) { int start = 0, end = prefixes.size() - 1; int mid; while (start <= end) { mid = (start + end) / 2; if (prefixes.get(mid).getStartPos() <= id && prefixes.get(mid).getEndPos() >= id) { return prefixes.get(mid).getId(); } else if (prefixes.get(mid).getStartPos() > id) { end = mid - 1; } else { start = mid + 1; } } return -1; } } diff --git a/src/main/java/org/warcbase/browser/SeleniumBrowser.java b/src/main/java/org/warcbase/browser/SeleniumBrowser.java index a8f12ac..b31cb9a 100644 --- a/src/main/java/org/warcbase/browser/SeleniumBrowser.java +++ b/src/main/java/org/warcbase/browser/SeleniumBrowser.java @@ -1,54 +1,70 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.browser; import java.util.List; import java.util.Random; import org.openqa.selenium.By; import org.openqa.selenium.WebDriver; import org.openqa.selenium.WebElement; import org.openqa.selenium.firefox.FirefoxDriver; import com.google.common.collect.Lists; // Hard-coded currently for the Congress108 collection; should parameterize. public class SeleniumBrowser { // These make for interesting starting points for browsing. 
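  // These paths follow the usual wayback-style /{collection}/{14-digit-date-or-*}/{url}
  // convention (the same shape handled by WarcBrowserServlet later in this diff): a "*"
  // in the date position lists capture dates for the URL, while a 14-digit timestamp
  // (e.g. 20040817143015, hypothetical) fetches that specific capture.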
private static final String[] jumpTargets = new String[] { "http://localhost:9090/wayback/*/http://www.senate.gov/general/contact_information/senators_cfm.cfm", "http://localhost:9090/wayback/*/http://www.house.gov/house/MemberWWW.html", "http://localhost:9090/wayback/*/http://www.senate.gov/pagelayout/committees/d_three_sections_with_teasers/committees_home.htm", "http://localhost:9090/wayback/*/http://www.house.gov/house/CommitteeWWW.html", }; public static void main(String[] args) throws InterruptedException { WebDriver driver = new FirefoxDriver(); Random r = new Random(System.currentTimeMillis()); driver.get(jumpTargets[r.nextInt(jumpTargets.length)]); for (int i = 0; i < 1000; i++) { List links = driver.findElements(By.tagName("a")); List candidates = Lists.newArrayList(); for (WebElement myElement : links) { String href = myElement.getAttribute("href"); if (href != null && href.matches("^http://localhost:9090/wayback/\\d+.*$")) { candidates.add(href); } } if (candidates.size() < 3 ) { driver.navigate().back(); } else if (r.nextFloat() < 0.1f) { String target = jumpTargets[r.nextInt(jumpTargets.length)]; System.out.println("Jumping to " + target); driver.get(target); } else { String target = candidates.get(r.nextInt(candidates.size())); System.out.println("Navigating to " + target); driver.get(target); } } driver.quit(); } } \ No newline at end of file diff --git a/src/main/java/org/warcbase/browser/WarcBrowser.java b/src/main/java/org/warcbase/browser/WarcBrowser.java index cddb14e..3e7fc8c 100755 --- a/src/main/java/org/warcbase/browser/WarcBrowser.java +++ b/src/main/java/org/warcbase/browser/WarcBrowser.java @@ -1,89 +1,105 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.browser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; public class WarcBrowser { private static final Logger LOG = Logger.getLogger(WarcBrowser.class); private final Server server; public WarcBrowser(int runningPort) throws Exception { server = new Server(runningPort); ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath("/"); server.setHandler(context); context.addServlet(new ServletHolder(new WarcBrowserServlet()), "/*"); ServletHolder holder = context.addServlet(DefaultServlet.class, "/warcbase/*"); holder.setInitParameter("resourceBase", "src/main/webapp/"); holder.setInitParameter("pathInfoOnly", "true"); } public void start() throws Exception { server.start(); } public void stop() throws Exception { server.stop(); server.join(); } public boolean isStarted() { return server.isStarted(); } public boolean isStopped() { return server.isStopped(); } private static final String PORT_OPTION = "port"; @SuppressWarnings("static-access") public static void main(String[] args) throws Exception { Options options = new Options(); options.addOption(OptionBuilder.withArgName("num") .hasArg().withDescription("port to serve on").create(PORT_OPTION)); CommandLine cmdline = null; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(WarcBrowser.class.getName(), options); ToolRunner.printGenericCommandUsage(System.out); System.err.println("Error parsing command line: " + exp.getMessage()); System.exit(-1); } if (!cmdline.hasOption(PORT_OPTION)) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(WarcBrowser.class.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); System.exit(-1); } int port = Integer.parseInt(cmdline.getOptionValue(PORT_OPTION)); LOG.info("Starting server on port " + port); LOG.setLevel(Level.OFF); WarcBrowser browser = new WarcBrowser(port); browser.start(); } } diff --git a/src/main/java/org/warcbase/browser/WarcBrowserServlet.java b/src/main/java/org/warcbase/browser/WarcBrowserServlet.java index f3f47e4..28fce8e 100755 --- a/src/main/java/org/warcbase/browser/WarcBrowserServlet.java +++ b/src/main/java/org/warcbase/browser/WarcBrowserServlet.java @@ -1,164 +1,180 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.browser; import java.io.IOException; import java.io.PrintWriter; import java.sql.Date; import java.text.ParseException; import java.util.Arrays; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; import org.archive.util.ArchiveUtils; import org.warcbase.data.HBaseTableManager; import org.warcbase.data.UrlUtils; public class WarcBrowserServlet extends HttpServlet { private static final long serialVersionUID = 847405540723915805L; private static final Logger LOG = Logger.getLogger(WarcBrowserServlet.class); private final Configuration hbaseConfig; private HBaseAdmin hbaseAdmin; private HConnection hbaseConnection; private final Pattern p1 = Pattern.compile("^/([^//]+)/(\\d+)/(http://.*)$"); private final Pattern p2 = Pattern.compile("^/([^//]+)/\\*/(http://.*)$"); public WarcBrowserServlet() throws IOException, MasterNotRunningException, ZooKeeperConnectionException { this.hbaseConfig = HBaseConfiguration.create(); hbaseAdmin = new HBaseAdmin(hbaseConfig); hbaseConnection = HConnectionManager.createConnection(hbaseConfig); } protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { String path = req.getPathInfo(); if (req.getQueryString() != null) { path = path + "?" 
+ req.getQueryString(); } LOG.info("Servlet called: " + path); Matcher m1 = p1.matcher(path); if (m1.find()) { // collection, url, 14 digit date String url = m1.group(3); url = url.replaceAll(" ", "%20"); writeContent(resp, m1.group(1), url, m1.group(2)); return; } Matcher m2 = p2.matcher(path); if (m2.find()) { String url = m2.group(2); url = url.replaceAll(" ", "%20"); writeCaptureDates(resp, m2.group(1), url); return; } // Otherwise, just list the dates of the available collections listCollections(resp); } protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { PrintWriter out = resp.getWriter(); out.println(""); out.println(""); out.println("Sorry, only GET is supported."); out.println(""); out.println(""); out.close(); } public void listCollections(HttpServletResponse resp) throws IOException { HTableDescriptor[] htableDescriptors = null; htableDescriptors = hbaseAdmin.listTables(); resp.setContentType("text/plain"); resp.setStatus(HttpServletResponse.SC_OK); PrintWriter out = resp.getWriter(); for (HTableDescriptor htableDescriptor : htableDescriptors) { String tableNameTmp = htableDescriptor.getNameAsString(); out.println(tableNameTmp); } } public void writeCaptureDates(HttpServletResponse resp, String tableName, String query) throws IOException { String q = UrlUtils.urlToKey(query); HTableInterface table = hbaseConnection.getTable(tableName); Get get = new Get(Bytes.toBytes(q)); get.setMaxVersions(HBaseTableManager.MAX_VERSIONS); Result result = table.get(get); String type = null; long[] dates = new long[result.size()]; Cell[] cells = result.rawCells(); for (int i = 0; i < cells.length; i++) { dates[i] = cells[i].getTimestamp(); if (type == null && cells[i] != null) { type = Bytes.toString(CellUtil.cloneQualifier(cells[i])); } } Arrays.sort(dates, 0, cells.length); // Will the versions diff in type? resp.setContentType("text/plain"); resp.setStatus(HttpServletResponse.SC_OK); PrintWriter out = resp.getWriter(); for (int i = 0; i < cells.length; i++) { String date14digit = ArchiveUtils.get14DigitDate(new Date(dates[i])); out.println(date14digit + "\t" + type + "\t" + "/" + tableName + "/" + date14digit + "/" + query); } table.close(); } public void writeContent(HttpServletResponse resp, String tableName, String url, String date14digit) throws IOException { String key = UrlUtils.urlToKey(url); HTableInterface table = hbaseConnection.getTable(tableName); Get get = new Get(Bytes.toBytes(key)); try { get.setTimeStamp(ArchiveUtils.parse14DigitDate(date14digit).getTime()); } catch (ParseException e) { e.printStackTrace(); } Result result = table.get(get); Cell[] cells = result.rawCells(); if (cells.length == 1) { // We should have exactly one result here... byte[] data = CellUtil.cloneValue(cells[0]); String type = Bytes.toString(CellUtil.cloneQualifier(cells[0])); LOG.info("Fetching " + key + " at " + date14digit); resp.setHeader("Content-Type", type); resp.setContentLength(data.length); resp.getOutputStream().write(data); } table.close(); } } diff --git a/src/main/java/org/warcbase/data/ArcRecordUtils.java b/src/main/java/org/warcbase/data/ArcRecordUtils.java index 9eed997..7331f8c 100644 --- a/src/main/java/org/warcbase/data/ArcRecordUtils.java +++ b/src/main/java/org/warcbase/data/ArcRecordUtils.java @@ -1,105 +1,121 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.data; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import org.apache.log4j.Logger; import org.archive.io.arc.ARCReader; import org.archive.io.arc.ARCReaderFactory; import org.archive.io.arc.ARCRecord; import org.archive.io.arc.ARCRecordMetaData; /** * Utilities for working with {@code ARCRecord}s (from archive.org APIs). */ public class ArcRecordUtils { private static final Logger LOG = Logger.getLogger(ArcRecordUtils.class); // TODO: these methods work fine, but there's a lot of unnecessary buffer copying, which is // terrible from a performance perspective. /** * Converts raw bytes into an {@code ARCRecord}. * * @param bytes raw bytes * @return parsed {@code ARCRecord} * @throws IOException */ public static ARCRecord fromBytes(byte[] bytes) throws IOException { ARCReader reader = (ARCReader) ARCReaderFactory.get("", new BufferedInputStream(new ByteArrayInputStream(bytes)), false); return (ARCRecord) reader.get(); } public static byte[] toBytes(ARCRecord record) throws IOException { ARCRecordMetaData meta = record.getMetaData(); String metaline = meta.getUrl() + " " + meta.getIp() + " " + meta.getDate() + " " + meta.getMimetype() + " " + (int) meta.getLength(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(baos); dout.write(metaline.getBytes()); dout.write("\n".getBytes()); copyStream(record, (int) meta.getLength(), true, dout); return baos.toByteArray(); } /** * Extracts raw contents from an {@code ARCRecord} (including HTTP headers). * * @param record the {@code ARCRecord} * @return raw contents * @throws IOException */ public static byte[] getContent(ARCRecord record) throws IOException { ARCRecordMetaData meta = record.getMetaData(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(baos); copyStream(record, (int) meta.getLength(), true, dout); return baos.toByteArray(); } /** * Extracts contents of the body from an {@code ARCRecord} (excluding HTTP headers). 
* * @param record the {@code ARCRecord} * @return contents of the body * @throws IOException */ public static byte[] getBodyContent(ARCRecord record) throws IOException { byte[] raw = getContent(record); int bodyOffset = record.getBodyOffset(); byte[] content = new byte[raw.length - bodyOffset]; System.arraycopy(raw, bodyOffset, content, 0, content.length); return content; } private static long copyStream(final InputStream is, final int recordLength, boolean enforceLength, final DataOutputStream out) throws IOException { byte [] scratchbuffer = new byte[recordLength]; int read = 0; long tot = 0; while ((tot < recordLength) && (read = is.read(scratchbuffer)) != -1) { int write = read; // never write more than enforced length write = (int) Math.min(write, recordLength - tot); tot += read; out.write(scratchbuffer, 0, write); } if (enforceLength && tot != recordLength) { LOG.error("Read " + tot + " bytes but expected " + recordLength + " bytes. Continuing..."); } return tot; } } diff --git a/src/main/java/org/warcbase/data/HBaseTableManager.java b/src/main/java/org/warcbase/data/HBaseTableManager.java index 0c0ca2d..e8ac5b1 100755 --- a/src/main/java/org/warcbase/data/HBaseTableManager.java +++ b/src/main/java/org/warcbase/data/HBaseTableManager.java @@ -1,85 +1,101 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.data; import java.lang.reflect.Field; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; import org.archive.util.ArchiveUtils; import org.warcbase.ingest.IngestFiles; public class HBaseTableManager { private static final Logger LOG = Logger.getLogger(HBaseTableManager.class); private static final String[] FAMILIES = { "c" }; private static final int MAX_KEY_VALUE_SIZE = IngestFiles.MAX_CONTENT_SIZE + 1024; // Add a bit of padding for headers, etc. 
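  // Sketch of the row layout written by insertRecord() below (values are hypothetical):
  // the row key is the reversed-domain form produced by UrlUtils.urlToKey(), e.g.
  // urlToKey("http://www.umd.edu/index.html") -> "edu.umd.www/index.html"; the column
  // family is "c", the qualifier is the MIME type, and the cell timestamp is the
  // 14-digit crawl date, e.g.
  //
  //   manager.insertRecord("edu.umd.www/index.html", "20040817143015", data, "text/html");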
public static final int MAX_VERSIONS = Integer.MAX_VALUE; private final HTable table; private final HBaseAdmin admin; public HBaseTableManager(String name, boolean create, Compression.Algorithm compression) throws Exception { Configuration hbaseConfig = HBaseConfiguration.create(); admin = new HBaseAdmin(hbaseConfig); if (admin.tableExists(name) && !create) { LOG.info(String.format("Table '%s' exists: doing nothing.", name)); } else { if (admin.tableExists(name)) { LOG.info(String.format("Table '%s' exists: dropping table and recreating.", name)); LOG.info(String.format("Disabling table '%s'", name)); admin.disableTable(name); LOG.info(String.format("Droppping table '%s'", name)); admin.deleteTable(name); } HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(name)); for (int i = 0; i < FAMILIES.length; i++) { HColumnDescriptor hColumnDesc = new HColumnDescriptor(FAMILIES[i]); hColumnDesc.setMaxVersions(MAX_VERSIONS); hColumnDesc.setCompressionType(compression); hColumnDesc.setCompactionCompressionType(compression); hColumnDesc.setTimeToLive(HConstants.FOREVER); tableDesc.addFamily(hColumnDesc); } admin.createTable(tableDesc); LOG.info(String.format("Successfully created table '%s'", name)); } table = new HTable(hbaseConfig, name); Field maxKeyValueSizeField = HTable.class.getDeclaredField("maxKeyValueSize"); maxKeyValueSizeField.setAccessible(true); maxKeyValueSizeField.set(table, MAX_KEY_VALUE_SIZE); LOG.info("Setting maxKeyValueSize to " + maxKeyValueSizeField.get(table)); admin.close(); } public boolean insertRecord(final String key, final String date14digits, final byte[] data, final String type) { try { long timestamp = ArchiveUtils.parse14DigitDate(date14digits).getTime(); Put put = new Put(Bytes.toBytes(key)); put.setDurability(Durability.SKIP_WAL); put.add(Bytes.toBytes(FAMILIES[0]), Bytes.toBytes(type), timestamp, data); table.put(put); return true; } catch (Exception e) { LOG.error("Couldn't insert key: " + key + ", size: " + data.length); LOG.error(e.getMessage()); e.printStackTrace(); return false; } } } diff --git a/src/main/java/org/warcbase/data/RecordUtils.java b/src/main/java/org/warcbase/data/RecordUtils.java index edd4bce..93dd3f2 100644 --- a/src/main/java/org/warcbase/data/RecordUtils.java +++ b/src/main/java/org/warcbase/data/RecordUtils.java @@ -1,5 +1,21 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.data; public interface RecordUtils { } diff --git a/src/main/java/org/warcbase/data/UrlMapping.java b/src/main/java/org/warcbase/data/UrlMapping.java index c29721e..917c92b 100644 --- a/src/main/java/org/warcbase/data/UrlMapping.java +++ b/src/main/java/org/warcbase/data/UrlMapping.java @@ -1,237 +1,253 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.data; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.log4j.Logger; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IntsRef; import org.apache.lucene.util.fst.FST; import org.apache.lucene.util.fst.FST.Arc; import org.apache.lucene.util.fst.FST.BytesReader; import org.apache.lucene.util.fst.PositiveIntOutputs; import org.apache.lucene.util.fst.Util; import org.warcbase.ingest.IngestFiles; public class UrlMapping { private static final Logger LOG = Logger.getLogger(UrlMapping.class); private FST fst; public UrlMapping(FST fst) { this.fst = fst; } public UrlMapping() { } public UrlMapping(String outputFileName) { PositiveIntOutputs outputs = PositiveIntOutputs.getSingleton(); File outputFile = new File(outputFileName); try { this.fst = FST.read(outputFile, outputs); } catch (IOException e) { LOG.error("Build FST Failed!"); e.printStackTrace(); } } public void loadMapping(String outputFileName) { UrlMapping tmp = new UrlMapping(outputFileName); this.fst = tmp.fst; } public FST getFst() { return fst; } public int getID(String url) { Long id = null; try { id = Util.get(fst, new BytesRef(url)); } catch (IOException e) { // Log error, but assume that URL doesn't exist. LOG.error("Error fetching " + url); e.printStackTrace(); return -1; } return id == null ? -1 : id.intValue(); } public String getUrl(int id) { BytesRef scratchBytes = new BytesRef(); IntsRef key = null; try { key = Util.getByOutput(fst, id); } catch (IOException e) { LOG.error("Error id " + id); e.printStackTrace(); return null; } if (key == null) { return null; } return Util.toBytesRef(key, scratchBytes).utf8ToString(); } public List prefixSearch(String prefix) { if (prefix == null || prefix.length() == 0 ) { return new ArrayList(); } List strResults = null; try { // descend to the arc of the prefix string Arc arc = fst.getFirstArc(new Arc()); BytesReader fstReader = fst.getBytesReader(); BytesRef bref = new BytesRef(prefix); for (int i = 0; i < bref.length; i++) { Arc retArc = fst.findTargetArc(bref.bytes[i + bref.offset] & 0xFF, arc, arc, fstReader); if (retArc == null) { // no matched prefix return new ArrayList(); } } // collect all substrings started from the arc of prefix string. 
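      // Illustrative behaviour (hypothetical URLs): if the mapping was built from the
      // sorted list ["http://a.org/1", "http://a.org/2", "http://b.org/"], then
      // prefixSearch("http://a.org/") returns ["http://a.org/1", "http://a.org/2"], and
      // getIdRange() of the first and last result gives a contiguous id interval, since
      // the builders assign ids in sorted URL order.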
List result = new ArrayList(); BytesRef newPrefixBref = new BytesRef(prefix.substring(0, prefix.length() - 1)); collect(result, fstReader, newPrefixBref, arc); // convert BytesRef results to String results strResults = new ArrayList(); Iterator iter = result.iterator(); while (iter.hasNext()) { strResults.add(iter.next().utf8ToString()); } } catch (IOException e) { LOG.error("Error: " + e); e.printStackTrace(); return new ArrayList(); } return strResults; } public int[] getIdRange(String first, String last){ if (first == null || last == null) { return null; } Long startId = null, endId = null; try { startId = Util.get(fst, new BytesRef(first)); endId = Util.get(fst, new BytesRef(last)); if (startId == null || endId == null) { return null; } } catch (IOException e) { LOG.error("Error: " + e); e.printStackTrace(); return null; } return new int[] { (int) startId.longValue(), (int) endId.longValue() }; } private boolean collect(List res, BytesReader fstReader, BytesRef output, Arc arc) throws IOException { if (output.length == output.bytes.length) { output.bytes = ArrayUtil.grow(output.bytes); } assert output.offset == 0; output.bytes[output.length++] = (byte) arc.label; fst.readFirstTargetArc(arc, arc, fstReader); while (true) { if (arc.label == FST.END_LABEL) { res.add(BytesRef.deepCopyOf(output)); } else { int save = output.length; if (collect(res, fstReader, output, new Arc().copyFrom(arc))) { return true; } output.length = save; } if (arc.isLast()) { break; } fst.readNextArc(arc, fstReader); } return false; } @SuppressWarnings("static-access") public static void main(String[] args) throws Exception { final String DATA = "data"; final String ID = "getId"; final String URL = "getUrl"; final String PREFIX = "getPrefix"; Options options = new Options(); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("FST data file").create(DATA)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("get id").create(ID)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("get url").create(URL)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("get prefix").create(PREFIX)); CommandLine cmdline = null; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { System.err.println("Error parsing command line: " + exp.getMessage()); System.exit(-1); } if (!cmdline.hasOption(DATA) || (!cmdline.hasOption(ID) && !cmdline.hasOption(URL) && !cmdline.hasOption(PREFIX))) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(IngestFiles.class.getCanonicalName(), options); System.exit(-1); } String filePath = cmdline.getOptionValue(DATA); UrlMapping map = new UrlMapping(filePath); map.loadMapping(filePath); if (cmdline.hasOption(ID)) { String url = cmdline.getOptionValue(ID); System.out.println(map.getID(url)); } if (cmdline.hasOption(URL)) { int id = Integer.parseInt(cmdline.getOptionValue(URL)); System.out.println(map.getUrl(id)); } if (cmdline.hasOption(PREFIX)) { String prefix = cmdline.getOptionValue(PREFIX); List urls = map.prefixSearch(prefix); for (String s : urls) { System.out.println(s); } } } } diff --git a/src/main/java/org/warcbase/data/UrlMappingBuilder.java b/src/main/java/org/warcbase/data/UrlMappingBuilder.java index 81f5819..fe5cb67 100644 --- a/src/main/java/org/warcbase/data/UrlMappingBuilder.java +++ b/src/main/java/org/warcbase/data/UrlMappingBuilder.java @@ -1,127 +1,143 @@ +/* + * Warcbase: an open-source 
platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.data; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IntsRef; import org.apache.lucene.util.fst.Builder; import org.apache.lucene.util.fst.FST; import org.apache.lucene.util.fst.FST.INPUT_TYPE; import org.apache.lucene.util.fst.PositiveIntOutputs; import org.apache.lucene.util.fst.Util; public class UrlMappingBuilder { private static final Logger LOG = Logger.getLogger(UrlMappingBuilder.class); private static void readUrlFromFile(File f, List urls) throws IOException { BufferedReader br = new BufferedReader(new FileReader(f)); String line; while ((line = br.readLine()) != null) { if (!line.equals("")) { String url = line.split("\\s+")[0]; urls.add(url); } } LOG.info("Read " + f + ", " + urls.size() + " URLs"); br.close(); } private static List readUrlFromFolder(String folderName) throws IOException { File folder = new File(folderName); List urls = new ArrayList(); if (folder.isDirectory()) { for (File file : folder.listFiles()) { readUrlFromFile(file, urls); } } else { readUrlFromFile(folder, urls); } LOG.info("Sorting URLs..."); Collections.sort(urls); // sort URLs alphabetically LOG.info("Done sorting!"); return urls; } public static final String INPUT_OPTION = "input"; public static final String OUTPUT_OPTION = "output"; @SuppressWarnings("static-access") public static void main(String[] args) throws IOException { Options options = new Options(); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("input path").create(INPUT_OPTION)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("output path").create(OUTPUT_OPTION)); CommandLine cmdline = null; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(UrlMappingBuilder.class.getName(), options); ToolRunner.printGenericCommandUsage(System.out); System.err.println("Error parsing command line: " + exp.getMessage()); System.exit(-1); } if (!cmdline.hasOption(INPUT_OPTION) || !cmdline.hasOption(OUTPUT_OPTION)) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(UrlMappingBuilder.class.getName(), options); ToolRunner.printGenericCommandUsage(System.out); System.exit(-1); } String input = cmdline.getOptionValue(INPUT_OPTION); String output = 
cmdline.getOptionValue(OUTPUT_OPTION); List inputValues = null; try { inputValues = readUrlFromFolder(input); // read data } catch (IOException e) { e.printStackTrace(); } PositiveIntOutputs outputs = PositiveIntOutputs.getSingleton(); Builder builder = new Builder(INPUT_TYPE.BYTE1, outputs); BytesRef scratchBytes = new BytesRef(); IntsRef scratchInts = new IntsRef(); for (int i = 0; i < inputValues.size(); i++) { if (i % 100000 == 0) { LOG.info(i + " URLs processed."); } scratchBytes.copyChars((String) inputValues.get(i)); try { builder.add(Util.toIntsRef(scratchBytes, scratchInts), (long) i); } catch (UnsupportedOperationException e) { System.out.println("Duplicate URL:" + inputValues.get(i)); } catch (IOException e) { e.printStackTrace(); } } FST fst = builder.finish(); // Save FST to file File outputFile = new File(output); fst.save(outputFile); LOG.info("Wrote output to " + output); } } diff --git a/src/main/java/org/warcbase/data/UrlMappingMapReduceBuilder.java b/src/main/java/org/warcbase/data/UrlMappingMapReduceBuilder.java index b7f277e..d5a309c 100644 --- a/src/main/java/org/warcbase/data/UrlMappingMapReduceBuilder.java +++ b/src/main/java/org/warcbase/data/UrlMappingMapReduceBuilder.java @@ -1,254 +1,270 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.data; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import org.apache.lucene.store.OutputStreamDataOutput; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IntsRef; import org.apache.lucene.util.fst.Builder; import org.apache.lucene.util.fst.FST; import org.apache.lucene.util.fst.FST.INPUT_TYPE; import org.apache.lucene.util.fst.PositiveIntOutputs; import org.apache.lucene.util.fst.Util; import org.archive.io.ArchiveRecordHeader; import org.warcbase.io.ArcRecordWritable; import org.warcbase.io.WarcRecordWritable; import org.warcbase.mapreduce.WacArcInputFormat; import org.warcbase.mapreduce.WacWarcInputFormat; public class UrlMappingMapReduceBuilder extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(UrlMappingMapReduceBuilder.class); private static enum Records { TOTAL, UNIQUE }; public static class ArcUriMappingBuilderMapper extends Mapper { public static final Text KEY = new Text(); public static final Text VALUE = new Text(); public void map(LongWritable key, ArcRecordWritable record, Context context) throws IOException, InterruptedException { context.getCounter(Records.TOTAL).increment(1); String url = record.getRecord().getMetaData().getUrl(); if (url.startsWith("http://")) { KEY.set(url); context.write(KEY, VALUE); } } } public static class WarcUriMappingBuilderMapper extends Mapper { public static final Text KEY = new Text(); public static final Text VALUE = new Text(); public void map(LongWritable key, WarcRecordWritable record, Context context) throws IOException, InterruptedException { context.getCounter(Records.TOTAL).increment(1); ArchiveRecordHeader header = record.getRecord().getHeader(); if (header.getHeaderValue("WARC-Type").equals("response")) { return; } String url = header.getUrl(); if ((url != null) && url.startsWith("http://")) { KEY.set(url); context.write(KEY, VALUE); } } } public static class UrlMappingBuilderReducer extends Reducer { public static List urls = new ArrayList(); private static String path; @Override public void setup(Context context) { // read PATH variable, which is where to write the FST data Configuration conf = context.getConfiguration(); path = conf.get("PATH"); } @Override public void reduce(Text key, Iterable values, Context context) throws IOException, 
InterruptedException { context.getCounter(Records.UNIQUE).increment(1); urls.add(key.toString()); } @Override public void cleanup(Context context) throws IOException { PositiveIntOutputs outputs = PositiveIntOutputs.getSingleton(); Builder builder = new Builder(INPUT_TYPE.BYTE1, outputs); BytesRef scratchBytes = new BytesRef(); IntsRef scratchInts = new IntsRef(); for (int i = 0; i < urls.size(); i++) { if (i % 100000 == 0) { LOG.info(i + " URLs processed."); } scratchBytes.copyChars((String) urls.get(i)); try { builder.add(Util.toIntsRef(scratchBytes, scratchInts), (long) i); } catch (UnsupportedOperationException e) { LOG.error("Duplicate URL:" + urls.get(i)); } catch (IOException e) { LOG.error(e.getMessage()); e.printStackTrace(); } } FST fst = builder.finish(); LOG.info("PATH: " + path); // Delete the output directory if it exists already. Path outputDir = new Path(path); FileSystem.get(context.getConfiguration()).delete(outputDir, true); // Save FST to file FileSystem fs = FileSystem.get(context.getConfiguration()); Path fstPath = new Path(path); OutputStream fStream = fs.create(fstPath); OutputStreamDataOutput fstStream = new OutputStreamDataOutput(fStream); boolean success = false; try { LOG.info("Writing output..."); fst.save(fstStream); success = true; } finally { if (success) { IOUtils.close(fstStream); LOG.info("Done!"); } else { IOUtils.closeWhileHandlingException(fstStream); LOG.info("Error!"); } } } } public UrlMappingMapReduceBuilder() {} private static final String INPUT = "input"; private static final String OUTPUT = "output"; /** * Runs this tool. */ @SuppressWarnings({ "static-access" }) public int run(String[] args) throws Exception { Options options = new Options(); options.addOption(OptionBuilder.withArgName("path") .hasArg().withDescription("input path").create(INPUT)); options.addOption(OptionBuilder.withArgName("path") .hasArg().withDescription("output path").create(OUTPUT)); CommandLine cmdline; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { System.err.println("Error parsing command line: " + exp.getMessage()); return -1; } if (!cmdline.hasOption(INPUT) || !cmdline.hasOption(OUTPUT)) { System.out.println("args: " + Arrays.toString(args)); HelpFormatter formatter = new HelpFormatter(); formatter.setWidth(120); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); return -1; } String inputPath = cmdline.getOptionValue(INPUT); String outputPath = cmdline.getOptionValue(OUTPUT); LOG.info("- input path: " + inputPath); LOG.info("- output path: " + outputPath); Configuration conf = getConf(); conf.set("PATH", outputPath); conf.set("mapreduce.reduce.java.opts", "-Xmx5120m"); Job job = Job.getInstance(conf, UrlMappingMapReduceBuilder.class.getSimpleName()); job.setJarByClass(UrlMappingMapReduceBuilder.class); job.getConfiguration().set("UriMappingBuilderClass", UrlMappingMapReduceBuilder.class.getCanonicalName()); Path path = new Path(inputPath); FileSystem fs = path.getFileSystem(conf); RemoteIterator itr = fs.listFiles(path, true); LocatedFileStatus fileStatus; while (itr.hasNext()) { fileStatus = itr.next(); Path p = fileStatus.getPath(); if ((p.getName().endsWith(".warc.gz")) || (p.getName().endsWith(".warc"))) { // WARC MultipleInputs.addInputPath(job, p, WacWarcInputFormat.class, WarcUriMappingBuilderMapper.class); } else { // Assume ARC MultipleInputs.addInputPath(job, p, WacArcInputFormat.class, ArcUriMappingBuilderMapper.class); } } 
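    // Hypothetical invocation (jar name and paths are examples, not from this diff):
    //
    //   hadoop jar warcbase-fatjar.jar org.warcbase.data.UrlMappingMapReduceBuilder \
    //     -input /collections/congress108/arcs -output /collections/congress108/url-mapping.fst
    //
    // All mapper output is shuffled to the single reducer configured below, which builds
    // the FST in memory and writes it to the -output path.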
job.setOutputFormatClass(NullOutputFormat.class); // no output // set map (key,value) output format job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(UrlMappingBuilderReducer.class); // all the keys are shuffled to a single reducer job.setNumReduceTasks(1); long startTime = System.currentTimeMillis(); job.waitForCompletion(true); LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); Counters counters = job.getCounters(); LOG.info("Read " + counters.findCounter(Records.TOTAL).getValue() + " total URLs."); LOG.info("Read " + counters.findCounter(Records.UNIQUE).getValue() + " unique URLs."); return 0; } public static void main(String[] args) throws Exception { ToolRunner.run(new UrlMappingMapReduceBuilder(), args); } } diff --git a/src/main/java/org/warcbase/data/UrlUtils.java b/src/main/java/org/warcbase/data/UrlUtils.java index dd2d6c8..f9688ac 100755 --- a/src/main/java/org/warcbase/data/UrlUtils.java +++ b/src/main/java/org/warcbase/data/UrlUtils.java @@ -1,79 +1,95 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.data; import java.net.MalformedURLException; import java.net.URL; import java.util.Arrays; import com.google.common.base.Joiner; import com.google.common.collect.Lists; public class UrlUtils { private static final Joiner JOINER = Joiner.on("."); public static String urlToKey(String in) { URL url = null; try { url = new URL(in); } catch (MalformedURLException mue) { return null; } String host = url.getHost(); StringBuilder key = new StringBuilder(); key.append(JOINER.join(Lists.reverse(Arrays.asList(host.split("\\.", 0))))); int port = url.getPort(); if (port != -1) { key.append(":").append(port); } key.append(url.getFile()); return key.toString(); } public static String reverseHostname(String h) { String[] splits = h.split("\\/"); String[] ports = splits[0].split("\\:", 0); StringBuilder host = new StringBuilder(); host.append(JOINER.join(Lists.reverse(Arrays.asList(ports[0].split("\\.", 0))))); if (ports.length > 1) { host.append(":" + ports[1]); } return host.toString(); } public static String keyToUrl(String reverse) { String domain = UrlUtils.getDomain(reverse); domain = UrlUtils.reverseHostname(domain); String[] splits = reverse.split("\\/"); if (splits.length < 2) { return domain; } String file = reverse.substring(splits[0].length()); return "http://" + domain + file; } // This method doesn't really make sense... 
should really be going with MIME types public static String getFileType(String url) { if (url.length() > 0 && url.charAt(url.length() - 1) == '/') return ""; String[] splits = url.split("\\/"); if (splits.length == 0) return ""; splits = splits[splits.length - 1].split("\\."); if (splits.length <= 1) return ""; String type = splits[splits.length - 1]; if (type.length() > 8) return ""; if (type.length() == 1 && Character.isDigit(type.charAt(0))) return ""; return type; } public static String getDomain(String url) { String[] splits = url.split("\\/"); return splits[0]; } } diff --git a/src/main/java/org/warcbase/data/WarcRecordUtils.java b/src/main/java/org/warcbase/data/WarcRecordUtils.java index bbf6858..0b2065a 100644 --- a/src/main/java/org/warcbase/data/WarcRecordUtils.java +++ b/src/main/java/org/warcbase/data/WarcRecordUtils.java @@ -1,125 +1,141 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.data; import org.apache.commons.httpclient.HttpParser; import org.apache.log4j.Logger; import org.archive.io.warc.WARCConstants; import org.archive.io.warc.WARCReader; import org.archive.io.warc.WARCReaderFactory; import org.archive.io.warc.WARCRecord; import java.io.*; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; /** * Utilities for working with {@code WARCRecord}s (from archive.org APIs). */ public class WarcRecordUtils implements WARCConstants, RecordUtils { private static final Logger LOG = Logger.getLogger(WarcRecordUtils.class); // TODO: these methods work fine, but there's a lot of unnecessary buffer copying, which is // terrible from a performance perspective. /** * Converts raw bytes into an {@code WARCRecord}. * * @param bytes raw bytes * @return parsed {@code WARCRecord} * @throws IOException */ public static WARCRecord fromBytes(byte[] bytes) throws IOException { WARCReader reader = (WARCReader) WARCReaderFactory.get("", new BufferedInputStream(new ByteArrayInputStream(bytes)), false); return (WARCRecord) reader.get(); } public static byte[] toBytes(WARCRecord record) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(baos); dout.write("WARC/0.17\n".getBytes()); for (Map.Entry entry : record.getHeader().getHeaderFields().entrySet()) { dout.write((entry.getKey() + ": " + entry.getValue().toString() + "\n").getBytes()); } dout.write("\n".getBytes()); record.dump(dout); return baos.toByteArray(); } /** * Extracts the MIME type of WARC response records (i.e., "WARC-Type" is "response"). * Note that this is different from the "Content-Type" in the WARC header. * * @param contents raw contents of the WARC response record * @return MIME type */ public static String getWarcResponseMimeType(byte[] contents) { // This is a somewhat janky way to get the MIME type of the response. // Note that this is different from the "Content-Type" in the WARC header. 
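    // Worked example: for headers beginning
    //   "HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=UTF-8\r\n..."
    // the regex below captures "text/html;" and the trailing replaceAll(";$", "")
    // strips the semicolon, returning "text/html"; if no Content-Type header is
    // present, the method returns null.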
Pattern pattern = Pattern.compile("Content-Type: ([^\\s]+)", Pattern.CASE_INSENSITIVE); Matcher matcher = pattern.matcher(new String(contents)); if (matcher.find()) { return matcher.group(1).replaceAll(";$", ""); } return null; } /** * Extracts raw contents from a {@code WARCRecord} (including HTTP headers). * * @param record the {@code WARCRecord} * @return raw contents * @throws IOException */ public static byte[] getContent(WARCRecord record) throws IOException { int len = (int) record.getHeader().getContentLength(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(baos); copyStream(record, len, true, dout); return baos.toByteArray(); } /** * Extracts contents of the body from a {@code WARCRecord} (excluding HTTP headers). * * @param record the {@code WARCRecord} * @return contents of the body * @throws IOException */ public static byte[] getBodyContent(WARCRecord record) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); String line = HttpParser.readLine(record, WARC_HEADER_ENCODING); if (line == null) { return null; } // Just using parseHeaders to move down input stream to body HttpParser.parseHeaders(record, WARC_HEADER_ENCODING); record.dump(baos); return baos.toByteArray(); } private static long copyStream(final InputStream is, final int recordLength, boolean enforceLength, final DataOutputStream out) throws IOException { byte [] scratchbuffer = new byte[recordLength]; int read = 0; long tot = 0; while ((tot < recordLength) && (read = is.read(scratchbuffer)) != -1) { int write = read; // never write more than enforced length write = (int) Math.min(write, recordLength - tot); tot += read; out.write(scratchbuffer, 0, write); } if (enforceLength && tot != recordLength) { LOG.error("Read " + tot + " bytes but expected " + recordLength + " bytes. Continuing..."); } return tot; } } diff --git a/src/main/java/org/warcbase/demo/WacMapReduceArcDemo.java b/src/main/java/org/warcbase/demo/WacMapReduceArcDemo.java index 9154928..c2e7989 100644 --- a/src/main/java/org/warcbase/demo/WacMapReduceArcDemo.java +++ b/src/main/java/org/warcbase/demo/WacMapReduceArcDemo.java @@ -1,132 +1,148 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.demo; import java.io.IOException; import java.util.Arrays; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import org.archive.io.arc.ARCRecord; import org.archive.io.arc.ARCRecordMetaData; import org.warcbase.io.ArcRecordWritable; import org.warcbase.mapreduce.WacArcInputFormat; public class WacMapReduceArcDemo extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(WacMapReduceArcDemo.class); public static enum Records { TOTAL }; public static class MyMapper extends Mapper { @Override public void map(LongWritable key, ArcRecordWritable r, Context context) throws IOException, InterruptedException { context.getCounter(Records.TOTAL).increment(1); ARCRecord record = r.getRecord(); ARCRecordMetaData meta = record.getMetaData(); String url = meta.getUrl(); String date = meta.getDate(); String type = meta.getMimetype(); context.write(new Text(url + " " + type), new Text(date)); } } public WacMapReduceArcDemo() {} public static final String INPUT_OPTION = "input"; public static final String OUTPUT_OPTION = "output"; /** * Runs this tool. 
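*
* @param args command-line arguments; the tool expects {@code -input} and {@code -output} paths
* @return 0 on success, -1 if the command line cannot be parsed or a required option is missing
* @throws Exception propagated from job configuration or execution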
*/ @SuppressWarnings("static-access") public int run(String[] args) throws Exception { Options options = new Options(); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("input path").create(INPUT_OPTION)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("output path").create(OUTPUT_OPTION)); CommandLine cmdline; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); System.err.println("Error parsing command line: " + exp.getMessage()); return -1; } if (!cmdline.hasOption(INPUT_OPTION) || !cmdline.hasOption(OUTPUT_OPTION)) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); return -1; } String input = cmdline.getOptionValue(INPUT_OPTION); Path output = new Path(cmdline.getOptionValue(OUTPUT_OPTION)); LOG.info("Tool name: " + WacMapReduceArcDemo.class.getSimpleName()); LOG.info(" - input: " + input); LOG.info(" - output: " + output); Job job = Job.getInstance(getConf(), WacMapReduceArcDemo.class.getSimpleName() + ":" + input); job.setJarByClass(WacMapReduceArcDemo.class); job.setNumReduceTasks(0); FileInputFormat.addInputPaths(job, input); FileOutputFormat.setOutputPath(job, output); job.setInputFormatClass(WacArcInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(MyMapper.class); FileSystem fs = FileSystem.get(getConf()); if ( FileSystem.get(getConf()).exists(output)) { fs.delete(output, true); } long startTime = System.currentTimeMillis(); job.waitForCompletion(true); LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); Counters counters = job.getCounters(); int numDocs = (int) counters.findCounter(Records.TOTAL).getValue(); LOG.info("Read " + numDocs + " records."); return 0; } /** * Dispatches command-line arguments to the tool via the ToolRunner. */ public static void main(String[] args) throws Exception { LOG.info("Running " + WacMapReduceArcDemo.class.getCanonicalName() + " with args " + Arrays.toString(args)); ToolRunner.run(new WacMapReduceArcDemo(), args); } } diff --git a/src/main/java/org/warcbase/demo/WacMapReduceHBaseDemo.java b/src/main/java/org/warcbase/demo/WacMapReduceHBaseDemo.java index a1f8978..6acc57e 100644 --- a/src/main/java/org/warcbase/demo/WacMapReduceHBaseDemo.java +++ b/src/main/java/org/warcbase/demo/WacMapReduceHBaseDemo.java @@ -1,171 +1,187 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.demo; import java.io.IOException; import java.util.Arrays; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import org.archive.io.arc.ARCRecord; import org.warcbase.data.ArcRecordUtils; public class WacMapReduceHBaseDemo extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(WacMapReduceHBaseDemo.class); private static enum Counts { ROWS, RECORDS }; private static class MyMapper extends TableMapper { private final Text keyOut = new Text(); private final Text valueOut = new Text(); @Override public void map(ImmutableBytesWritable row, Result result, Context context) throws IOException, InterruptedException { context.getCounter(Counts.ROWS).increment(1); // set KEY to row key (reversed URL) keyOut.set(row.get()); for (Cell cell : result.listCells()) { context.getCounter(Counts.RECORDS).increment(1); String mimeType = new String(CellUtil.cloneQualifier(cell)); ARCRecord record = ArcRecordUtils.fromBytes(CellUtil.cloneValue(cell)); byte[] body = ArcRecordUtils.getBodyContent(record); if (mimeType.startsWith("text")) { String content = new String(body, "UTF8").replaceFirst("\\s+", ""); String excerpt = content.substring(0, Math.min(100, content.length())).replaceAll("[\\n\\r]+", ""); valueOut.set(mimeType + "\t" + cell.getTimestamp() + "\n" + record.getHeaderString() + "\n" + excerpt + "...\n"); } else { valueOut.set(mimeType + "\t" + cell.getTimestamp() + "\n" + record.getHeaderString() + "\n"); } context.write(keyOut, valueOut); } } } public WacMapReduceHBaseDemo() {} public static final String INPUT_OPTION = "input"; public static final String OUTPUT_OPTION = "output"; /** * Runs this tool. 
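*
* <p>For this tool, {@code -input} names the HBase table to scan (via {@code TableMapReduceUtil}),
* while {@code -output} is an HDFS path that receives the text output.</p>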
*/ @SuppressWarnings("static-access") public int run(String[] args) throws Exception { Options options = new Options(); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("input path").create(INPUT_OPTION)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("output path").create(OUTPUT_OPTION)); CommandLine cmdline; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); System.err.println("Error parsing command line: " + exp.getMessage()); return -1; } if (!cmdline.hasOption(INPUT_OPTION) || !cmdline.hasOption(OUTPUT_OPTION)) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); return -1; } String input = cmdline.getOptionValue(INPUT_OPTION); Path output = new Path(cmdline.getOptionValue(OUTPUT_OPTION)); LOG.info("Tool name: " + WacMapReduceHBaseDemo.class.getSimpleName()); LOG.info(" - input: " + input); LOG.info(" - output: " + output); Configuration config = HBaseConfiguration.create(getConf()); // This should be fetched from external config files, // but not working due to weirdness in current config. config.set("hbase.zookeeper.quorum", "bespinrm.umiacs.umd.edu"); Job job = Job.getInstance(config, WacMapReduceHBaseDemo.class.getSimpleName() + ":" + input); job.setJarByClass(WacMapReduceHBaseDemo.class); Scan scan = new Scan(); scan.addFamily("c".getBytes()); // Very conservative settings because a single row might not fit in memory // if we have many captured version of a URL. scan.setCaching(1); // Controls the number of rows to pre-fetch scan.setBatch(10); // Controls the number of columns to fetch on a per row basis scan.setCacheBlocks(false); // Don't set to true for MR jobs scan.setMaxVersions(); // We want all versions TableMapReduceUtil.initTableMapperJob( input, // input HBase table name scan, // Scan instance to control CF and attribute selection MyMapper.class, // mapper Text.class, // mapper output key Text.class, // mapper output value job); job.setNumReduceTasks(0); FileOutputFormat.setOutputPath(job, output); job.setOutputFormatClass(TextOutputFormat.class); FileSystem fs = FileSystem.get(getConf()); if ( FileSystem.get(getConf()).exists(output)) { fs.delete(output, true); } long startTime = System.currentTimeMillis(); job.waitForCompletion(true); LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); Counters counters = job.getCounters(); LOG.info("Read " + counters.findCounter(Counts.ROWS).getValue() + " rows."); LOG.info("Read " + counters.findCounter(Counts.RECORDS).getValue() + " records."); return 0; } /** * Dispatches command-line arguments to the tool via the ToolRunner. 
*/ public static void main(String[] args) throws Exception { LOG.info("Running " + WacMapReduceHBaseDemo.class.getCanonicalName() + " with args " + Arrays.toString(args)); ToolRunner.run(new WacMapReduceHBaseDemo(), args); } } diff --git a/src/main/java/org/warcbase/demo/WacMapReduceHBaseWrapperDemo.java b/src/main/java/org/warcbase/demo/WacMapReduceHBaseWrapperDemo.java index 7ef2c99..5733cf8 100644 --- a/src/main/java/org/warcbase/demo/WacMapReduceHBaseWrapperDemo.java +++ b/src/main/java/org/warcbase/demo/WacMapReduceHBaseWrapperDemo.java @@ -1,138 +1,154 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.demo; import java.util.Arrays; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import org.warcbase.io.ArcRecordWritable; import org.warcbase.mapreduce.lib.HBaseRowToArcRecordWritableMapper; import org.warcbase.mapreduce.lib.TableChainMapper; public class WacMapReduceHBaseWrapperDemo extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(WacMapReduceHBaseDemo.class); public WacMapReduceHBaseWrapperDemo() {} public static final String INPUT_OPTION = "input"; public static final String OUTPUT_OPTION = "output"; /** * Runs this tool. 
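*
* <p>This variant scans an HBase table like {@link WacMapReduceHBaseDemo}, but chains
* {@code HBaseRowToArcRecordWritableMapper} with {@code WacMapReduceArcDemo.MyMapper} through
* {@code TableChainMapper}, so the ARC demo mapper is reused unchanged over HBase rows.</p>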
*/ @SuppressWarnings("static-access") public int run(String[] args) throws Exception { Options options = new Options(); options.addOption(OptionBuilder.withArgName("path") .hasArg().withDescription("input path").create(INPUT_OPTION)); options.addOption(OptionBuilder.withArgName("path") .hasArg().withDescription("output path").create(OUTPUT_OPTION)); CommandLine cmdline; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); System.err.println("Error parsing command line: " + exp.getMessage()); return -1; } if (!cmdline.hasOption(INPUT_OPTION) || !cmdline.hasOption(OUTPUT_OPTION)) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); return -1; } String input = cmdline.getOptionValue(INPUT_OPTION); Path output = new Path(cmdline.getOptionValue(OUTPUT_OPTION)); LOG.info("Tool name: " + WacMapReduceHBaseWrapperDemo.class.getSimpleName()); LOG.info(" - input: " + input); LOG.info(" - output: " + output); Configuration config = HBaseConfiguration.create(getConf()); // This should be fetched from external config files, // but not working due to weirdness in current config. config.set("hbase.zookeeper.quorum", "bespinrm.umiacs.umd.edu"); Job job = Job.getInstance(config, WacMapReduceHBaseWrapperDemo.class.getSimpleName() + ":" + input); job.setJarByClass(WacMapReduceHBaseWrapperDemo.class); Scan scan = new Scan(); scan.addFamily("c".getBytes()); // Very conservative settings because a single row might not fit in memory // if we have many captured version of a URL. scan.setCaching(1); // Controls the number of rows to pre-fetch scan.setBatch(10); // Controls the number of columns to fetch on a per row basis scan.setCacheBlocks(false); // Don't set to true for MR jobs scan.setMaxVersions(); // We want all versions TableMapReduceUtil.initTableMapperJob(input, // input HBase table name scan, // Scan instance to control CF and attribute selection TableChainMapper.class, // mapper Text.class, // mapper output key Text.class, // mapper output value job); TableChainMapper.addMapper(job, HBaseRowToArcRecordWritableMapper.class, ImmutableBytesWritable.class, Result.class, LongWritable.class, ArcRecordWritable.class, job.getConfiguration()); TableChainMapper.addMapper(job, WacMapReduceArcDemo.MyMapper.class, LongWritable.class, ArcRecordWritable.class, Text.class, Text.class, job.getConfiguration()); job.setNumReduceTasks(0); FileOutputFormat.setOutputPath(job, output); job.setOutputFormatClass(TextOutputFormat.class); FileSystem fs = FileSystem.get(getConf()); if (FileSystem.get(getConf()).exists(output)) { fs.delete(output, true); } long startTime = System.currentTimeMillis(); job.waitForCompletion(true); LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); Counters counters = job.getCounters(); LOG.info("Read " + counters.findCounter(HBaseRowToArcRecordWritableMapper.Rows.TOTAL).getValue() + " rows."); LOG.info("Read " + counters.findCounter(WacMapReduceArcDemo.Records.TOTAL).getValue() + " records."); return 0; } /** * Dispatches command-line arguments to the tool via the ToolRunner. 
*/ public static void main(String[] args) throws Exception { LOG.info("Running " + WacMapReduceHBaseWrapperDemo.class.getCanonicalName() + " with args " + Arrays.toString(args)); ToolRunner.run(new WacMapReduceHBaseWrapperDemo(), args); } } diff --git a/src/main/java/org/warcbase/index/IndexerMapper.java b/src/main/java/org/warcbase/index/IndexerMapper.java index 7b12d8a..e605813 100755 --- a/src/main/java/org/warcbase/index/IndexerMapper.java +++ b/src/main/java/org/warcbase/index/IndexerMapper.java @@ -1,113 +1,129 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.index; import java.io.IOException; import java.security.NoSuchAlgorithmException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.archive.io.ArchiveRecord; import org.archive.io.ArchiveRecordHeader; import uk.bl.wa.hadoop.WritableArchiveRecord; import uk.bl.wa.hadoop.indexer.WritableSolrRecord; import uk.bl.wa.indexer.WARCIndexer; import uk.bl.wa.solr.SolrRecord; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; public class IndexerMapper extends MapReduceBase implements Mapper { public static final String NUM_SHARDS = "IndexerMapper.NumShards"; private static final Log LOG = LogFactory.getLog(IndexerMapper.class); static enum MyCounters { NUM_RECORDS, NUM_ERRORS, NUM_NULLS, NUM_EMPTY_HEADERS } private String mapTaskId; private String inputFile; private int numRecords = 0; private int numShards; private WARCIndexer indexer; @Override public void configure(JobConf job) { try { LOG.info("Configuring WARCIndexer."); Config config = ConfigFactory.parseString(job.get(IndexerRunner.CONFIG_PROPERTIES)); this.indexer = new WARCIndexer(config); numShards = job.getInt(NUM_SHARDS, 10); LOG.info("Number of shards: " + numShards); mapTaskId = job.get("mapred.task.id"); inputFile = job.get("map.input.file"); LOG.info("Got task.id " + mapTaskId + " and input.file " + inputFile); } catch (NoSuchAlgorithmException e) { LOG.error("IndexerMapper.configure(): " + e.getMessage()); } } @Override public void map(Text key, WritableArchiveRecord value, OutputCollector output, Reporter reporter) throws IOException { ArchiveRecordHeader header = value.getRecord().getHeader(); ArchiveRecord rec = value.getRecord(); SolrRecord solr = new SolrRecord(key.toString(), rec.getHeader()); numRecords++; try { if (!header.getHeaderFields().isEmpty()) { solr = indexer.extract(key.toString(), value.getRecord()); // If there is no result, report it. 
if (solr == null) { LOG.debug("WARCIndexer returned NULL for: " + header.getUrl()); reporter.incrCounter(MyCounters.NUM_NULLS, 1); return; } // Increment record counter. reporter.incrCounter(MyCounters.NUM_RECORDS, 1); } else { // Report headerless records. reporter.incrCounter(MyCounters.NUM_EMPTY_HEADERS, 1); } } catch (Exception e) { LOG.error(e.getClass().getName() + ": " + e.getMessage() + "; " + header.getUrl() + "; " + header.getOffset()); reporter.incrCounter(MyCounters.NUM_ERRORS, 1); solr.addParseException(e); } catch (OutOfMemoryError e) { LOG.error("OOME " + e.getClass().getName() + ": " + e.getMessage() + "; " + header.getUrl() + "; " + header.getOffset()); reporter.incrCounter(MyCounters.NUM_ERRORS, 1); solr.addParseException(e); } // Random shard assignment. IntWritable shard = new IntWritable((int) (Math.round(Math.random() * (numShards-1)))); // Wrap up and collect the result: WritableSolrRecord solrRecord = new WritableSolrRecord(solr); output.collect(shard, solrRecord); // Occasionally update application-level status. if ((numRecords % 1000) == 0) { reporter.setStatus(numRecords + " processed from " + inputFile); // Also assure framework that we are making progress. reporter.progress(); } } } diff --git a/src/main/java/org/warcbase/index/IndexerReducer.java b/src/main/java/org/warcbase/index/IndexerReducer.java index 2d6a525..6927575 100755 --- a/src/main/java/org/warcbase/index/IndexerReducer.java +++ b/src/main/java/org/warcbase/index/IndexerReducer.java @@ -1,196 +1,212 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.index; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer; import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.HdfsDirectoryFactory; import uk.bl.wa.apache.solr.hadoop.Solate; import uk.bl.wa.hadoop.indexer.WritableSolrRecord; import uk.bl.wa.solr.SolrRecord; public class IndexerReducer extends MapReduceBase implements Reducer { public static final String HDFS_OUTPUT_PATH = "IndexerReducer.HDFSOutputPath"; private static final Log LOG = LogFactory.getLog(IndexerReducer.class); private static final int SUBMISSION_PAUSE_MINS = 5; private static final String SHARD_PREFIX = "shard"; private SolrServer solrServer; private int batchSize = 1000; private List docs = new ArrayList(); private int numberOfSequentialFails = 0; private FileSystem fs; private Path solrHome; private Path outputDir; static enum MyCounters { NUM_RECORDS, NUM_ERRORS, NUM_DROPPED_RECORDS } @Override public void configure(JobConf job) { LOG.info("Configuring reducer..."); // Initialize the embedded server. try { job.setBoolean("fs.hdfs.impl.disable.cache", true); fs = FileSystem.get(job); solrHome = Solate.findSolrConfig(job, IndexerRunner.solrHomeZipName); LOG.info("Found solrHomeDir " + solrHome); } catch (IOException e) { e.printStackTrace(); LOG.error("FAILED in reducer configuration: " + e); } outputDir = new Path(job.get(HDFS_OUTPUT_PATH)); LOG.info("HDFS index output path: " + outputDir); LOG.info("Initialization complete."); } private void initEmbeddedServer(int slice) throws IOException { if (solrHome == null) { throw new IOException("Unable to find solr home setting"); } Path outputShardDir = new Path(fs.getHomeDirectory() + "/" + outputDir, SHARD_PREFIX + slice); LOG.info("Creating embedded Solr server with solrHomeDir: " + solrHome + ", fs: " + fs + ", outputShardDir: " + outputShardDir); Path solrDataDir = new Path(outputShardDir, "data"); if (!fs.exists(solrDataDir) && !fs.mkdirs(solrDataDir)) { throw new IOException("Unable to create " + solrDataDir); } String dataDirStr = solrDataDir.toUri().toString(); LOG.info("Attempting to set data dir to: " + dataDirStr); System.setProperty("solr.data.dir", dataDirStr); System.setProperty("solr.home", solrHome.toString()); System.setProperty("solr.solr.home", solrHome.toString()); System.setProperty("solr.hdfs.home", outputDir.toString()); System.setProperty("solr.directoryFactory", HdfsDirectoryFactory.class.getName()); System.setProperty("solr.lock.type", "hdfs"); System.setProperty("solr.hdfs.nrtcachingdirectory", "false"); System.setProperty("solr.hdfs.blockcache.enabled", "true"); System.setProperty("solr.hdfs.blockcache.write.enabled", "false"); System.setProperty("solr.autoCommit.maxTime", "600000"); System.setProperty("solr.autoSoftCommit.maxTime", "-1"); LOG.info("Loading the container..."); 
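// The no-arg CoreContainer constructor typically resolves the Solr home directory from the
// "solr.solr.home" system property set above; load() then discovers and registers the cores
// found there, one of which is handed to the EmbeddedSolrServer below by name.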
CoreContainer container = new CoreContainer(); container.load(); for (String s : container.getAllCoreNames()) { LOG.warn("Got core name: " + s); } String coreName = ""; if (container.getCoreNames().size() > 0) { coreName = container.getCoreNames().iterator().next(); } LOG.error("Now firing up the server..."); solrServer = new EmbeddedSolrServer(container, coreName); LOG.error("Server started successfully!"); } @Override public void reduce(IntWritable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { SolrRecord solr; // Get the shard number, but counting from 1 instead of 0: int shard = key.get() + 1; // For indexing into HDFS, set up a new server per key: initEmbeddedServer(shard); // Go through the documents for this shard: long cnt = 0; while (values.hasNext()) { solr = values.next().getSolrRecord(); cnt++; docs.add(solr.getSolrDocument()); // Have we exceeded the batchSize? checkSubmission(docs, batchSize, reporter); // Occasionally update application-level status: if ((cnt % 1000) == 0) { reporter.setStatus(SHARD_PREFIX + shard + ": processed " + cnt + ", dropped " + reporter.getCounter(MyCounters.NUM_DROPPED_RECORDS).getValue()); } } try { // If we have at least one document unsubmitted, make sure we submit it. checkSubmission(docs, 1, reporter); // If we are indexing to HDFS, shut the shard down: // Commit, and block until the changes have been flushed. solrServer.commit(true, false); solrServer.shutdown(); } catch (Exception e) { LOG.error("ERROR on commit: " + e); e.printStackTrace(); } } @Override public void close() {} private void checkSubmission(List docs, int limit, Reporter reporter) { if (docs.size() > 0 && docs.size() >= limit) { try { // Inform that there is progress (still-alive): reporter.progress(); UpdateResponse response = solrServer.add(docs); LOG.info("Submitted " + docs.size() + " docs [" + response.getStatus() + "]"); reporter.incrCounter(MyCounters.NUM_RECORDS, docs.size()); docs.clear(); numberOfSequentialFails = 0; } catch (Exception e) { // Count up repeated fails: numberOfSequentialFails++; // If there have been a lot of fails, drop the records (we have seen some // "Invalid UTF-8 character 0xfffe at char" so this avoids bad data blocking job completion) if (this.numberOfSequentialFails >= 3) { LOG.error("Submission has repeatedly failed - assuming bad data and dropping these " + docs.size() + " records."); reporter.incrCounter(MyCounters.NUM_DROPPED_RECORDS, docs.size()); docs.clear(); } // SOLR-5719 possibly hitting us here; CloudSolrServer.RouteException LOG.error("Sleeping for " + SUBMISSION_PAUSE_MINS + " minute(s): " + e.getMessage(), e); // Also add a report for this condition. reporter.incrCounter(MyCounters.NUM_ERRORS, 1); try { Thread.sleep(1000 * 60 * SUBMISSION_PAUSE_MINS); } catch (InterruptedException ex) { LOG.warn("Sleep between Solr submissions was interrupted!"); } } } } } diff --git a/src/main/java/org/warcbase/index/IndexerRunner.java b/src/main/java/org/warcbase/index/IndexerRunner.java index 60e51a9..e56fa0a 100755 --- a/src/main/java/org/warcbase/index/IndexerRunner.java +++ b/src/main/java/org/warcbase/index/IndexerRunner.java @@ -1,183 +1,199 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.index; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.Arrays; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.lib.NullOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import uk.bl.wa.apache.solr.hadoop.Zipper; import uk.bl.wa.hadoop.ArchiveFileInputFormat; import uk.bl.wa.hadoop.indexer.WritableSolrRecord; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigRenderOptions; @SuppressWarnings({ "deprecation" }) public class IndexerRunner extends Configured implements Tool { public static final String CONFIG_PROPERTIES = "IndexerRunner.Config"; private static final Log LOG = LogFactory.getLog(IndexerRunner.class); protected static String solrHomeZipName = "solr_home.zip"; public static final String INPUT_OPTION = "input"; public static final String INDEX_OPTION = "index"; public static final String CONFIG_OPTION = "config"; public static final String SHARDS_OPTION = "numShards"; @SuppressWarnings("static-access") public int run(String[] args) throws IOException, ParseException { LOG.info("Initializing indexer..."); Options options = new Options(); options.addOption(OptionBuilder.withArgName("file").hasArg() .withDescription("input file list").create(INPUT_OPTION)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("HDFS index output path").create(INDEX_OPTION)); options.addOption(OptionBuilder.withArgName("num").hasArg() .withDescription("number of shards").create(SHARDS_OPTION)); options.addOption(OptionBuilder.withArgName("file").hasArg() .withDescription("config file (optional)").create(CONFIG_OPTION)); CommandLine cmdline; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(this.getClass().getName(), options); ToolRunner.printGenericCommandUsage(System.out); System.err.println("Error parsing command line: " + exp.getMessage()); return -1; } if (!cmdline.hasOption(INPUT_OPTION) || !cmdline.hasOption(INDEX_OPTION) || !cmdline.hasOption(SHARDS_OPTION)) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(this.getClass().getName(), 
options); ToolRunner.printGenericCommandUsage(System.out); return -1; } String configPath = null; if (cmdline.hasOption(CONFIG_OPTION)) { configPath = cmdline.getOptionValue(CONFIG_OPTION); } String inputPath = cmdline.getOptionValue(INPUT_OPTION); String outputPath = cmdline.getOptionValue(INDEX_OPTION); int shards = Integer.parseInt(cmdline.getOptionValue(SHARDS_OPTION)); JobConf conf = new JobConf(getConf(), IndexerRunner.class); if (configPath == null) { LOG.info("Config not specified, using default src/main/solr/WARCIndexer.conf"); configPath = "src/main/solr/WARCIndexer.conf"; } File configFile = new File(configPath); if (!configFile.exists()) { LOG.error("Error: config does not exist!"); System.exit(-1); } Config config = ConfigFactory.parseFile(configFile); conf.set(CONFIG_PROPERTIES, config.withOnlyPath("warc").root().render(ConfigRenderOptions.concise())); FileSystem fs = FileSystem.get(conf); LOG.info("HDFS index output path: " + outputPath); conf.set(IndexerReducer.HDFS_OUTPUT_PATH, outputPath); if (fs.exists(new Path(outputPath))) { LOG.error("Error: path exists already!"); System.exit(-1); } LOG.info("Number of shards: " + shards); conf.setInt(IndexerMapper.NUM_SHARDS, shards); // Add input paths: LOG.info("Reading input files..."); String line = null; BufferedReader br = new BufferedReader(new FileReader(inputPath)); while ((line = br.readLine()) != null) { FileInputFormat.addInputPath(conf, new Path(line)); } br.close(); LOG.info("Read " + FileInputFormat.getInputPaths(conf).length + " input files."); conf.setJobName(IndexerRunner.class.getSimpleName() + ": " + inputPath); conf.setInputFormat(ArchiveFileInputFormat.class); conf.setMapperClass(IndexerMapper.class); conf.setReducerClass(IndexerReducer.class); conf.setOutputFormat(NullOutputFormat.class); // Ensure the JARs we provide take precedence over ones from Hadoop: conf.setBoolean("mapreduce.job.user.classpath.first", true); // Also set reduce speculative execution off, avoiding duplicate submissions to Solr. conf.setBoolean("mapreduce.reduce.speculative", false); // Note that we need this to ensure FileSystem.get is thread-safe: // @see https://issues.apache.org/jira/browse/HDFS-925 // @see https://mail-archives.apache.org/mod_mbox/hadoop-user/201208.mbox/%3CCA+4kjVt-QE2L83p85uELjWXiog25bYTKOZXdc1Ahun+oBSJYpQ@mail.gmail.com%3E conf.setBoolean("fs.hdfs.impl.disable.cache", true); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(WritableSolrRecord.class); conf.setNumReduceTasks(shards); // number of reducers = number of shards cacheSolrHome(conf, solrHomeZipName); JobClient.runJob(conf); return 0; } private void cacheSolrHome(JobConf conf, String solrHomeZipName) throws IOException { File tmpSolrHomeDir = new File("src/main/solr").getAbsoluteFile(); // Create a ZIP file. File solrHomeLocalZip = File.createTempFile("tmp-", solrHomeZipName); Zipper.zipDir(tmpSolrHomeDir, solrHomeLocalZip); // Add to HDFS. FileSystem fs = FileSystem.get(conf); String hdfsSolrHomeDir = fs.getHomeDirectory() + "/solr/tempHome/" + solrHomeZipName; fs.copyFromLocalFile(new Path(solrHomeLocalZip.toString()), new Path(hdfsSolrHomeDir)); final URI baseZipUrl = fs.getUri().resolve(hdfsSolrHomeDir + '#' + solrHomeZipName); // Cache it. 
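// The '#' fragment on the URI tells the DistributedCache to expose the unpacked archive under
// a symlink named solrHomeZipName in each task's working directory; the reducer later calls
// Solate.findSolrConfig(job, solrHomeZipName) to locate that unpacked copy.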
DistributedCache.addCacheArchive(baseZipUrl, conf); } public static void main(String[] args) throws Exception { LOG.info("Running " + IndexerRunner.class.getCanonicalName() + " with args " + Arrays.toString(args)); ToolRunner.run(new IndexerRunner(), args); } } diff --git a/src/main/java/org/warcbase/ingest/IngestFiles.java b/src/main/java/org/warcbase/ingest/IngestFiles.java index ede58a6..f6e25b0 100755 --- a/src/main/java/org/warcbase/ingest/IngestFiles.java +++ b/src/main/java/org/warcbase/ingest/IngestFiles.java @@ -1,335 +1,351 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.ingest; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.log4j.Logger; import org.archive.io.ArchiveRecord; import org.archive.io.ArchiveRecordHeader; import org.archive.io.arc.ARCReader; import org.archive.io.arc.ARCReaderFactory; import org.archive.io.arc.ARCRecord; import org.archive.io.arc.ARCRecordMetaData; import org.archive.io.warc.WARCConstants; import org.archive.io.warc.WARCReader; import org.archive.io.warc.WARCReaderFactory; import org.archive.io.warc.WARCRecord; import org.archive.util.ArchiveUtils; import org.warcbase.data.HBaseTableManager; import org.warcbase.data.UrlUtils; import org.warcbase.data.WarcRecordUtils; public class IngestFiles { private static final String CREATE_OPTION = "create"; private static final String APPEND_OPTION = "append"; private static final String NAME_OPTION = "name"; private static final String DIR_OPTION = "dir"; private static final String START_OPTION = "start"; private static final String GZ_OPTION = "gz"; private static final String MAX_CONTENT_SIZE_OPTION = "maxSize"; private static final Logger LOG = Logger.getLogger(IngestFiles.class); private static final DateFormat iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX"); public static int MAX_CONTENT_SIZE = 10 * 1024 * 1024; private int cnt = 0; private int errors = 0; private int toolarge = 0; private int invalidUrls = 0; private final HBaseTableManager hbaseManager; public IngestFiles(String name, boolean create, Compression.Algorithm compression) throws Exception { hbaseManager = new HBaseTableManager(name, create, compression); } protected final byte [] scratchbuffer = new byte[4 * 1024]; protected long copyStream(final InputStream is, final long recordLength, boolean enforceLength, final DataOutputStream out) 
throws IOException { int read = scratchbuffer.length; long tot = 0; while ((tot < recordLength) && (read = is.read(scratchbuffer)) != -1) { int write = read; // never write more than enforced length write = (int) Math.min(write, recordLength - tot); tot += read; out.write(scratchbuffer, 0, write); } if (enforceLength && tot != recordLength) { LOG.error("Read " + tot + " bytes but expected " + recordLength + " bytes. Continuing..."); } return tot; } private void ingestArcFile(File inputArcFile) { ARCReader reader = null; // Per file trapping of exceptions so a corrupt file doesn't blow up entire ingest. try { reader = ARCReaderFactory.get(inputArcFile); // The following snippet of code was adapted from the dump method in ARCReader. boolean firstRecord = true; for (Iterator ii = reader.iterator(); ii.hasNext();) { ARCRecord r = (ARCRecord) ii.next(); ARCRecordMetaData meta = r.getMetaData(); if (firstRecord) { firstRecord = false; while (r.available() > 0) { r.read(); } continue; } if (meta.getUrl().startsWith("dns:")) { invalidUrls++; continue; } String metaline = meta.getUrl() + " " + meta.getIp() + " " + meta.getDate() + " " + meta.getMimetype() + " " + (int) meta.getLength(); String date = meta.getDate(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(baos); dout.write(metaline.getBytes()); dout.write("\n".getBytes()); copyStream(r, (int) meta.getLength(), true, dout); String key = UrlUtils.urlToKey(meta.getUrl()); String type = meta.getMimetype(); if (key == null) { LOG.error("Invalid URL: " + meta.getUrl()); invalidUrls++; continue; } if (type == null) { type = "text/plain"; } if ((int) meta.getLength() > MAX_CONTENT_SIZE) { toolarge++; } else { if (hbaseManager.insertRecord(key, date, baos.toByteArray(), type)) { cnt++; } else { errors++; } } if (cnt % 10000 == 0 && cnt > 0) { LOG.info("Ingested " + cnt + " records into HBase."); } } } catch (Exception e) { LOG.error("Error ingesting file: " + inputArcFile); e.printStackTrace(); } catch (OutOfMemoryError e) { LOG.error("Encountered OutOfMemoryError ingesting file: " + inputArcFile); LOG.error("Attempting to continue..."); } finally { if (reader != null) try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } } private void ingestWarcFile(File inputWarcFile) { WARCReader reader = null; // Per file trapping of exceptions so a corrupt file doesn't blow up entire ingest. try { reader = WARCReaderFactory.get(inputWarcFile); // The following snippet of code was adapted from the dump method in ARCReader. boolean firstRecord = true; for (Iterator ii = reader.iterator(); ii.hasNext();) { WARCRecord r = (WARCRecord) ii.next(); ArchiveRecordHeader h = r.getHeader(); byte[] recordBytes = WarcRecordUtils.toBytes(r); byte[] content = WarcRecordUtils.getContent(WarcRecordUtils.fromBytes(recordBytes)); if (firstRecord) { firstRecord = false; while (r.available() > 0) { r.read(); } continue; } // Only store WARC 'response' records // Would it be useful to store 'request' and 'metadata' records too? 
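// Anything whose WARC-Type header is not "response" (e.g., warcinfo, request, metadata, or
// revisit records) is skipped here and never written to HBase.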
if (!h.getHeaderValue(WARCConstants.HEADER_KEY_TYPE).equals("response")) { continue; } if (h.getUrl().startsWith("dns:")) { invalidUrls++; continue; } Date d = iso8601.parse(h.getDate()); String date = ArchiveUtils.get14DigitDate(d); String key = UrlUtils.urlToKey(h.getUrl()); String type = WarcRecordUtils.getWarcResponseMimeType(content); if (key == null) { LOG.error("Invalid URL: " + h.getUrl()); invalidUrls++; continue; } if (type == null) { type = "text/plain"; } if ((int) h.getLength() > MAX_CONTENT_SIZE) { toolarge++; } else { if (hbaseManager.insertRecord(key, date, recordBytes, type)) { cnt++; } else { errors++; } } if (cnt % 10000 == 0 && cnt > 0) { LOG.info("Ingested " + cnt + " records into HBase."); } } } catch (Exception e) { LOG.error("Error ingesting file: " + inputWarcFile); e.printStackTrace(); } catch (OutOfMemoryError e) { LOG.error("Encountered OutOfMemoryError ingesting file: " + inputWarcFile); LOG.error("Attempting to continue..."); } finally { if (reader != null) try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } } private void ingestFolder(File inputFolder, int i) throws Exception { long startTime = System.currentTimeMillis(); for (; i < inputFolder.listFiles().length; i++) { File inputFile = inputFolder.listFiles()[i]; if (!(inputFile.getName().endsWith(".warc.gz") || inputFile.getName().endsWith(".arc.gz") || inputFile.getName().endsWith(".warc") || inputFile.getName().endsWith(".arc"))) { continue; } LOG.info("processing file " + i + ": " + inputFile.getName()); if ( inputFile.getName().endsWith(".warc.gz") || inputFile.getName().endsWith(".warc") ) { ingestWarcFile(inputFile); } else if (inputFile.getName().endsWith(".arc.gz") || inputFile.getName().endsWith(".arc")) { ingestArcFile(inputFile); } } long totalTime = System.currentTimeMillis() - startTime; LOG.info("Total " + cnt + " records inserted, " + toolarge + " records too large, " + invalidUrls + " invalid URLs, " + errors + " insertion errors."); LOG.info("Total time: " + totalTime + "ms"); LOG.info("Ingest rate: " + cnt / (totalTime / 1000) + " records per second."); } @SuppressWarnings("static-access") public static void main(String[] args) throws Exception { Options options = new Options(); options.addOption(OptionBuilder.withArgName("name").hasArg() .withDescription("name of the archive").create(NAME_OPTION)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("WARC files location").create(DIR_OPTION)); options.addOption(OptionBuilder.withArgName("n").hasArg() .withDescription("start from the n-th WARC file").create(START_OPTION)); options.addOption(OptionBuilder.withArgName("n").hasArg() .withDescription("max record size to insert (default = 10 MB)").create(MAX_CONTENT_SIZE_OPTION)); options.addOption("create", false, "create new table"); options.addOption("append", false, "append to existing table"); options.addOption(GZ_OPTION, false, "use GZ compression (default = Snappy)"); CommandLine cmdline = null; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { System.err.println("Error parsing command line: " + exp.getMessage()); System.exit(-1); } if (!cmdline.hasOption(DIR_OPTION) || !cmdline.hasOption(NAME_OPTION)) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(IngestFiles.class.getCanonicalName(), options); System.exit(-1); } if (!cmdline.hasOption(CREATE_OPTION) && !cmdline.hasOption(APPEND_OPTION)) { System.err.println(String.format("Must specify either -%s or 
-%s", CREATE_OPTION, APPEND_OPTION)); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(IngestFiles.class.getCanonicalName(), options); System.exit(-1); } String path = cmdline.getOptionValue(DIR_OPTION); File inputFolder = new File(path); Compression.Algorithm compression = Compression.Algorithm.SNAPPY; if (cmdline.hasOption(GZ_OPTION)) { compression = Compression.Algorithm.GZ; } int i = 0; if (cmdline.hasOption(START_OPTION)) { i = Integer.parseInt(cmdline.getOptionValue(START_OPTION)); } if (cmdline.hasOption(MAX_CONTENT_SIZE_OPTION)) { IngestFiles.MAX_CONTENT_SIZE = Integer.parseInt(cmdline.getOptionValue(MAX_CONTENT_SIZE_OPTION)); } String name = cmdline.getOptionValue(NAME_OPTION); boolean create = cmdline.hasOption(CREATE_OPTION); LOG.info("Input: " + inputFolder); LOG.info("Table: " + name); LOG.info("Create new table: " + create); LOG.info("Compression: " + compression); LOG.info("Max content size: " + IngestFiles.MAX_CONTENT_SIZE); IngestFiles load = new IngestFiles(name, create, compression); load.ingestFolder(inputFolder, i); } } diff --git a/src/main/java/org/warcbase/ingest/SearchForUrl.java b/src/main/java/org/warcbase/ingest/SearchForUrl.java index f44444b..d788874 100755 --- a/src/main/java/org/warcbase/ingest/SearchForUrl.java +++ b/src/main/java/org/warcbase/ingest/SearchForUrl.java @@ -1,180 +1,196 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.ingest; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.zip.GZIPInputStream; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; import org.apache.log4j.Logger; import org.jwat.arc.ArcReader; import org.jwat.arc.ArcReaderFactory; import org.jwat.arc.ArcRecordBase; import org.jwat.common.ByteCountingPushBackInputStream; import org.jwat.common.HttpHeader; import org.jwat.common.Payload; import org.jwat.common.UriProfile; import org.jwat.warc.WarcReader; import org.jwat.warc.WarcReaderFactory; import org.jwat.warc.WarcRecord; import org.warcbase.data.UrlUtils; public class SearchForUrl { private static final String DIR_OPTION = "dir"; private static final String URI_OPTION = "uri"; private static final Logger LOG = Logger.getLogger(SearchForUrl.class); private static final UriProfile uriProfile = UriProfile.RFC3986_ABS_16BIT_LAX; private static final boolean bBlockDigestEnabled = true; private static final boolean bPayloadDigestEnabled = true; private static final int recordHeaderMaxSize = 8192; private static final int payloadHeaderMaxSize = 32768; private void searchArcFile(File inputArcFile, String uri) throws IOException { ArcRecordBase record = null; String url = null; String date = null; byte[] content = null; String type = null; String key = null; InputStream in = new FileInputStream(inputArcFile); ArcReader reader = ArcReaderFactory.getReader(in); while ((record = reader.getNextRecord()) != null) { url = record.getUrlStr(); date = record.getArchiveDateStr(); type = record.getContentTypeStr(); content = IOUtils.toByteArray(record.getPayloadContent()); key = UrlUtils.urlToKey(url); if (uri.equals(url)) { System.out.println("-----------------------------------"); System.out.println("Found at " + inputArcFile.getName()); System.out.println(date); System.out.println(type); System.out.println(content.length); System.out.println(key); } } reader.close(); in.close(); } private void searchWarcFile(File inputWarcFile, String uri) throws FileNotFoundException, IOException { WarcRecord warcRecord = null; String url = null; String date = null; String type = null; byte[] content = null; String key = null; GZIPInputStream gzInputStream = new GZIPInputStream(new FileInputStream(inputWarcFile)); ByteCountingPushBackInputStream pbin = new ByteCountingPushBackInputStream( new BufferedInputStream(gzInputStream, 8192), 32); WarcReader warcReader = WarcReaderFactory.getReaderUncompressed(pbin); if (warcReader == null) { LOG.info("Can't read warc file " + inputWarcFile.getName()); return; } warcReader.setWarcTargetUriProfile(uriProfile); warcReader.setBlockDigestEnabled(bBlockDigestEnabled); warcReader.setPayloadDigestEnabled(bPayloadDigestEnabled); warcReader.setRecordHeaderMaxSize(recordHeaderMaxSize); warcReader.setPayloadHeaderMaxSize(payloadHeaderMaxSize); while ((warcRecord = warcReader.getNextRecord()) != null) { url = warcRecord.header.warcTargetUriStr; key = UrlUtils.urlToKey(url); Payload payload = warcRecord.getPayload(); HttpHeader httpHeader = null; InputStream payloadStream = null; if (payload == null) { continue; } httpHeader = 
warcRecord.getHttpHeader(); if (httpHeader != null) { payloadStream = httpHeader.getPayloadInputStream(); type = httpHeader.contentType; } else { payloadStream = payload.getInputStreamComplete(); } if (payloadStream == null) { continue; } date = warcRecord.header.warcDateStr; content = IOUtils.toByteArray(payloadStream); if (uri.equals(url)) { System.out.println("-----------------------------------"); System.out.println("Found at " + inputWarcFile.getName()); System.out.println(date); System.out.println(type); System.out.println(content.length); System.out.println(key); } } } private void searchFolder(File inputFolder, String uri) throws IOException { long startTime = System.currentTimeMillis(); GZIPInputStream gzInputStream = null; for (int i = 0; i < inputFolder.listFiles().length; i++) { File inputFile = inputFolder.listFiles()[i]; LOG.info("processing file " + i + ": " + inputFile.getName()); if (inputFile.toString().toLowerCase().endsWith(".gz")) { gzInputStream = new GZIPInputStream(new FileInputStream(inputFile)); ByteCountingPushBackInputStream in = new ByteCountingPushBackInputStream(gzInputStream, 32); if (ArcReaderFactory.isArcFile(in)) { searchArcFile(inputFile, uri); } else if (WarcReaderFactory.isWarcFile(in)) { searchWarcFile(inputFile, uri); } } } long totalTime = System.currentTimeMillis() - startTime; LOG.info("Total time: " + totalTime + "ms"); } @SuppressWarnings("static-access") public static void main(String[] args) throws IOException { Options options = new Options(); options.addOption(OptionBuilder.withArgName("dir").hasArg() .withDescription("WARC files location").create(DIR_OPTION)); options.addOption(OptionBuilder.withArgName("uri").hasArg() .withDescription("search for URI in warc files").create(URI_OPTION)); CommandLine cmdline = null; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { System.err.println("Error parsing command line: " + exp.getMessage()); System.exit(-1); } if (!cmdline.hasOption(DIR_OPTION) || !cmdline.hasOption(URI_OPTION)) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(IngestFiles.class.getCanonicalName(), options); System.exit(-1); } String path = cmdline.getOptionValue(DIR_OPTION); String uri = cmdline.getOptionValue(URI_OPTION); File inputFolder = new File(path); SearchForUrl search = new SearchForUrl(); search.searchFolder(inputFolder, uri); } } diff --git a/src/main/java/org/warcbase/io/ArcRecordWritable.java b/src/main/java/org/warcbase/io/ArcRecordWritable.java index 51d47d1..97cbac6 100644 --- a/src/main/java/org/warcbase/io/ArcRecordWritable.java +++ b/src/main/java/org/warcbase/io/ArcRecordWritable.java @@ -1,52 +1,68 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.io; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; import org.archive.io.arc.ARCRecord; import org.warcbase.data.ArcRecordUtils; public class ArcRecordWritable implements Writable { private ARCRecord record = null; public ArcRecordWritable() {} public ArcRecordWritable(ARCRecord r) { this.record = r; } public void setRecord(ARCRecord r) { this.record = r; } public ARCRecord getRecord() { return record; } @Override public void readFields(DataInput in) throws IOException { int len = in.readInt(); if (len == 0) { this.record = null; return; } byte[] bytes = new byte[len]; in.readFully(bytes); this.record = ArcRecordUtils.fromBytes(bytes); } @Override public void write(DataOutput out) throws IOException { if (record == null) { out.writeInt(0); return; } byte[] bytes = ArcRecordUtils.toBytes(record); out.writeInt(bytes.length); out.write(bytes); } } diff --git a/src/main/java/org/warcbase/io/WarcRecordWritable.java b/src/main/java/org/warcbase/io/WarcRecordWritable.java index 6449e08..e823402 100644 --- a/src/main/java/org/warcbase/io/WarcRecordWritable.java +++ b/src/main/java/org/warcbase/io/WarcRecordWritable.java @@ -1,52 +1,68 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.io; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; import org.archive.io.warc.WARCRecord; import org.warcbase.data.WarcRecordUtils; public class WarcRecordWritable implements Writable { private WARCRecord record = null; public WarcRecordWritable() {} public WarcRecordWritable(WARCRecord r) { this.record = r; } public void setRecord(WARCRecord r) { this.record = r; } public WARCRecord getRecord() { return record; } @Override public void readFields(DataInput in) throws IOException { int len = in.readInt(); if (len == 0) { this.record = null; return; } byte[] bytes = new byte[len]; in.readFully(bytes); this.record = WarcRecordUtils.fromBytes(bytes); } @Override public void write(DataOutput out) throws IOException { if (record == null) { out.writeInt(0); return; } byte[] bytes = WarcRecordUtils.toBytes(record); out.writeInt(bytes.length); out.write(bytes); } } diff --git a/src/main/java/org/warcbase/mapreduce/WacArcInputFormat.java b/src/main/java/org/warcbase/mapreduce/WacArcInputFormat.java index 7d98355..c8cb4b0 100644 --- a/src/main/java/org/warcbase/mapreduce/WacArcInputFormat.java +++ b/src/main/java/org/warcbase/mapreduce/WacArcInputFormat.java @@ -1,130 +1,146 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
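The two Writables above share a simple length-prefixed layout: an int length followed by the serialized record, with 0 standing in for a null record. The round-trip helper below is a sketch of exercising that layout; it assumes an ARCRecord obtained elsewhere (for example from an ARCReader).

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.archive.io.arc.ARCRecord;
import org.warcbase.io.ArcRecordWritable;

public class ArcWritableRoundTrip {
  // Serialize: writes the record length followed by the record bytes.
  public static byte[] toBytes(ARCRecord record) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    new ArcRecordWritable(record).write(new DataOutputStream(baos));
    return baos.toByteArray();
  }

  // Deserialize: reads the length, then reconstructs the record via ArcRecordUtils.fromBytes.
  public static ARCRecord fromBytes(byte[] data) throws IOException {
    ArcRecordWritable writable = new ArcRecordWritable();
    writable.readFields(new DataInputStream(new ByteArrayInputStream(data)));
    return writable.getRecord();
  }
}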
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.mapreduce; import java.io.BufferedInputStream; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.archive.io.ArchiveRecord; import org.archive.io.arc.ARCReader; import org.archive.io.arc.ARCReaderFactory; import org.archive.io.arc.ARCReaderFactory.CompressedARCReader; import org.archive.io.arc.ARCRecord; import org.warcbase.io.ArcRecordWritable; public class WacArcInputFormat extends FileInputFormat { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new ArcRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } public class ArcRecordReader extends RecordReader { private ARCReader reader; private long start; private long pos; private long end; private LongWritable key = null; private ArcRecordWritable value = null; private Seekable filePosition; private Iterator iter; @Override public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); reader = (ARCReader) ARCReaderFactory.get(split.getPath().toString(), new BufferedInputStream(fileIn), true); iter = reader.iterator(); //reader = (ARCReader) ARCReaderFactory.get(split.getPath().toString(), fileIn, true); this.pos = start; } private boolean isCompressedInput() { return reader instanceof CompressedARCReader; } private long getFilePosition() throws IOException { long retVal; if (isCompressedInput() && null != filePosition) { retVal = filePosition.getPos(); } else { retVal = pos; } return retVal; } @Override public boolean nextKeyValue() throws IOException { if (!iter.hasNext()) { return false; } if (key == null) { key = new LongWritable(); } key.set(pos); ARCRecord record = (ARCRecord) iter.next(); if (record == null) { return false; } if (value == null) { value = new ArcRecordWritable(); } value.setRecord(record); return true; } @Override public LongWritable getCurrentKey() { return key; } @Override public ArcRecordWritable getCurrentValue() { return value; } @Override public float getProgress() throws IOException { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start)); } } @Override public synchronized void 
close() throws IOException { reader.close(); } } } diff --git a/src/main/java/org/warcbase/mapreduce/WacWarcInputFormat.java b/src/main/java/org/warcbase/mapreduce/WacWarcInputFormat.java index b745dcb..6810e68 100644 --- a/src/main/java/org/warcbase/mapreduce/WacWarcInputFormat.java +++ b/src/main/java/org/warcbase/mapreduce/WacWarcInputFormat.java @@ -1,130 +1,146 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.mapreduce; import java.io.BufferedInputStream; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.archive.io.ArchiveRecord; import org.archive.io.warc.WARCReader; import org.archive.io.warc.WARCReaderFactory; import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader; import org.archive.io.warc.WARCRecord; import org.warcbase.io.WarcRecordWritable; public class WacWarcInputFormat extends FileInputFormat { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new WarcRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } public class WarcRecordReader extends RecordReader { private WARCReader reader; private long start; private long pos; private long end; private LongWritable key = null; private WarcRecordWritable value = null; private Seekable filePosition; private Iterator iter; @Override public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); reader = (WARCReader) WARCReaderFactory.get(split.getPath().toString(), new BufferedInputStream(fileIn), true); iter = reader.iterator(); //reader = (ARCReader) ARCReaderFactory.get(split.getPath().toString(), fileIn, true); this.pos = start; } private boolean isCompressedInput() { return reader instanceof CompressedWARCReader; } private long getFilePosition() throws IOException { long retVal; if (isCompressedInput() && null != filePosition) { retVal = filePosition.getPos(); } else { retVal = pos; } return retVal; } @Override public boolean nextKeyValue() throws IOException { if (!iter.hasNext()) { return 
false; } if (key == null) { key = new LongWritable(); } key.set(pos); WARCRecord record = (WARCRecord) iter.next(); if (record == null) { return false; } if (value == null) { value = new WarcRecordWritable(); } value.setRecord(record); return true; } @Override public LongWritable getCurrentKey() { return key; } @Override public WarcRecordWritable getCurrentValue() { return value; } @Override public float getProgress() throws IOException { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start)); } } @Override public synchronized void close() throws IOException { reader.close(); } } } diff --git a/src/main/java/org/warcbase/mapreduce/lib/Chain.java b/src/main/java/org/warcbase/mapreduce/lib/Chain.java index 9d11c10..2a73b33 100644 --- a/src/main/java/org/warcbase/mapreduce/lib/Chain.java +++ b/src/main/java/org/warcbase/mapreduce/lib/Chain.java @@ -1,673 +1,689 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.warcbase.mapreduce.lib; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DefaultStringifier; import org.apache.hadoop.io.Stringifier; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MapContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; import org.apache.hadoop.util.ReflectionUtils; /** * The Chain class provides all the common functionality for the * {@link TableChainMapper} and the {@link ChainReducer} classes. 
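To make the intended use of the two input formats above concrete, here is a hedged sketch of a driver that counts ARC records; the CountArcRecords class, counter names, and input path are illustrative assumptions, and the same wiring applies to WacWarcInputFormat with WarcRecordWritable values.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.warcbase.io.ArcRecordWritable;
import org.warcbase.mapreduce.WacArcInputFormat;

public class CountArcRecords {
  // Placeholder mapper: receives one ArcRecordWritable per ARC record and counts it.
  public static class RecordCounter
      extends Mapper<LongWritable, ArcRecordWritable, NullWritable, NullWritable> {
    @Override
    public void map(LongWritable key, ArcRecordWritable value, Context context)
        throws IOException, InterruptedException {
      context.getCounter("arc", "records").increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "count ARC records");
    job.setJarByClass(CountArcRecords.class);
    job.setInputFormatClass(WacArcInputFormat.class); // record-at-a-time ARC reader from the diff
    job.setMapperClass(RecordCounter.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/data/arcs")); // placeholder input path
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}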
*/ @InterfaceAudience.Private @InterfaceStability.Unstable public class Chain { protected static final String CHAIN_MAPPER = "mapreduce.chain.mapper"; protected static final String CHAIN_MAPPER_SIZE = ".size"; protected static final String CHAIN_MAPPER_CLASS = ".mapper.class."; protected static final String CHAIN_MAPPER_CONFIG = ".mapper.config."; protected static final String MAPPER_INPUT_KEY_CLASS = "mapreduce.chain.mapper.input.key.class"; protected static final String MAPPER_INPUT_VALUE_CLASS = "mapreduce.chain.mapper.input.value.class"; protected static final String MAPPER_OUTPUT_KEY_CLASS = "mapreduce.chain.mapper.output.key.class"; protected static final String MAPPER_OUTPUT_VALUE_CLASS = "mapreduce.chain.mapper.output.value.class"; @SuppressWarnings("unchecked") private List mappers = new ArrayList(); private List confList = new ArrayList(); private Configuration rConf; private List threads = new ArrayList(); private List> blockingQueues = new ArrayList>(); private Throwable throwable = null; /** * Creates a Chain instance configured for a Mapper or a Reducer. */ protected Chain() { } static class KeyValuePair { K key; V value; boolean endOfInput; KeyValuePair(K key, V value) { this.key = key; this.value = value; this.endOfInput = false; } KeyValuePair(boolean eof) { this.key = null; this.value = null; this.endOfInput = eof; } } // ChainRecordReader either reads from blocking queue or task context. private static class ChainRecordReader extends RecordReader { private Class keyClass; private Class valueClass; private KEYIN key; private VALUEIN value; private Configuration conf; TaskInputOutputContext inputContext = null; ChainBlockingQueue> inputQueue = null; // constructor to read from a blocking queue ChainRecordReader(Class keyClass, Class valueClass, ChainBlockingQueue> inputQueue, Configuration conf) { this.keyClass = keyClass; this.valueClass = valueClass; this.inputQueue = inputQueue; this.conf = conf; } // constructor to read from the context ChainRecordReader(TaskInputOutputContext context) { inputContext = context; } public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { } /** * Advance to the next key, value pair, returning null if at end. * * @return the key object that was read into, or null if no more */ public boolean nextKeyValue() throws IOException, InterruptedException { if (inputQueue != null) { return readFromQueue(); } else if (inputContext.nextKeyValue()) { this.key = inputContext.getCurrentKey(); this.value = inputContext.getCurrentValue(); return true; } else { return false; } } @SuppressWarnings("unchecked") private boolean readFromQueue() throws IOException, InterruptedException { KeyValuePair kv = null; // wait for input on queue kv = inputQueue.dequeue(); if (kv.endOfInput) { return false; } key = (KEYIN) ReflectionUtils.newInstance(keyClass, conf); value = (VALUEIN) ReflectionUtils.newInstance(valueClass, conf); ReflectionUtils.copy(conf, kv.key, this.key); ReflectionUtils.copy(conf, kv.value, this.value); return true; } /** * Get the current key. * * @return the current key object or null if there isn't one * @throws IOException * @throws InterruptedException */ public KEYIN getCurrentKey() throws IOException, InterruptedException { return this.key; } /** * Get the current value. 
* * @return the value object that was read into * @throws IOException * @throws InterruptedException */ public VALUEIN getCurrentValue() throws IOException, InterruptedException { return this.value; } @Override public void close() throws IOException { } @Override public float getProgress() throws IOException, InterruptedException { return 0; } } // ChainRecordWriter either writes to blocking queue or task context private static class ChainRecordWriter extends RecordWriter { TaskInputOutputContext outputContext = null; ChainBlockingQueue> outputQueue = null; KEYOUT keyout; VALUEOUT valueout; Configuration conf; Class keyClass; Class valueClass; // constructor to write to context ChainRecordWriter(TaskInputOutputContext context) { outputContext = context; } // constructor to write to blocking queue ChainRecordWriter(Class keyClass, Class valueClass, ChainBlockingQueue> output, Configuration conf) { this.keyClass = keyClass; this.valueClass = valueClass; this.outputQueue = output; this.conf = conf; } /** * Writes a key/value pair. * * @param key * the key to write. * @param value * the value to write. * @throws IOException */ public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { if (outputQueue != null) { writeToQueue(key, value); } else { outputContext.write(key, value); } } @SuppressWarnings("unchecked") private void writeToQueue(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { this.keyout = (KEYOUT) ReflectionUtils.newInstance(keyClass, conf); this.valueout = (VALUEOUT) ReflectionUtils.newInstance(valueClass, conf); ReflectionUtils.copy(conf, key, this.keyout); ReflectionUtils.copy(conf, value, this.valueout); // wait to write output to queuue outputQueue.enqueue(new KeyValuePair(keyout, valueout)); } /** * Close this RecordWriter to future operations. 
* * @param context * the context of the task * @throws IOException */ public void close(TaskAttemptContext context) throws IOException, InterruptedException { if (outputQueue != null) { // write end of input outputQueue.enqueue(new KeyValuePair(true)); } } } private synchronized Throwable getThrowable() { return throwable; } private synchronized boolean setIfUnsetThrowable(Throwable th) { if (throwable == null) { throwable = th; return true; } return false; } private class MapRunner extends Thread { private Mapper mapper; private Mapper.Context chainContext; private RecordReader rr; private RecordWriter rw; public MapRunner(Mapper mapper, Mapper.Context mapperContext, RecordReader rr, RecordWriter rw) throws IOException, InterruptedException { this.mapper = mapper; this.rr = rr; this.rw = rw; this.chainContext = mapperContext; } @Override public void run() { if (getThrowable() != null) { return; } try { mapper.run(chainContext); rr.close(); rw.close(chainContext); } catch (Throwable th) { if (setIfUnsetThrowable(th)) { interruptAllThreads(); } } } } Configuration getConf(int index) { return confList.get(index); } /** * Create a map context that is based on ChainMapContext and the given record * reader and record writer */ private Mapper.Context createMapContext( RecordReader rr, RecordWriter rw, TaskInputOutputContext context, Configuration conf) { MapContext mapContext = new ChainMapContextImpl( context, rr, rw, conf); Mapper.Context mapperContext = new WrappedMapper() .getMapContext(mapContext); return mapperContext; } @SuppressWarnings("unchecked") void runMapper(TaskInputOutputContext context, int index) throws IOException, InterruptedException { Mapper mapper = mappers.get(index); RecordReader rr = new ChainRecordReader(context); RecordWriter rw = new ChainRecordWriter(context); Mapper.Context mapperContext = createMapContext(rr, rw, context, getConf(index)); mapper.run(mapperContext); rr.close(); rw.close(context); } /** * Add mapper(the first mapper) that reads input from the input * context and writes to queue */ @SuppressWarnings("unchecked") void addMapper(TaskInputOutputContext inputContext, ChainBlockingQueue> output, int index) throws IOException, InterruptedException { Configuration conf = getConf(index); Class keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class); Class valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS, Object.class); RecordReader rr = new ChainRecordReader(inputContext); RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output, conf); Mapper.Context mapperContext = createMapContext(rr, rw, (MapContext) inputContext, getConf(index)); MapRunner runner = new MapRunner(mappers.get(index), mapperContext, rr, rw); threads.add(runner); } /** * Add mapper(the last mapper) that reads input from * queue and writes output to the output context */ @SuppressWarnings("unchecked") void addMapper(ChainBlockingQueue> input, TaskInputOutputContext outputContext, int index) throws IOException, InterruptedException { Configuration conf = getConf(index); Class keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class); Class valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class); RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf); RecordWriter rw = new ChainRecordWriter(outputContext); MapRunner runner = new MapRunner(mappers.get(index), createMapContext(rr, rw, outputContext, getConf(index)), rr, rw); threads.add(runner); } /** * Add mapper that reads and writes from/to the queue */ 
@SuppressWarnings("unchecked") void addMapper(ChainBlockingQueue> input, ChainBlockingQueue> output, TaskInputOutputContext context, int index) throws IOException, InterruptedException { Configuration conf = getConf(index); Class keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class); Class valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class); Class keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class); Class valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS, Object.class); RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf); RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output, conf); MapRunner runner = new MapRunner(mappers.get(index), createMapContext(rr, rw, context, getConf(index)), rr, rw); threads.add(runner); } // start all the threads void startAllThreads() { for (Thread thread : threads) { thread.start(); } } // wait till all threads finish void joinAllThreads() throws IOException, InterruptedException { for (Thread thread : threads) { thread.join(); } Throwable th = getThrowable(); if (th != null) { if (th instanceof IOException) { throw (IOException) th; } else if (th instanceof InterruptedException) { throw (InterruptedException) th; } else { throw new RuntimeException(th); } } } // interrupt all threads private synchronized void interruptAllThreads() { for (Thread th : threads) { th.interrupt(); } for (ChainBlockingQueue queue : blockingQueues) { queue.interrupt(); } } protected static String getPrefix() { return CHAIN_MAPPER; } protected static int getIndex(Configuration conf, String prefix) { return conf.getInt(prefix + CHAIN_MAPPER_SIZE, 0); } /** * Creates a {@link Configuration} for the Map or Reduce in the chain. * *

* It creates a new Configuration using the chain job's Configuration as base * and adds to it the configuration properties for the chain element. The keys * of the chain element Configuration have precedence over the given * Configuration. *

* * @param jobConf * the chain job's Configuration. * @param confKey * the key for chain element configuration serialized in the chain * job's Configuration. * @return a new Configuration aggregating the chain job's Configuration with * the chain element configuration properties. */ protected static Configuration getChainElementConf(Configuration jobConf, String confKey) { Configuration conf = null; try { Stringifier stringifier = new DefaultStringifier(jobConf, Configuration.class); String confString = jobConf.get(confKey, null); if (confString != null) { conf = stringifier.fromString(jobConf.get(confKey, null)); } } catch (IOException ioex) { throw new RuntimeException(ioex); } // we have to do this because the Writable deserialization clears all // values set in the conf making it impossible to do a // new Configuration(jobConf) in the creation of the conf above jobConf = new Configuration(jobConf); if (conf != null) { for (Map.Entry entry : conf) { jobConf.set(entry.getKey(), entry.getValue()); } } return jobConf; } /** * Adds a Mapper class to the chain job. * *

* The configuration properties of the chain job have precedence over the * configuration properties of the Mapper. * * @param isMap * indicates if the Chain is for a Mapper or for a Reducer. * @param job * chain job. * @param klass * the Mapper class to add. * @param inputKeyClass * mapper input key class. * @param inputValueClass * mapper input value class. * @param outputKeyClass * mapper output key class. * @param outputValueClass * mapper output value class. * @param mapperConf * a configuration for the Mapper class. It is recommended to use a * Configuration without default values using the * Configuration(boolean loadDefaults) constructor with * FALSE. */ @SuppressWarnings("unchecked") protected static void addMapper(Job job, Class klass, Class inputKeyClass, Class inputValueClass, Class outputKeyClass, Class outputValueClass, Configuration mapperConf) { String prefix = getPrefix(); Configuration jobConf = job.getConfiguration(); // set the mapper class int index = getIndex(jobConf, prefix); jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class); validateKeyValueTypes(jobConf, inputKeyClass, inputValueClass, outputKeyClass, outputValueClass, index, prefix); setMapperConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass, outputValueClass, mapperConf, index, prefix); } protected static void validateKeyValueTypes( Configuration jobConf, Class inputKeyClass, Class inputValueClass, Class outputKeyClass, Class outputValueClass, int index, String prefix) { if (index > 0) { // check the that the new Mapper in the chain key and value input classes // match those of the previous Mapper output. Configuration previousMapperConf = getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + (index - 1)); if (!inputKeyClass.isAssignableFrom(previousMapperConf.getClass( MAPPER_OUTPUT_KEY_CLASS, null))) { throw new IllegalArgumentException("The specified Mapper input key class does" + " not match the previous Mapper's output key class."); } if (!inputValueClass.isAssignableFrom(previousMapperConf.getClass( MAPPER_OUTPUT_VALUE_CLASS, null))) { throw new IllegalArgumentException("The specified Mapper input value class" + " does not match the previous Mapper's output value class."); } } } protected static void setMapperConf(Configuration jobConf, Class inputKeyClass, Class inputValueClass, Class outputKeyClass, Class outputValueClass, Configuration mapperConf, int index, String prefix) { // if the Mapper does not have a configuration, create an empty one if (mapperConf == null) { // using a Configuration without defaults to make it lightweight. // still the chain's conf may have all defaults and this conf is // overlapped to the chain configuration one. mapperConf = new Configuration(true); } // store the input/output classes of the mapper in the mapper conf mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class); mapperConf .setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass, Object.class); mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class); mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass, Object.class); // serialize the mapper configuration in the chain configuration. 
Stringifier stringifier = new DefaultStringifier(jobConf, Configuration.class); try { jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index, stringifier .toString(new Configuration(mapperConf))); } catch (IOException ioEx) { throw new RuntimeException(ioEx); } // increment the chain counter jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1); } /** * Setup the chain. * * @param jobConf * chain job's {@link Configuration}. */ @SuppressWarnings("unchecked") void setup(Configuration jobConf) { String prefix = getPrefix(); int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0); for (int i = 0; i < index; i++) { Class klass = jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class); Configuration mConf = getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i); confList.add(mConf); Mapper mapper = ReflectionUtils.newInstance(klass, mConf); mappers.add(mapper); } } @SuppressWarnings("unchecked") List getAllMappers() { return mappers; } /** * Creates a ChainBlockingQueue with KeyValuePair as element * * @return the ChainBlockingQueue */ ChainBlockingQueue> createBlockingQueue() { return new ChainBlockingQueue>(); } /** * A blocking queue with one element. * * @param */ class ChainBlockingQueue { E element = null; boolean isInterrupted = false; ChainBlockingQueue() { blockingQueues.add(this); } synchronized void enqueue(E e) throws InterruptedException { while (element != null) { if (isInterrupted) { throw new InterruptedException(); } this.wait(); } element = e; this.notify(); } synchronized E dequeue() throws InterruptedException { while (element == null) { if (isInterrupted) { throw new InterruptedException(); } this.wait(); } E e = element; element = null; this.notify(); return e; } synchronized void interrupt() { isInterrupted = true; this.notifyAll(); } } } diff --git a/src/main/java/org/warcbase/mapreduce/lib/ChainMapContextImpl.java b/src/main/java/org/warcbase/mapreduce/lib/ChainMapContextImpl.java index 1e32ec4..6f07da3 100644 --- a/src/main/java/org/warcbase/mapreduce/lib/ChainMapContextImpl.java +++ b/src/main/java/org/warcbase/mapreduce/lib/ChainMapContextImpl.java @@ -1,333 +1,349 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
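The ChainBlockingQueue above is the single-element handoff that connects consecutive chained mappers, each running in its own thread. Because it is a package-private inner class, the following standalone analogue (class names and the EOF sentinel are made up) sketches the same wait/notify behaviour.

public class SingleSlotQueueDemo {
  static class SingleSlotQueue<E> {
    private E element = null;

    synchronized void enqueue(E e) throws InterruptedException {
      while (element != null) {
        wait();              // block until the consumer drains the slot
      }
      element = e;
      notify();
    }

    synchronized E dequeue() throws InterruptedException {
      while (element == null) {
        wait();              // block until the producer fills the slot
      }
      E e = element;
      element = null;
      notify();
      return e;
    }
  }

  public static void main(String[] args) throws Exception {
    SingleSlotQueue<String> queue = new SingleSlotQueue<>();
    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 3; i++) {
          queue.enqueue("record-" + i);
        }
        queue.enqueue("EOF");  // sentinel, analogous to KeyValuePair(true) above
      } catch (InterruptedException ignored) { }
    });
    producer.start();
    String record;
    while (!(record = queue.dequeue()).equals("EOF")) {
      System.out.println("consumed " + record);
    }
    producer.join();
  }
}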
* See the License for the specific language governing permissions and * limitations under the License. */ package org.warcbase.mapreduce.lib; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MapContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.security.Credentials; /** * A simple wrapper class that delegates most of its functionality to the * underlying context, but overrides the methods to do with record readers , * record writers and configuration. */ class ChainMapContextImpl implements MapContext { private RecordReader reader; private RecordWriter output; private TaskInputOutputContext base; private Configuration conf; ChainMapContextImpl( TaskInputOutputContext base, RecordReader rr, RecordWriter rw, Configuration conf) { this.reader = rr; this.output = rw; this.base = base; this.conf = conf; } @Override public KEYIN getCurrentKey() throws IOException, InterruptedException { return reader.getCurrentKey(); } @Override public VALUEIN getCurrentValue() throws IOException, InterruptedException { return reader.getCurrentValue(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return reader.nextKeyValue(); } @Override public InputSplit getInputSplit() { if (base instanceof MapContext) { MapContext mc = (MapContext) base; return mc.getInputSplit(); } else { return null; } } @Override public Counter getCounter(Enum counterName) { return base.getCounter(counterName); } @Override public Counter getCounter(String groupName, String counterName) { return base.getCounter(groupName, counterName); } @Override public OutputCommitter getOutputCommitter() { return base.getOutputCommitter(); } @Override public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { output.write(key, value); } @Override public String getStatus() { return base.getStatus(); } @Override public TaskAttemptID getTaskAttemptID() { return base.getTaskAttemptID(); } @Override public void setStatus(String msg) { base.setStatus(msg); } @Override public Path[] getArchiveClassPaths() { return base.getArchiveClassPaths(); } @Override public String[] getArchiveTimestamps() { return base.getArchiveTimestamps(); } @Override public URI[] getCacheArchives() throws IOException { return base.getCacheArchives(); } @Override public URI[] getCacheFiles() throws IOException { return base.getCacheFiles(); } @Override public Class> getCombinerClass() throws ClassNotFoundException { return base.getCombinerClass(); } @Override public Configuration getConfiguration() { return conf; } @Override public Path[] getFileClassPaths() { return base.getFileClassPaths(); } @Override public String[] getFileTimestamps() { return base.getFileTimestamps(); } @Override public RawComparator 
getCombinerKeyGroupingComparator() { return base.getCombinerKeyGroupingComparator(); } @Override public RawComparator getGroupingComparator() { return base.getGroupingComparator(); } @Override public Class> getInputFormatClass() throws ClassNotFoundException { return base.getInputFormatClass(); } @Override public String getJar() { return base.getJar(); } @Override public JobID getJobID() { return base.getJobID(); } @Override public String getJobName() { return base.getJobName(); } @Override public boolean userClassesTakesPrecedence() { return base.userClassesTakesPrecedence(); } @Override public boolean getJobSetupCleanupNeeded() { return base.getJobSetupCleanupNeeded(); } @Override public boolean getTaskCleanupNeeded() { return base.getTaskCleanupNeeded(); } @Override public Path[] getLocalCacheArchives() throws IOException { return base.getLocalCacheArchives(); } @Override public Path[] getLocalCacheFiles() throws IOException { return base.getLocalCacheArchives(); } @Override public Class getMapOutputKeyClass() { return base.getMapOutputKeyClass(); } @Override public Class getMapOutputValueClass() { return base.getMapOutputValueClass(); } @Override public Class> getMapperClass() throws ClassNotFoundException { return base.getMapperClass(); } @Override public int getMaxMapAttempts() { return base.getMaxMapAttempts(); } @Override public int getMaxReduceAttempts() { return base.getMaxReduceAttempts(); } @Override public int getNumReduceTasks() { return base.getNumReduceTasks(); } @Override public Class> getOutputFormatClass() throws ClassNotFoundException { return base.getOutputFormatClass(); } @Override public Class getOutputKeyClass() { return base.getMapOutputKeyClass(); } @Override public Class getOutputValueClass() { return base.getOutputValueClass(); } @Override public Class> getPartitionerClass() throws ClassNotFoundException { return base.getPartitionerClass(); } @Override public boolean getProfileEnabled() { return base.getProfileEnabled(); } @Override public String getProfileParams() { return base.getProfileParams(); } @Override public IntegerRanges getProfileTaskRange(boolean isMap) { return base.getProfileTaskRange(isMap); } @Override public Class> getReducerClass() throws ClassNotFoundException { return base.getReducerClass(); } @Override public RawComparator getSortComparator() { return base.getSortComparator(); } @Override public boolean getSymlink() { return base.getSymlink(); } @Override public String getUser() { return base.getUser(); } @Override public Path getWorkingDirectory() throws IOException { return base.getWorkingDirectory(); } @Override public void progress() { base.progress(); } @Override public Credentials getCredentials() { return base.getCredentials(); } @Override public float getProgress() { return base.getProgress(); } } diff --git a/src/main/java/org/warcbase/mapreduce/lib/HBaseRowToArcRecordWritableMapper.java b/src/main/java/org/warcbase/mapreduce/lib/HBaseRowToArcRecordWritableMapper.java index a53e124..acbf21a 100644 --- a/src/main/java/org/warcbase/mapreduce/lib/HBaseRowToArcRecordWritableMapper.java +++ b/src/main/java/org/warcbase/mapreduce/lib/HBaseRowToArcRecordWritableMapper.java @@ -1,31 +1,47 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.mapreduce.lib; import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Mapper; import org.warcbase.data.ArcRecordUtils; import org.warcbase.io.ArcRecordWritable; public class HBaseRowToArcRecordWritableMapper extends Mapper { public static enum Rows { TOTAL }; private final LongWritable keyOut = new LongWritable(); private final ArcRecordWritable valueOut = new ArcRecordWritable(); @Override public void map(ImmutableBytesWritable row, Result result, Context context) throws IOException, InterruptedException { context.getCounter(Rows.TOTAL).increment(1); for (Cell kv : result.listCells()) { valueOut.setRecord(ArcRecordUtils.fromBytes(CellUtil.cloneValue(kv))); context.write(keyOut, valueOut); } } } \ No newline at end of file diff --git a/src/main/java/org/warcbase/mapreduce/lib/TableChainMapper.java b/src/main/java/org/warcbase/mapreduce/lib/TableChainMapper.java index 52daeda..bd1784e 100644 --- a/src/main/java/org/warcbase/mapreduce/lib/TableChainMapper.java +++ b/src/main/java/org/warcbase/mapreduce/lib/TableChainMapper.java @@ -1,172 +1,188 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ package org.warcbase.mapreduce.lib; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.warcbase.mapreduce.lib.Chain.ChainBlockingQueue; /** * The ChainMapper class allows the use of multiple Mapper classes within a single * Map task. * *

* The Mapper classes are invoked in a chained (or piped) fashion: the output of * the first becomes the input of the second, and so on until the last Mapper, * whose output is written to the task's output. *

*

* The key functionality of this feature is that the Mappers in the chain do not * need to be aware that they are executed in a chain. This enables having * reusable specialized Mappers that can be combined to perform composite * operations within a single task. *

*

* Special care has to be taken when creating chains so that the key/values output * by a Mapper are valid for the following Mapper in the chain. It is assumed that * all Mappers and the Reducer in the chain use matching output and input key and * value classes, as no conversion is done by the chaining code. *

*

* Using the ChainMapper and the ChainReducer classes it is possible to compose * Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]. An * immediate benefit of this pattern is a dramatic reduction in disk IO. *

*

* IMPORTANT: There is no need to specify the output key/value classes for the * ChainMapper; this is done by the addMapper call for the last mapper in the chain. *

* ChainMapper usage pattern: *

* *

  * ...
  * Job job = new Job(conf);
  * 

* Configuration mapAConf = new Configuration(false); * ... * ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class, * Text.class, Text.class, true, mapAConf); *

* Configuration mapBConf = new Configuration(false); * ... * ChainMapper.addMapper(job, BMap.class, Text.class, Text.class, * LongWritable.class, Text.class, false, mapBConf); *

* ... *

* job.waitForCompletion(true); * ... *

*/ @InterfaceAudience.Public @InterfaceStability.Stable public class TableChainMapper extends TableMapper { /** * Adds a {@link Mapper} class to the chain mapper. * *

* The keys and values are passed from one element of the chain to the next, by * value. For the added Mapper, the configuration given for it, * mapperConf, has precedence over the job's Configuration. This * precedence is in effect when the task is running. *

*

* IMPORTANT: There is no need to specify the output key/value classes for the * ChainMapper; this is done by the addMapper call for the last mapper in the chain. *

* * @param job * The job. * @param klass * the Mapper class to add. * @param inputKeyClass * mapper input key class. * @param inputValueClass * mapper input value class. * @param outputKeyClass * mapper output key class. * @param outputValueClass * mapper output value class. * @param mapperConf * a configuration for the Mapper class. It is recommended to use a * Configuration without default values using the * Configuration(boolean loadDefaults) constructor with * FALSE. */ public static void addMapper(Job job, Class klass, Class inputKeyClass, Class inputValueClass, Class outputKeyClass, Class outputValueClass, Configuration mapperConf) throws IOException { job.setMapperClass(TableChainMapper.class); job.setMapOutputKeyClass(outputKeyClass); job.setMapOutputValueClass(outputValueClass); Chain.addMapper(job, klass, inputKeyClass, inputValueClass, outputKeyClass, outputValueClass, mapperConf); } private Chain chain; protected void setup(Context context) { chain = new Chain(); chain.setup(context.getConfiguration()); } public void run(Context context) throws IOException, InterruptedException { setup(context); int numMappers = chain.getAllMappers().size(); if (numMappers == 0) { return; } ChainBlockingQueue> inputqueue; ChainBlockingQueue> outputqueue; if (numMappers == 1) { chain.runMapper(context, 0); } else { // add all the mappers with proper context // add first mapper outputqueue = chain.createBlockingQueue(); chain.addMapper(context, outputqueue, 0); // add other mappers for (int i = 1; i < numMappers - 1; i++) { inputqueue = outputqueue; outputqueue = chain.createBlockingQueue(); chain.addMapper(inputqueue, outputqueue, context, i); } // add last mapper chain.addMapper(outputqueue, context, numMappers - 1); } // start all threads chain.startAllThreads(); // wait for all threads chain.joinAllThreads(); } } diff --git a/src/main/java/org/warcbase/wayback/WarcbaseResourceIndex.java b/src/main/java/org/warcbase/wayback/WarcbaseResourceIndex.java index 6855a44..3821cfd 100644 --- a/src/main/java/org/warcbase/wayback/WarcbaseResourceIndex.java +++ b/src/main/java/org/warcbase/wayback/WarcbaseResourceIndex.java @@ -1,273 +1,289 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
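As a hedged sketch of how TableChainMapper and the HBaseRowToArcRecordWritableMapper from this diff might be composed over an HBase scan: the table name, the DummyAnalysisMapper, and the output setup below are assumptions for illustration, not code from the repository.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.warcbase.io.ArcRecordWritable;
import org.warcbase.mapreduce.lib.HBaseRowToArcRecordWritableMapper;
import org.warcbase.mapreduce.lib.TableChainMapper;

public class TableChainExample {
  // Hypothetical second link in the chain: consumes the ArcRecordWritables emitted by the first mapper.
  public static class DummyAnalysisMapper
      extends Mapper<LongWritable, ArcRecordWritable, NullWritable, NullWritable> {
    @Override
    public void map(LongWritable key, ArcRecordWritable value, Context context) {
      context.getCounter("chain", "records").increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(HBaseConfiguration.create(), "table chain example");
    job.setJarByClass(TableChainExample.class);

    // Scan an HBase table ("example-collection" is a placeholder) into the chain.
    TableMapReduceUtil.initTableMapperJob("example-collection", new Scan(),
        TableChainMapper.class, NullWritable.class, NullWritable.class, job);

    // First mapper: HBase row -> ArcRecordWritable (from the diff above).
    TableChainMapper.addMapper(job, HBaseRowToArcRecordWritableMapper.class,
        ImmutableBytesWritable.class, Result.class,
        LongWritable.class, ArcRecordWritable.class, new Configuration(false));

    // Second mapper: the hypothetical analysis step.
    TableChainMapper.addMapper(job, DummyAnalysisMapper.class,
        LongWritable.class, ArcRecordWritable.class,
        NullWritable.class, NullWritable.class, new Configuration(false));

    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}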
+ */ + package org.warcbase.wayback; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import org.archive.util.ArchiveUtils; import org.archive.util.io.RuntimeIOException; import org.archive.util.iterator.CloseableIterator; import org.archive.wayback.UrlCanonicalizer; import org.archive.wayback.core.CaptureSearchResult; import org.archive.wayback.core.CaptureSearchResults; import org.archive.wayback.core.SearchResult; import org.archive.wayback.core.WaybackRequest; import org.archive.wayback.exception.AccessControlException; import org.archive.wayback.exception.BadQueryException; import org.archive.wayback.exception.ResourceIndexNotAvailableException; import org.archive.wayback.exception.ResourceNotInArchiveException; import org.archive.wayback.resourceindex.LocalResourceIndex; import org.archive.wayback.resourceindex.filterfactory.AccessPointCaptureFilterGroupFactory; import org.archive.wayback.resourceindex.filterfactory.AnnotatingCaptureFilterGroupFactory; import org.archive.wayback.resourceindex.filterfactory.CaptureFilterGroup; import org.archive.wayback.resourceindex.filterfactory.ClosestTrackingCaptureFilterGroupFactory; import org.archive.wayback.resourceindex.filterfactory.CoreCaptureFilterGroupFactory; import org.archive.wayback.resourceindex.filterfactory.ExclusionCaptureFilterGroupFactory; import org.archive.wayback.resourceindex.filterfactory.FilterGroupFactory; import org.archive.wayback.resourceindex.filterfactory.QueryCaptureFilterGroupFactory; import org.archive.wayback.resourceindex.filterfactory.WindowFilterGroup; import org.archive.wayback.util.ObjectFilter; import org.archive.wayback.util.ObjectFilterChain; import org.archive.wayback.util.ObjectFilterIterator; import org.archive.wayback.util.url.AggressiveUrlCanonicalizer; public class WarcbaseResourceIndex extends LocalResourceIndex { /** * maximum number of records to return */ private final static int MAX_RECORDS = 1000; private int maxRecords = MAX_RECORDS; // Set from bean. 
private UrlCanonicalizer canonicalizer = null; private String host; private int port; private String table; private ObjectFilter annotater = null; private ObjectFilter filter = null; protected List fgFactories = null; public WarcbaseResourceIndex() { canonicalizer = new AggressiveUrlCanonicalizer(); fgFactories = new ArrayList(); fgFactories.add(new AccessPointCaptureFilterGroupFactory()); fgFactories.add(new CoreCaptureFilterGroupFactory()); fgFactories.add(new QueryCaptureFilterGroupFactory()); fgFactories.add(new AnnotatingCaptureFilterGroupFactory()); fgFactories.add(new ExclusionCaptureFilterGroupFactory()); fgFactories.add(new ClosestTrackingCaptureFilterGroupFactory()); } private void cleanupIterator(CloseableIterator itr) throws ResourceIndexNotAvailableException { try { itr.close(); } catch (IOException e) { e.printStackTrace(); throw new ResourceIndexNotAvailableException(e.getLocalizedMessage()); } } protected List getRequestFilterGroups(WaybackRequest r) throws BadQueryException { ArrayList groups = new ArrayList(); for (FilterGroupFactory f : fgFactories) { groups.add(f.getGroup(r, canonicalizer, this)); } return groups; } public CaptureSearchResults doCaptureQuery(WaybackRequest wbRequest, int type) throws ResourceIndexNotAvailableException, ResourceNotInArchiveException, BadQueryException, AccessControlException { wbRequest.setResultsPerPage(100); String urlKey; try { urlKey = canonicalizer.urlStringToKey(wbRequest.getRequestUrl()); } catch (IOException e) { throw new BadQueryException("Bad URL(" + wbRequest.getRequestUrl() + ")"); } CaptureSearchResults results = new CaptureSearchResults(); ObjectFilterChain filters = new ObjectFilterChain(); WindowFilterGroup window = new WindowFilterGroup(wbRequest, this); List groups = getRequestFilterGroups(wbRequest); if (filter != null) { filters.addFilter(filter); } for (CaptureFilterGroup cfg : groups) { filters.addFilters(cfg.getFilters()); } filters.addFilters(window.getFilters()); CloseableIterator itr = null; try { itr = new ObjectFilterIterator( getIterator(wbRequest.getRequestUrl(), urlKey), filters); while (itr.hasNext()) { results.addSearchResult(itr.next()); } } catch (RuntimeIOException e) { e.printStackTrace(); } finally { if (itr != null) { cleanupIterator(itr); } } for (CaptureFilterGroup cfg : groups) { cfg.annotateResults(results); } window.annotateResults(results); return results; } public CloseableIterator getIterator(final String url, final String urlKey) throws ResourceNotInArchiveException, ResourceIndexNotAvailableException { final String resourceUrl = "http://" + host + ":" + port + "/" + table + "/*/" + url; List lines = null; try { byte[] bytes = fetchUrl(new URL(resourceUrl)); if (bytes.length == 0) { throw new ResourceNotInArchiveException("No entries found in: " + resourceUrl); } lines = Arrays.asList(new String(bytes).split("\n")); } catch (MalformedURLException e) { throw new ResourceIndexNotAvailableException("Error contacting REST API: " + resourceUrl); } catch (IOException e) { throw new ResourceIndexNotAvailableException("Error contacting REST API: " + resourceUrl); } final Iterator it = lines.iterator(); return new CloseableIterator() { @Override public boolean hasNext() { return it.hasNext(); } @Override public CaptureSearchResult next() { String line = it.next(); String[] splits = line.split("\\s+"); CaptureSearchResult r = new CaptureSearchResult(); try { r.setCaptureDate(ArchiveUtils.parse14DigitDate(splits[0])); } catch (ParseException e) { e.printStackTrace(); } r.setOriginalUrl(url); 
r.setUrlKey(urlKey); // doesn't matter, or we get NPE r.setMimeType(splits[1]); r.setFile("foo"); // needed, or otherwise we'll get a NPE in CalendarResults.jsp r.setRedirectUrl("-"); r.setHttpCode("200"); r.setOffset(0); return r; } @Override public void remove() {} @Override public void close() throws IOException {} }; } private static byte[] fetchUrl(URL url) throws IOException { URLConnection connection = url.openConnection(); InputStream in = connection.getInputStream(); int contentLength = connection.getContentLength(); ByteArrayOutputStream tmpOut; if (contentLength != -1) { tmpOut = new ByteArrayOutputStream(contentLength); } else { tmpOut = new ByteArrayOutputStream(16384); } byte[] buf = new byte[512]; while (true) { int len = in.read(buf); if (len == -1) { break; } tmpOut.write(buf, 0, len); } in.close(); tmpOut.close(); // Does nothing, but good hygiene return tmpOut.toByteArray(); } public void setMaxRecords(int maxRecords) { this.maxRecords = maxRecords; } public int getMaxRecords() { return maxRecords; } public UrlCanonicalizer getCanonicalizer() { return canonicalizer; } public void setCanonicalizer(UrlCanonicalizer canonicalizer) { this.canonicalizer = canonicalizer; } public ObjectFilter getAnnotater() { return annotater; } public void setAnnotater(ObjectFilter annotater) { this.annotater = annotater; } public ObjectFilter getFilter() { return filter; } public void setFilter(ObjectFilter filter) { this.filter = filter; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public String getTable() { return table; } public void setTable(String table) { this.table = table; } } diff --git a/src/main/java/org/warcbase/wayback/WarcbaseResourceStore.java b/src/main/java/org/warcbase/wayback/WarcbaseResourceStore.java index 5f37211..fd2dec4 100644 --- a/src/main/java/org/warcbase/wayback/WarcbaseResourceStore.java +++ b/src/main/java/org/warcbase/wayback/WarcbaseResourceStore.java @@ -1,87 +1,103 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.wayback; import java.io.IOException; import java.io.PushbackInputStream; import java.net.URL; import java.util.logging.Logger; import org.archive.io.arc.ARCReader; import org.archive.io.arc.ARCReaderFactory; import org.archive.io.warc.WARCReader; import org.archive.io.warc.WARCReaderFactory; import org.archive.util.ArchiveUtils; import org.archive.wayback.ResourceStore; import org.archive.wayback.core.CaptureSearchResult; import org.archive.wayback.core.Resource; import org.archive.wayback.exception.ResourceNotAvailableException; import org.archive.wayback.resourcestore.resourcefile.ResourceFactory; public class WarcbaseResourceStore implements ResourceStore { private static final Logger LOGGER = Logger.getLogger(WarcbaseResourceStore.class.getName()); // Set from bean. 
private String host; private int port; private String table; @Override public Resource retrieveResource(CaptureSearchResult result) throws ResourceNotAvailableException { Resource r = null; String resourceUrl = "http://" + host + ":" + port + "/" + table + "/" + ArchiveUtils.get14DigitDate(result.getCaptureDate()) + "/" + result.getOriginalUrl(); LOGGER.info("Fetching resource url: " + resourceUrl); try { // Read first 4 bytes of input stream to detect archive format; push back into stream for re-use PushbackInputStream pb = new PushbackInputStream(new URL(resourceUrl).openStream(), 4); byte[] signature = new byte[4]; pb.read(signature, 0, 4); pb.unread(signature); if ((new String(signature)).equals("WARC")) { WARCReader reader = (WARCReader) WARCReaderFactory.get(resourceUrl.toString(), pb, false); r = ResourceFactory.WARCArchiveRecordToResource(reader.get(), reader); } else { // Assume ARC format if not WARC ARCReader reader = (ARCReader) ARCReaderFactory.get(resourceUrl.toString(), pb, false); r = ResourceFactory.ARCArchiveRecordToResource(reader.get(), reader); } } catch (IOException e) { throw new ResourceNotAvailableException("Error reading " + resourceUrl); } if (r == null) { throw new ResourceNotAvailableException("Unable to find: " + result.toString()); } return r; } @Override public void shutdown() throws IOException {} public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public String getTable() { return table; } public void setTable(String table) { this.table = table; } } diff --git a/src/main/scala/org/warcbase/spark/matchbox/DetectLanguage.scala b/src/main/scala/org/warcbase/spark/matchbox/DetectLanguage.scala index 02280ad..b7a48b7 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/DetectLanguage.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/DetectLanguage.scala @@ -1,10 +1,26 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.spark.matchbox import org.apache.tika.language.LanguageIdentifier object DetectLanguage { def apply(input: String): String = { if (input.isEmpty) "" else new LanguageIdentifier(input).getLanguage } } \ No newline at end of file diff --git a/src/main/scala/org/warcbase/spark/matchbox/DetectMimeTypeTika.scala b/src/main/scala/org/warcbase/spark/matchbox/DetectMimeTypeTika.scala index ebe56a7..31090eb 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/DetectMimeTypeTika.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/DetectMimeTypeTika.scala @@ -1,23 +1,39 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.spark.matchbox import java.io.ByteArrayInputStream import org.apache.tika.Tika import org.apache.tika.detect.DefaultDetector import org.apache.tika.parser.AutoDetectParser /** * A UDF to detect mime types */ object DetectMimeTypeTika { def apply(content: String): String = { if (content.isEmpty) "N/A" else { val is = new ByteArrayInputStream(content.getBytes) val detector = new DefaultDetector() val parser = new AutoDetectParser(detector) val mimetype = new Tika(detector, parser).detect(is) mimetype } } } diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractBoilerpipeText.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractBoilerpipeText.scala index 0e66ca9..c889d49 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/ExtractBoilerpipeText.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractBoilerpipeText.scala @@ -1,25 +1,41 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.spark.matchbox import java.io.IOException import de.l3s.boilerpipe.extractors.DefaultExtractor /** * UDF for extracting raw text content from an HTML page, minus "boilerplate" * content (using boilerpipe). */ object ExtractBoilerpipeText { def apply(input: String) = { if (input.isEmpty) Nil else try { val text = DefaultExtractor.INSTANCE.getText(input).replaceAll("[\\r\\n]+", " ").trim() if (text.isEmpty) Nil else text } catch { case e: Exception => throw new IOException("Caught exception processing input row " + e) } } } diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractEntities.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractEntities.scala index 79ce8d0..adf328d 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/ExtractEntities.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractEntities.scala @@ -1,53 +1,69 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
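The matchbox UDFs shown above (DetectLanguage, DetectMimeTypeTika, ExtractBoilerpipeText) are meant to be applied inside Spark transformations rather than called standalone. A minimal sketch of that pattern, assuming a live SparkContext and a placeholder ARC path; it follows the shape of the "detect language" test in ArcTest further down in this diff:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._

// Sketch only: the master/app name and the ARC path are placeholders.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("matchbox-sketch"))

val languageCounts = RecordLoader.loadArc("/path/to/example.arc.gz", sc)
  .keepMimeTypes(Set("text/html"))          // fluent filter from RecordRDD
  .map(r => r.getRawBodyContent)            // HTML stripped to plain text
  .groupBy(text => DetectLanguage(text))    // Tika language code, e.g. "en"
  .map(g => (g._1, g._2.size))
  .collect()

languageCounts.foreach(println)
```

DetectMimeTypeTika can be substituted as the grouping key in the same pipeline, and ExtractBoilerpipeText follows the same one-argument call pattern when boilerplate-free text is wanted.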
+ */ + package org.warcbase.spark.matchbox import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD /** * Extracts entities */ object ExtractEntities { /** * @param iNerClassifierFile path of classifier file * @param inputRecordFile path of ARC or WARC file from which to extract entities * @param outputFile path of output directory */ def extractFromRecords(iNerClassifierFile: String, inputRecordFile: String, outputFile: String, sc: SparkContext): RDD[(String, String, String)] = { val rdd = RecordLoader.loadArc(inputRecordFile, sc) .map(r => (r.getCrawldate, r.getUrl, r.getRawBodyContent)) extractAndOutput(iNerClassifierFile, rdd, outputFile) } /** * @param iNerClassifierFile path of classifier file * @param inputFile path of file with tuples (date: String, url: String, content: String) * from which to extract entities * @param outputFile path of output directory */ def extractFromScrapeText(iNerClassifierFile: String, inputFile: String, outputFile: String, sc: SparkContext): RDD[(String, String, String)] = { val rdd = sc.textFile(inputFile) .map(line => { val ind1 = line.indexOf(",") val ind2 = line.indexOf(",", ind1 + 1) (line.substring(1, ind1), line.substring(ind1 + 1, ind2), line.substring(ind2 + 1, line.length - 1)) }) extractAndOutput(iNerClassifierFile, rdd, outputFile) } /** * @param iNerClassifierFile path of classifier file * @param rdd with values (date, url, content) * @param outputFile path of output directory */ def extractAndOutput(iNerClassifierFile: String, rdd: RDD[(String, String, String)], outputFile: String): RDD[(String, String, String)] = { val r = rdd.mapPartitions(iter => { NER3Classifier.apply(iNerClassifierFile) iter.map(r => (r._1, r._2, NER3Classifier.classify(r._3))) }) r.saveAsTextFile(outputFile) r } } diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala index 5f75beb..0480cb3 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala @@ -1,43 +1,59 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.spark.matchbox import java.io.IOException import org.jsoup.Jsoup import org.jsoup.select.Elements import scala.collection.mutable /** * UDF for extracting links from a webpage given the HTML content (using Jsoup). * */ object ExtractLinks { /** * @param src the src link. * @param html the content from which links are to be extracted. * @param base an optional base domain. 
* * Returns a sequence of (source, target, anchortext) */ def apply(src: String, html: String, base: String = ""): Seq[(String, String, String)] = { if (html.isEmpty) return Nil try { val output = mutable.MutableList[(String, String, String)]() val doc = Jsoup.parse(html) val links: Elements = doc.select("a[href]") val it = links.iterator() while (it.hasNext) { val link = it.next() if (base.nonEmpty) link.setBaseUri(base) val target = link.attr("abs:href") if (target.nonEmpty) { output += ((src, target, link.text)) } } output } catch { case e: Exception => throw new IOException("Caught exception processing input row ", e); } } } \ No newline at end of file diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractLinksAndText.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractLinksAndText.scala index 307be9e..f35e954 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/ExtractLinksAndText.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractLinksAndText.scala @@ -1,40 +1,56 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.spark.matchbox import java.io.IOException import org.jsoup.Jsoup import org.jsoup.select.Elements import scala.collection.mutable /** * UDF for extracting links from a webpage given the HTML content (using Jsoup). Returns a seq of tuples, * where each tuple consists of the URL and the anchor text. */ object ExtractLinksAndText { def apply(html: String, base: String): Seq[(String, String)] = { if (html.isEmpty) return Nil try { val output = mutable.MutableList[(String, String)]() val doc = Jsoup.parse(html) val links: Elements = doc.select("a[href]") val it = links.iterator() while (it.hasNext) { val link = it.next() if (base.nonEmpty) link.setBaseUri(base) val target = link.attr("abs:href") if (target.nonEmpty) { output += ((target, link.text)) } } output } catch { case e: Exception => throw new IOException("Caught exception processing input row ", e); } } } \ No newline at end of file diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractRawText.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractRawText.scala index cc50693..0a4e53c 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/ExtractRawText.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractRawText.scala @@ -1,16 +1,32 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
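ExtractLinks and ExtractLinksAndText above both run Jsoup over the page HTML and resolve each anchor against the optional base URL. A small sketch of a direct call, with a made-up HTML fragment and illustrative URLs:

```scala
import org.warcbase.spark.matchbox.ExtractLinks

// Hypothetical fragment; example.com is illustrative only.
val html = """<a href="about.html">About us</a> <a href="http://warcbase.org/">Warcbase</a>"""
val links = ExtractLinks("http://example.com/index.html", html, "http://example.com/")

// Each element is (source, target, anchor text), e.g.
//   ("http://example.com/index.html", "http://example.com/about.html", "About us")
//   ("http://example.com/index.html", "http://warcbase.org/", "Warcbase")
links.foreach(println)
```

ExtractLinksAndText behaves the same way but returns only (target, anchor text) pairs; the "count links" test in ArcTest applies ExtractLinks across an RDD with `.map(r => ExtractLinks(r.getUrl, r.getBodyContent))`.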
+ */ + package org.warcbase.spark.matchbox import java.io.IOException import org.jsoup.Jsoup object ExtractRawText { def apply(content: String) = { try { Jsoup.parse(content).text().replaceAll("[\\r\\n]+", " ") } catch { case e: Exception => throw new IOException("Caught exception processing input row ", e) } } } diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractTextFromPDFs.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractTextFromPDFs.scala index d6072b3..368d351 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/ExtractTextFromPDFs.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractTextFromPDFs.scala @@ -1,34 +1,50 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.spark.matchbox import java.io.ByteArrayInputStream //import org.apache.pig.data.DataByteArray import org.apache.tika.metadata.Metadata import org.apache.tika.parser.ParseContext import org.apache.tika.parser.pdf.PDFParser import org.apache.tika.sax.BodyContentHandler; object ExtractTextFromPDFs { val pdfParser = new PDFParser() /* def apply(dba: DataByteArray): String = { if (dba.get.isEmpty) "N/A" else { try { val is = new ByteArrayInputStream(dba.get) val contenthandler = new BodyContentHandler(Integer.MAX_VALUE) val metadata = new Metadata() pdfParser.parse(is, contenthandler, metadata, new ParseContext()) is.close() contenthandler.toString } catch { case t: Throwable => t.printStackTrace() "" } } } */ } \ No newline at end of file diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomain.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomain.scala index e334e1a..7320a9e 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomain.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomain.scala @@ -1,21 +1,37 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
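ExtractRawText above is the building block behind getRawBodyContent in the record wrappers later in this diff; on its own it simply runs Jsoup over an HTML string and collapses newlines. A short sketch with a made-up HTML literal:

```scala
import org.warcbase.spark.matchbox.ExtractRawText

// Hypothetical HTML snippet.
val html = "<html><body><h1>Archived page</h1>\n<p>Some body text.</p></body></html>"
val text = ExtractRawText(html)   // Jsoup-normalised text, newlines replaced with spaces
println(text)                     // "Archived page Some body text."
```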
+ */ + package org.warcbase.spark.matchbox import java.net.URL object ExtractTopLevelDomain { def apply(url: String, source: String = ""): String = { if (url == null) return null var host: String = null try { host = new URL(url).getHost } catch { case e: Exception => // it's okay } if (host != null || source == null) return host try { new URL(source).getHost } catch { case e: Exception => null } } } diff --git a/src/main/scala/org/warcbase/spark/matchbox/NER3Classifier.scala b/src/main/scala/org/warcbase/spark/matchbox/NER3Classifier.scala index b0afaa6..2ec8bd1 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/NER3Classifier.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/NER3Classifier.scala @@ -1,94 +1,110 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.spark.matchbox import java.util import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.{DefaultScalaModule, JsonScalaEnumeration} import edu.stanford.nlp.ie.AbstractSequenceClassifier import edu.stanford.nlp.ie.crf.CRFClassifier import edu.stanford.nlp.ling.{CoreAnnotations, CoreLabel} import scala.collection.mutable /** * UDF which reads in a text string, and returns entities identified by the configured Stanford NER classifier */ object NER3Classifier { var serializedClassifier: String = _ var classifier: AbstractSequenceClassifier[CoreLabel] = _ val mapper = new ObjectMapper().registerModule(DefaultScalaModule) object NERClassType extends Enumeration { type NERClassType = Value val PERSON, ORGANIZATION, LOCATION, O = Value class NERClassTypeType extends TypeReference[NERClassType.type] case class NERClassTypeHolder(@JsonScalaEnumeration(classOf[NERClassTypeType]) nerclasstype: NERClassType.NERClassType) } def apply(file: String) = { serializedClassifier = file } def classify(input: String): String = { val emptyString: String = "{\"PERSON\":[],\"ORGANIZATION\"=[],\"LOCATION\"=[]}" val entitiesByType = mutable.LinkedHashMap[NERClassType.Value, mutable.Seq[String]]() for (t <- NERClassType.values) { if (t != NERClassType.O) entitiesByType.put(t, mutable.Seq()) } var prevEntityType = NERClassType.O var entityBuffer: String = "" if (input == null) return emptyString try { if (classifier == null) classifier = CRFClassifier.getClassifier(serializedClassifier) val out: util.List[util.List[CoreLabel]] = classifier.classify(input) val outit = out.iterator() while (outit.hasNext) { val sentence = outit.next val sentenceit = sentence.iterator() while (sentenceit.hasNext) { val word = sentenceit.next val wordText = word.word() val classText = word.get(classOf[CoreAnnotations.AnswerAnnotation]) val currEntityType = NERClassType.withName(classText) if (prevEntityType != currEntityType) { if (prevEntityType != NERClassType.O && !entityBuffer.equals("")) { //time to commit entitiesByType.put(prevEntityType, 
entitiesByType.get(prevEntityType).get ++ Seq(entityBuffer)) entityBuffer = "" } } prevEntityType = currEntityType if (currEntityType != NERClassType.O) { if (entityBuffer.equals("")) entityBuffer = wordText else entityBuffer += " " + wordText } } //end of sentence //apply commit and reset if (prevEntityType != NERClassType.O && !entityBuffer.equals("")) { entitiesByType.put(prevEntityType, entitiesByType.get(prevEntityType).get ++ Seq(entityBuffer)) entityBuffer = "" } //reset prevEntityType = NERClassType.O entityBuffer = "" } mapper.writeValueAsString(entitiesByType) } catch { case e: Exception => if (classifier == null) throw new ExceptionInInitializerError("Unable to load classifier " + e) emptyString } } } diff --git a/src/main/scala/org/warcbase/spark/matchbox/RecordLoader.scala b/src/main/scala/org/warcbase/spark/matchbox/RecordLoader.scala index da0767b..5fed7f3 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/RecordLoader.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/RecordLoader.scala @@ -1,21 +1,37 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.spark.matchbox import org.apache.hadoop.io.LongWritable import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.warcbase.io.{WarcRecordWritable, ArcRecordWritable} import org.warcbase.mapreduce.{WacWarcInputFormat, WacArcInputFormat} import org.warcbase.spark.matchbox.RecordTransformers.WARecord object RecordLoader { def loadArc(path: String, sc: SparkContext): RDD[WARecord] = { sc.newAPIHadoopFile(path, classOf[WacArcInputFormat], classOf[LongWritable], classOf[ArcRecordWritable]) .map(r => r._2.getRecord) } def loadWarc(path: String, sc: SparkContext): RDD[WARecord] = { sc.newAPIHadoopFile(path, classOf[WacWarcInputFormat], classOf[LongWritable], classOf[WarcRecordWritable]) .filter(r => r._2.getRecord.getHeader.getHeaderValue("WARC-Type").equals("response")) .map(r => r._2.getRecord) } } diff --git a/src/main/scala/org/warcbase/spark/matchbox/RecordTransformers.scala b/src/main/scala/org/warcbase/spark/matchbox/RecordTransformers.scala index fda3424..1423829 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/RecordTransformers.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/RecordTransformers.scala @@ -1,66 +1,82 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
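RecordLoader above is the entry point for the Spark scripts and tests in this diff: loadArc wraps WacArcInputFormat, while loadWarc wraps WacWarcInputFormat and keeps only "response" records. A minimal sketch, assuming a live SparkContext and placeholder paths (the test suites load arc/example.arc.gz and warc/example.warc.gz from the test classpath):

```scala
import org.warcbase.spark.matchbox.RecordLoader

// Assumed: sc is a live SparkContext; paths are placeholders.
val arcRecords  = RecordLoader.loadArc("/path/to/example.arc.gz", sc)
val warcRecords = RecordLoader.loadWarc("/path/to/example.warc.gz", sc)  // WARC-Type "response" only

println(s"ARC records: ${arcRecords.count()}, WARC responses: ${warcRecords.count()}")
```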
+ */ + package org.warcbase.spark.matchbox import java.text.SimpleDateFormat import org.archive.io.arc.ARCRecord import org.archive.io.warc.WARCRecord import org.archive.util.ArchiveUtils import org.warcbase.data.{ArcRecordUtils, WarcRecordUtils} /** * Provides a common interface from which ARCRecords and WARCRecords can be accessed. * * Specifically, a WARecord has fields crawldate, url, domain, mimeType, and bodyContent. */ object RecordTransformers { trait WARecord extends Serializable { def getCrawldate: String def getUrl: String def getDomain: String def getMimeType: String def getBodyContent: String def getRawBodyContent: String } implicit def toArcRecord(r: ARCRecord): WARecord = new ArcRecord(r) implicit def toWarcRecord(r: WARCRecord): WARecord = new WarcRecord(r) class ArcRecord(r: ARCRecord) extends WARecord { override def getCrawldate: String = r.getMetaData.getDate.substring(0, 8) override def getDomain: String = ExtractTopLevelDomain(r.getMetaData.getUrl) override def getMimeType: String = r.getMetaData.getMimetype override def getUrl: String = r.getMetaData.getUrl override def getBodyContent: String = new String(ArcRecordUtils.getBodyContent(r)) override def getRawBodyContent: String = ExtractRawText(new String(ArcRecordUtils.getBodyContent(r))) } class WarcRecord(r: WARCRecord) extends WARecord { val ISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX") private val content = WarcRecordUtils.getContent(r) override def getCrawldate: String = ArchiveUtils.get14DigitDate(ISO8601.parse(r.getHeader.getDate)).substring(0, 8) override def getDomain = ExtractTopLevelDomain(getUrl).replace("^\\s*www\\.", "") override def getMimeType = WarcRecordUtils.getWarcResponseMimeType(content) override def getUrl = r.getHeader.getUrl override def getBodyContent: String = new String(content) override def getRawBodyContent: String = ExtractRawText(new String(WarcRecordUtils.getBodyContent(r))) } } diff --git a/src/main/scala/org/warcbase/spark/pythonconverters/ArcRecordConverter.scala b/src/main/scala/org/warcbase/spark/pythonconverters/ArcRecordConverter.scala index a889ec4..0ec8a1a 100644 --- a/src/main/scala/org/warcbase/spark/pythonconverters/ArcRecordConverter.scala +++ b/src/main/scala/org/warcbase/spark/pythonconverters/ArcRecordConverter.scala @@ -1,64 +1,80 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
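RecordTransformers above gives ARC and WARC records a common WARecord view (crawl date, URL, domain, MIME type, body content). A sketch of pulling those fields out of a loaded RDD, again assuming a live SparkContext and a placeholder path:

```scala
import org.warcbase.spark.matchbox.RecordLoader

// Assumed: sc is a live SparkContext; the path is a placeholder.
RecordLoader.loadArc("/path/to/example.arc.gz", sc)
  .map(r => (r.getCrawldate,                // yyyyMMdd, taken from the ARC/WARC header date
             r.getDomain,                   // via ExtractTopLevelDomain on the record URL
             r.getMimeType,
             r.getRawBodyContent.length))   // body content run through ExtractRawText
  .take(3)
  .foreach(println)
```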
+ */ + package org.warcbase.spark.pythonconverters import scala.collection.JavaConversions._ import scala.util.parsing.json.JSONObject import org.warcbase.data.ArcRecordUtils; import org.warcbase.io.ArcRecordWritable import org.apache.hadoop.io.{MapWritable, Text} import org.apache.spark.api.python.Converter class ArcRecordWritableToUrlConverter extends Converter[Any, String] { override def convert(obj: Any): String = { val key = obj.asInstanceOf[ArcRecordWritable] val meta = key.getRecord().getMetaData() meta.getUrl() } } class ArcRecordWritableToCrawlDateConverter extends Converter[Any, String] { override def convert(obj: Any): String = { val key = obj.asInstanceOf[ArcRecordWritable] val meta = key.getRecord().getMetaData() meta.getDate() } } class ArcRecordWritableToMetadataConverter extends Converter[Any, java.util.Map[String, String]] { override def convert(obj: Any): java.util.Map[String, String] = { val key = obj.asInstanceOf[ArcRecordWritable] val meta = key.getRecord().getMetaData() mapAsJavaMap(Map[String, String]( "url" -> meta.getUrl(), "date" -> meta.getDate(), "mime-type" -> meta.getMimetype() )) } } class ArcRecordWritableToStringMetadataConverter extends Converter[Any, String] { override def convert(obj: Any): String = { val key = obj.asInstanceOf[ArcRecordWritable] val meta = key.getRecord().getMetaData() meta.getUrl() + "\t" + meta.getDate() + "\t" + meta.getMimetype() } } class ArcRecordWritableToHtmlConverter extends Converter[Any, java.util.Map[String, String]] { override def convert(obj: Any): java.util.Map[String, String] = { val key = obj.asInstanceOf[ArcRecordWritable] val meta = key.getRecord().getMetaData() if ( meta.getMimetype() != "text/html" ) { null } else { mapAsJavaMap(Map[String, String]( "url" -> meta.getUrl(), "date" -> meta.getDate(), "mime-type" -> meta.getMimetype(), "content" -> new String(ArcRecordUtils.getBodyContent(key.getRecord())) )) } } } diff --git a/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala b/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala index 2a0b752..2dbd66e 100644 --- a/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala +++ b/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala @@ -1,74 +1,90 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.spark.rdd import org.apache.spark.rdd.RDD import org.warcbase.spark.matchbox.ExtractTopLevelDomain import org.warcbase.spark.matchbox.RecordTransformers.WARecord import scala.reflect.ClassTag /** * RDD wrappers for working with Records */ object RecordRDD extends java.io.Serializable { /** * A Wrapper class around RDD to simplify counting */ implicit class CountableRDD[T: ClassTag](rdd: RDD[T]) extends java.io.Serializable { def countItems(): RDD[(T, Int)] = { rdd.map(r => (r, 1)) .reduceByKey((c1, c2) => c1 + c2) .sortBy(f => f._2, ascending = false) } } /** * A Wrapper class around RDD to allow RDDs of type ARCRecord and WARCRecord to be queried via a fluent API. 
* * To load such an RDD, please see [[org.warcbase.spark.matchbox.RecordLoader]] */ implicit class WARecordRDD(rdd: RDD[WARecord]) extends java.io.Serializable { def keepValidPages(): RDD[WARecord] = { rdd.filter(r => r.getCrawldate != null && (r.getMimeType == "text/html" || r.getUrl.endsWith("htm") || r.getUrl.endsWith("html")) && !r.getUrl.endsWith("robots.txt")) } def keepMimeTypes(mimeTypes: Set[String]) = { rdd.filter(r => mimeTypes.contains(r.getMimeType)) } def keepDate(date: String) = { rdd.filter(r => r.getCrawldate == date) } def keepUrls(urls: Set[String]) = { rdd.filter(r => urls.contains(r.getUrl)) } def keepDomains(urls: Set[String]) = { rdd.filter(r => urls.contains(ExtractTopLevelDomain(r.getUrl).replace("^\\s*www\\.", ""))) } def discardMimeTypes(mimeTypes: Set[String]) = { rdd.filter(r => !mimeTypes.contains(r.getMimeType)) } def discardDate(date: String) = { rdd.filter(r => r.getCrawldate != date) } def discardUrls(urls: Set[String]) = { rdd.filter(r => !urls.contains(r.getUrl)) } def discardDomains(urls: Set[String]) = { rdd.filter(r => !urls.contains(r.getDomain)) } } } diff --git a/src/main/scala/org/warcbase/spark/scripts/Filter.scala b/src/main/scala/org/warcbase/spark/scripts/Filter.scala index 1990823..615cef2 100644 --- a/src/main/scala/org/warcbase/spark/scripts/Filter.scala +++ b/src/main/scala/org/warcbase/spark/scripts/Filter.scala @@ -1,17 +1,33 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.spark.scripts import org.apache.spark.SparkContext import org.warcbase.spark.matchbox.RecordLoader import org.warcbase.spark.rdd.RecordRDD._ object Filter { def filter(sc: SparkContext) = { val r = RecordLoader.loadArc("collections/ARCHIVEIT-227-UOFTORONTO-CANPOLPINT-20090201174320-00056-crawling04.us.archive.org.arc.gz", sc) .keepMimeTypes(Set("text/html")) .discardDate(null) .keepDomains(Set("greenparty.ca")) .map(r => (r.getCrawldate, r.getRawBodyContent)) r.saveAsTextFile("/green") } } diff --git a/src/main/scala/org/warcbase/spark/scripts/SocialMediaLinks.scala b/src/main/scala/org/warcbase/spark/scripts/SocialMediaLinks.scala index 3448fe8..d45b72f 100644 --- a/src/main/scala/org/warcbase/spark/scripts/SocialMediaLinks.scala +++ b/src/main/scala/org/warcbase/spark/scripts/SocialMediaLinks.scala @@ -1,8 +1,24 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
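RecordRDD above is what makes chains like the Filter script possible: the implicit wrappers add keep/discard filters and countItems directly onto an RDD[WARecord]. A sketch of the same fluent style used to rank domains, mirroring the "warc extract domain" test in WarcTest (SparkContext and path assumed as before):

```scala
import org.warcbase.spark.matchbox.RecordLoader
import org.warcbase.spark.rdd.RecordRDD._

// Assumed: sc is a live SparkContext; the path is a placeholder.
val topDomains = RecordLoader.loadArc("/path/to/example.arc.gz", sc)
  .keepValidPages()            // HTML pages with a crawl date, excluding robots.txt
  .map(r => r.getDomain)
  .countItems()                // (domain, count), sorted by descending count
  .take(10)

topDomains.foreach(println)
```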
+ */ + package org.warcbase.spark.scripts /** * Created by alicezhou on 2015-11-07. */ class SocialMediaLinks { } diff --git a/src/test/java/org/warcbase/data/UrlMappingTest.java b/src/test/java/org/warcbase/data/UrlMappingTest.java index d32addd..3f58552 100644 --- a/src/test/java/org/warcbase/data/UrlMappingTest.java +++ b/src/test/java/org/warcbase/data/UrlMappingTest.java @@ -1,136 +1,152 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.data; import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.List; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IntsRef; import org.apache.lucene.util.fst.Builder; import org.apache.lucene.util.fst.FST; import org.apache.lucene.util.fst.FST.INPUT_TYPE; import org.apache.lucene.util.fst.PositiveIntOutputs; import org.apache.lucene.util.fst.Util; import org.junit.Before; import org.junit.Test; // This class aims to test the PrefixSearch functionality. public class UrlMappingTest { private UrlMapping map; @Before public void setUp() throws Exception { String inputValues[] = { "cat", "catch", "cut", "doga", "dogb", "dogs" }; long outputValues[] = { 1, 2, 3, 4, 5, 6 }; PositiveIntOutputs outputs = PositiveIntOutputs.getSingleton(); Builder builder = new Builder(INPUT_TYPE.BYTE1, outputs); BytesRef scratchBytes = new BytesRef(); IntsRef scratchInts = new IntsRef(); for (int i = 0; i < inputValues.length; i++) { scratchBytes.copyChars(inputValues[i]); builder.add(Util.toIntsRef(scratchBytes, scratchInts), outputValues[i]); } FST fst = builder.finish(); map = new UrlMapping(fst); } @Test public void testGetIds() { assertEquals(-1, map.getID("apple")); assertEquals(1, map.getID("cat")); assertEquals(2, map.getID("catch")); assertEquals(3, map.getID("cut")); assertEquals(-1, map.getID("cuttery")); assertEquals(4, map.getID("doga")); assertEquals(5, map.getID("dogb")); assertEquals(6, map.getID("dogs")); assertEquals(-1, map.getID("dogz")); } @Test public void testGetUrls() { assertEquals(null, map.getUrl(0)); assertEquals("cat", map.getUrl(1)); assertEquals("catch", map.getUrl(2)); assertEquals("cut", map.getUrl(3)); assertEquals("doga", map.getUrl(4)); assertEquals("dogb", map.getUrl(5)); assertEquals("dogs", map.getUrl(6)); assertEquals(null, map.getUrl(7)); } @Test public void testPrefixSearch() { List results; results = map.prefixSearch("cut"); assertEquals(1, results.size()); assertEquals("cut", results.get(0)); results = map.prefixSearch("dog"); assertEquals(3, results.size()); assertEquals("doga", results.get(0)); assertEquals("dogb", results.get(1)); assertEquals("dogs", results.get(2)); results = map.prefixSearch(""); assertEquals(0, results.size()); results = map.prefixSearch(null); assertEquals(0, results.size()); results = map.prefixSearch("dad"); assertEquals(0, results.size()); } @Test public void testGetIdRange() throws IOException{ int[] range; range = map.getIdRange("doga", "dogs"); 
assertEquals(4, range[0]); assertEquals(6, range[1]); assertEquals("doga", map.getUrl(range[0])); assertEquals("dogs", map.getUrl(range[1])); range = map.getIdRange("doga", "dogb"); assertEquals(4, range[0]); assertEquals(5, range[1]); assertEquals("doga", map.getUrl(range[0])); assertEquals("dogb", map.getUrl(range[1])); range = map.getIdRange("dogs", "dogs"); assertEquals(6, range[0]); assertEquals(6, range[1]); assertEquals("dogs", map.getUrl(range[0])); assertEquals("dogs", map.getUrl(range[1])); // If either one of the bounds is invalid, expect null range = map.getIdRange("dog", "dogx"); assertEquals(null, range); range = map.getIdRange("doga", "dogx"); assertEquals(null, range); range = map.getIdRange("dog", "dogs"); assertEquals(null, range); range = map.getIdRange("", "dogs"); assertEquals(null, range); range = map.getIdRange("", ""); assertEquals(null, range); range = map.getIdRange(null, ""); assertEquals(null, range); range = map.getIdRange(null, null); assertEquals(null, range); range = map.getIdRange(null, null); assertEquals(null, range); } } diff --git a/src/test/java/org/warcbase/data/UrlUtilsTest.java b/src/test/java/org/warcbase/data/UrlUtilsTest.java index 6bc3cc8..4b3f3f2 100644 --- a/src/test/java/org/warcbase/data/UrlUtilsTest.java +++ b/src/test/java/org/warcbase/data/UrlUtilsTest.java @@ -1,29 +1,45 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.data; import static org.junit.Assert.assertEquals; import org.junit.Test; public class UrlUtilsTest { @Test public void test1() { String url = "http://www.house.gov/mthompson/the_1st_district.htm"; String rowKey = "gov.house.www/mthompson/the_1st_district.htm"; assertEquals(rowKey, UrlUtils.urlToKey(url)); assertEquals(url, UrlUtils.keyToUrl(rowKey)); } @Test public void test2() { String[] hostnames = new String[] { "www.house.gov", "umiacs.umd.edu", "foo.bar.com:8080", "a.b.c.d.com:12345", "warcbase.org", "foo" }; String[] reversed = new String[] { "gov.house.www", "edu.umd.umiacs", "com.bar.foo:8080", "com.d.c.b.a:12345", "org.warcbase", "foo" }; for (int i=0; i ii = reader.iterator(); ii.hasNext();) { ARCRecord r = (ARCRecord) ii.next(); ARCRecordMetaData meta = r.getMetaData(); if (cnt < urls.length) { assertEquals(urls[cnt], meta.getUrl()); } cnt++; } reader.close(); LOG.info(cnt + " records read!"); assertEquals(300, cnt); } @Test public void testReadFromStream() throws Exception { String arcFile = Resources.getResource("arc/example.arc.gz").getPath(); ARCReader reader = ARCReaderFactory.get(new File(arcFile)); int cnt = 0; for (Iterator ii = reader.iterator(); ii.hasNext();) { ARCRecord r = (ARCRecord) ii.next(); // Skip the file header. 
if (cnt == 0) { cnt++; continue; } String h = r.getHeaderString(); InputStream in = new DataInputStream(new ByteArrayInputStream(ArcRecordUtils.toBytes(r))); ARCReader nr = (ARCReader) ARCReaderFactory.get("", new BufferedInputStream(in), false); ARCRecord r2 = (ARCRecord) nr.get(); assertEquals(h, r2.getHeaderString()); cnt++; } reader.close(); LOG.info(cnt + " records read!"); assertEquals(300, cnt); } } diff --git a/src/test/java/org/warcbase/ingest/WacWarcLoaderTest.java b/src/test/java/org/warcbase/ingest/WacWarcLoaderTest.java index c07e92a..205a9ef 100644 --- a/src/test/java/org/warcbase/ingest/WacWarcLoaderTest.java +++ b/src/test/java/org/warcbase/ingest/WacWarcLoaderTest.java @@ -1,118 +1,134 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.warcbase.ingest; import static org.junit.Assert.assertEquals; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.File; import java.io.InputStream; import java.text.SimpleDateFormat; import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.archive.io.ArchiveRecord; import org.archive.io.ArchiveRecordHeader; import org.archive.io.warc.WARCReader; import org.archive.io.warc.WARCReaderFactory; import org.archive.io.warc.WARCRecord; import org.archive.util.ArchiveUtils; import org.junit.Test; import org.warcbase.data.WarcRecordUtils; import tl.lin.data.fd.Object2IntFrequencyDistribution; import tl.lin.data.fd.Object2IntFrequencyDistributionEntry; import com.google.common.io.Resources; public class WacWarcLoaderTest { private static final Log LOG = LogFactory.getLog(WacWarcLoaderTest.class); private static final SimpleDateFormat DATE_WARC = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); @Test public void testReader() throws Exception { String warcFile = Resources.getResource("warc/example.warc.gz").getPath(); WARCReader reader = WARCReaderFactory.get(new File(warcFile)); Object2IntFrequencyDistribution types = new Object2IntFrequencyDistributionEntry(); Object2IntFrequencyDistribution responseTypes = new Object2IntFrequencyDistributionEntry(); int cnt = 0; for (Iterator ii = reader.iterator(); ii.hasNext();) { WARCRecord r = (WARCRecord) ii.next(); ArchiveRecordHeader header = r.getHeader(); types.increment((String) header.getHeaderValue("WARC-Type")); byte[] contents = WarcRecordUtils.getContent(r); int len = (int) header.getContentLength(); assertEquals(len, contents.length); // This is how you extract the date @SuppressWarnings("unused") String digit14Date = ArchiveUtils.get14DigitDate(DATE_WARC.parse(header.getDate())); if (header.getHeaderValue("WARC-Type").equals("response") && header.getUrl().startsWith("http://")) { responseTypes.increment(WarcRecordUtils.getWarcResponseMimeType(contents)); } cnt++; } reader.close(); LOG.info(cnt + " records read!"); assertEquals(822, cnt); assertEquals(299, 
types.get("response")); assertEquals( 1, types.get("warcinfo")); assertEquals(261, types.get("request")); assertEquals(261, types.get("metadata")); assertEquals( 4, types.getNumberOfEvents()); assertEquals(822, types.getSumOfCounts()); assertEquals( 8, responseTypes.get("application/x-javascript")); assertEquals( 4, responseTypes.get("text/css")); assertEquals( 8, responseTypes.get("application/x-shockwave-flash")); assertEquals( 9, responseTypes.get("text/xml")); assertEquals( 8, responseTypes.get("image/png")); assertEquals( 18, responseTypes.get("image/jpeg")); assertEquals( 29, responseTypes.get("image/gif")); assertEquals( 36, responseTypes.get("text/plain")); assertEquals(140, responseTypes.get("text/html")); assertEquals(260, responseTypes.getSumOfCounts()); } @Test public void testReadFromStream() throws Exception { String warcFile = Resources.getResource("warc/example.warc.gz").getPath(); WARCReader reader = WARCReaderFactory.get(new File(warcFile)); int cnt = 0; for (Iterator ii = reader.iterator(); ii.hasNext();) { WARCRecord r = (WARCRecord) ii.next(); InputStream in = new DataInputStream(new ByteArrayInputStream(WarcRecordUtils.toBytes(r))); WARCReader nr = (WARCReader) WARCReaderFactory.get("", new BufferedInputStream(in), false); WARCRecord r2 = (WARCRecord) nr.get(); assertEquals(r.getHeader().getUrl(), r2.getHeader().getUrl()); ArchiveRecordHeader header = r2.getHeader(); byte[] contents = WarcRecordUtils.getContent(r2); int len = (int) header.getContentLength(); assertEquals(len, contents.length); cnt++; } reader.close(); LOG.info(cnt + " records read!"); assertEquals(822, cnt); } } diff --git a/src/test/java/org/warcbase/io/ArcRecordWritableTest.java b/src/test/java/org/warcbase/io/ArcRecordWritableTest.java index cdb955a..0986af0 100644 --- a/src/test/java/org/warcbase/io/ArcRecordWritableTest.java +++ b/src/test/java/org/warcbase/io/ArcRecordWritableTest.java @@ -1,66 +1,82 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.io; import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.util.ReflectionUtils; import org.junit.Test; import org.warcbase.mapreduce.WacArcInputFormat; import com.google.common.io.Resources; public class ArcRecordWritableTest { @Test public void testInputFormat() throws Exception { String arcFile = Resources.getResource("arc/example.arc.gz").getPath(); Configuration conf = new Configuration(false); conf.set("fs.defaultFS", "file:///"); File testFile = new File(arcFile); Path path = new Path(testFile.getAbsoluteFile().toURI()); FileSplit split = new FileSplit(path, 0, testFile.length(), null); InputFormat inputFormat = ReflectionUtils.newInstance( WacArcInputFormat.class, conf); TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); RecordReader reader = inputFormat.createRecordReader(split, context); reader.initialize(split, context); int cnt = 0; while (reader.nextKeyValue()) { ArcRecordWritable record = reader.getCurrentValue(); cnt++; ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); DataOutputStream dataOut = new DataOutputStream(bytesOut); record.write(dataOut); ArcRecordWritable reconstructed = new ArcRecordWritable(); reconstructed.readFields(new DataInputStream(new ByteArrayInputStream(bytesOut.toByteArray()))); assertEquals(record.getRecord().getMetaData().getUrl(), reconstructed.getRecord().getMetaData().getUrl()); } assertEquals(300, cnt); } } diff --git a/src/test/java/org/warcbase/io/WarcRecordWritableTest.java b/src/test/java/org/warcbase/io/WarcRecordWritableTest.java index 77b0b25..f246d51 100644 --- a/src/test/java/org/warcbase/io/WarcRecordWritableTest.java +++ b/src/test/java/org/warcbase/io/WarcRecordWritableTest.java @@ -1,71 +1,87 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.io; import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.util.ReflectionUtils; import org.junit.Test; import org.warcbase.mapreduce.WacWarcInputFormat; import com.google.common.io.Resources; public class WarcRecordWritableTest { @Test public void testInputFormat() throws Exception { String warcFile = Resources.getResource("warc/example.warc.gz").getPath(); Configuration conf = new Configuration(false); conf.set("fs.defaultFS", "file:///"); File testFile = new File(warcFile); Path path = new Path(testFile.getAbsoluteFile().toURI()); FileSplit split = new FileSplit(path, 0, testFile.length(), null); InputFormat inputFormat = ReflectionUtils.newInstance( WacWarcInputFormat.class, conf); TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); RecordReader reader = inputFormat.createRecordReader(split, context); reader.initialize(split, context); int cnt = 0; while (reader.nextKeyValue()) { WarcRecordWritable record = reader.getCurrentValue(); //System.out.println(record.getRecord().getHeader().getUrl() + " " + // record.getRecord().getHeader().getHeaderValue("WARC-Type")); cnt++; ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); DataOutputStream dataOut = new DataOutputStream(bytesOut); record.write(dataOut); WarcRecordWritable reconstructed = new WarcRecordWritable(); reconstructed.readFields(new DataInputStream(new ByteArrayInputStream(bytesOut.toByteArray()))); assertEquals(record.getRecord().getHeader().getUrl(), reconstructed.getRecord().getHeader().getUrl()); assertEquals(record.getRecord().getHeader().getContentLength(), reconstructed.getRecord().getHeader().getContentLength()); } assertEquals(822, cnt); } } diff --git a/src/test/java/org/warcbase/mapreduce/WacArcInputFormatTest.java b/src/test/java/org/warcbase/mapreduce/WacArcInputFormatTest.java index c785fd1..1acf9e8 100644 --- a/src/test/java/org/warcbase/mapreduce/WacArcInputFormatTest.java +++ b/src/test/java/org/warcbase/mapreduce/WacArcInputFormatTest.java @@ -1,63 +1,79 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.mapreduce; import static org.junit.Assert.assertEquals; import java.io.File; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.util.ReflectionUtils; import org.archive.io.arc.ARCRecord; import org.archive.io.arc.ARCRecordMetaData; import org.junit.Test; import org.warcbase.io.ArcRecordWritable; import com.google.common.io.Resources; public class WacArcInputFormatTest { @Test public void testInputFormat() throws Exception { String[] urls = new String[] { "filedesc://IAH-20080430204825-00000-blackbook.arc", "dns:www.archive.org", "http://www.archive.org/robots.txt", "http://www.archive.org/", "http://www.archive.org/index.php" }; String arcFile = Resources.getResource("arc/example.arc.gz").getPath(); Configuration conf = new Configuration(false); conf.set("fs.defaultFS", "file:///"); File testFile = new File(arcFile); Path path = new Path(testFile.getAbsoluteFile().toURI()); FileSplit split = new FileSplit(path, 0, testFile.length(), null); InputFormat inputFormat = ReflectionUtils.newInstance(WacArcInputFormat.class, conf); TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); RecordReader reader = inputFormat.createRecordReader(split, context); reader.initialize(split, context); int cnt = 0; while (reader.nextKeyValue()) { ARCRecord record = reader.getCurrentValue().getRecord(); ARCRecordMetaData metadata = record.getMetaData(); if (cnt < urls.length) { assertEquals(urls[cnt], metadata.getUrl()); } cnt++; } assertEquals(300, cnt); } } diff --git a/src/test/java/org/warcbase/mapreduce/WacWarcInputFormatTest.java b/src/test/java/org/warcbase/mapreduce/WacWarcInputFormatTest.java index c0d6dfd..596f931 100644 --- a/src/test/java/org/warcbase/mapreduce/WacWarcInputFormatTest.java +++ b/src/test/java/org/warcbase/mapreduce/WacWarcInputFormatTest.java @@ -1,89 +1,105 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.warcbase.mapreduce; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.util.ReflectionUtils; import org.archive.io.warc.WARCRecord; import org.junit.Test; import org.warcbase.io.WarcRecordWritable; import com.google.common.io.Resources; public class WacWarcInputFormatTest { @Test public void testInputFormat() throws Exception { String[] urls = new String[] { null, "dns:www.archive.org", "http://www.archive.org/robots.txt", "http://www.archive.org/robots.txt", "http://www.archive.org/robots.txt", "http://www.archive.org/", "http://www.archive.org/", "http://www.archive.org/", "http://www.archive.org/index.php", "http://www.archive.org/index.php"}; String[] types = new String[] { "warcinfo", "response", "response", "request", "metadata", "response", "request", "metadata", "response", "request"}; String arcFile = Resources.getResource("warc/example.warc.gz").getPath(); Configuration conf = new Configuration(false); conf.set("fs.defaultFS", "file:///"); File testFile = new File(arcFile); Path path = new Path(testFile.getAbsoluteFile().toURI()); FileSplit split = new FileSplit(path, 0, testFile.length(), null); InputFormat inputFormat = ReflectionUtils.newInstance(WacWarcInputFormat.class, conf); TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); RecordReader reader = inputFormat.createRecordReader(split, context); reader.initialize(split, context); assertTrue(urls.length == types.length); int cnt = 0; int responseCnt = 0; while (reader.nextKeyValue()) { WARCRecord record = reader.getCurrentValue().getRecord(); if (cnt < urls.length) { assertEquals(urls[cnt], record.getHeader().getUrl()); assertEquals(types[cnt], record.getHeader().getHeaderValue("WARC-Type")); } if (record.getHeader().getHeaderValue("WARC-Type").equals("response")) { responseCnt++; } cnt++; } assertEquals(822, cnt); assertEquals(299, responseCnt); } } diff --git a/src/test/scala/org/warcbase/spark/ArcTest.scala b/src/test/scala/org/warcbase/spark/ArcTest.scala index 01a6b6e..a4e130c 100644 --- a/src/test/scala/org/warcbase/spark/ArcTest.scala +++ b/src/test/scala/org/warcbase/spark/ArcTest.scala @@ -1,87 +1,103 @@ +/* + * Warcbase: an open-source platform for managing web archives + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
 package org.warcbase.spark
 
 import com.google.common.io.Resources
 import org.apache.spark.{SparkConf, SparkContext}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.warcbase.spark.matchbox._
 import org.warcbase.spark.rdd.RecordRDD._
 
 @RunWith(classOf[JUnitRunner])
 class ArcTest extends FunSuite with BeforeAndAfter {
   private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
   private val master = "local[4]"
   private val appName = "example-spark"
   private var sc: SparkContext = _
 
   before {
     val conf = new SparkConf()
       .setMaster(master)
       .setAppName(appName)
     sc = new SparkContext(conf)
   }
 
   test("count records") {
     assert(RecordLoader.loadArc(arcPath, sc).count == 300L)
   }
 
   test("count links") {
     val links = RecordLoader.loadArc(arcPath, sc)
       .map(r => ExtractLinks(r.getUrl, r.getBodyContent))
       .reduce((a, b) => a ++ b)
     assert(links.size == 664)
   }
 
   test("detect language") {
     val languageCounts = RecordLoader.loadArc(arcPath, sc)
       .keepMimeTypes(Set("text/html"))
       .map(r => ExtractRawText(r.getBodyContent))
       .groupBy(content => DetectLanguage(content))
       .map(f => {
         (f._1, f._2.size)
       })
       .collect
     languageCounts.foreach {
       case ("en", count) => assert(57L == count)
       case ("et", count) => assert(6L == count)
       case ("it", count) => assert(1L == count)
       case ("lt", count) => assert(61L == count)
       case ("no", count) => assert(6L == count)
       case ("ro", count) => assert(4L == count)
       case (_, count) => print(_)
     }
   }
 
   test("detect mime type tika") {
     val mimeTypeCounts = RecordLoader.loadArc(arcPath, sc)
       .map(r => ExtractRawText(r.getBodyContent))
       .groupBy(content => DetectMimeTypeTika(content))
       .map(f => {
         println(f._1 + " : " + f._2.size)
         (f._1, f._2.size)
       }).collect
     mimeTypeCounts.foreach {
       case ("image/gif", count) => assert(29L == count)
       case ("image/png", count) => assert(8L == count)
       case ("image/jpeg", count) => assert(18L == count)
       case ("text/html", count) => assert(132L == count)
       case ("text/plain", count) => assert(229L == count)
       case ("application/xml", count) => assert(1L == count)
       case ("application/rss+xml", count) => assert(9L == count)
       case ("application/xhtml+xml", count) => assert(1L == count)
       case ("application/octet-stream", count) => assert(26L == count)
       case ("application/x-shockwave-flash", count) => assert(8L == count)
       case (_, count) => print(_)
     }
   }
 
   after {
     if (sc != null) {
       sc.stop()
     }
   }
 }
diff --git a/src/test/scala/org/warcbase/spark/WarcTest.scala b/src/test/scala/org/warcbase/spark/WarcTest.scala
index d1d4125..72957f7 100644
--- a/src/test/scala/org/warcbase/spark/WarcTest.scala
+++ b/src/test/scala/org/warcbase/spark/WarcTest.scala
@@ -1,51 +1,67 @@
+/*
+ * Warcbase: an open-source platform for managing web archives
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.warcbase.spark
 
 import com.google.common.io.Resources
 import org.apache.spark.rdd.RDD
 import org.apache.spark.{SparkConf, SparkContext}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.warcbase.spark.matchbox.RecordLoader
 import org.warcbase.spark.matchbox.RecordTransformers.WARecord
 import org.warcbase.spark.rdd.RecordRDD._
 
 @RunWith(classOf[JUnitRunner])
 class WarcTest extends FunSuite with BeforeAndAfter {
   private val warcPath = Resources.getResource("warc/example.warc.gz").getPath
   private val master = "local[2]"
   private val appName = "example-spark"
   private var sc: SparkContext = _
   private var records: RDD[WARecord] = _
 
   before {
     val conf = new SparkConf()
       .setMaster(master)
       .setAppName(appName)
     sc = new SparkContext(conf)
     records = RecordLoader.loadWarc(warcPath, sc)
   }
 
   test("count records") {
     val warcRecords = RecordLoader.loadWarc(warcPath, sc)
     assert(299L == warcRecords.count)
   }
 
   test("warc extract domain") {
     val r = RecordLoader.loadWarc(warcPath, sc)
       .keepValidPages()
       .map(r => r.getDomain)
       .countItems()
       .take(10)
     assert(r.length == 3)
   }
 
   after {
     if (sc != null) {
       sc.stop()
     }
   }
 }
diff --git a/src/test/scala/org/warcbase/spark/matchbox/ExtractEntitiesTest.scala b/src/test/scala/org/warcbase/spark/matchbox/ExtractEntitiesTest.scala
index d42d338..2c4532f 100644
--- a/src/test/scala/org/warcbase/spark/matchbox/ExtractEntitiesTest.scala
+++ b/src/test/scala/org/warcbase/spark/matchbox/ExtractEntitiesTest.scala
@@ -1,60 +1,76 @@
+/*
+ * Warcbase: an open-source platform for managing web archives
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.warcbase.spark.matchbox
 
 import java.io.File
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.google.common.io.{Files, Resources}
 import org.apache.commons.io.FileUtils
 import org.apache.commons.logging.LogFactory
 import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.warcbase.spark.matchbox.NER3Classifier.NERClassType
 
 import scala.collection.mutable
 
 // There must be a valid classifier file with path `iNerClassifierFile` for this test to pass
 // @RunWith(classOf[JUnitRunner])
 class ExtractEntitiesTest extends FunSuite with BeforeAndAfter {
   private val LOG = LogFactory.getLog(classOf[ExtractEntitiesTest])
   private val scrapePath = Resources.getResource("ner/example.txt").getPath
   private val master = "local[4]"
   private val appName = "example-spark"
   private var sc: SparkContext = _
   private var tempDir: File = _
   private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
   private val iNerClassifierFile =
     Resources.getResource("ner/classifiers/english.all.3class.distsim.crf.ser.gz").getPath
 
   before {
     val conf = new SparkConf()
       .setMaster(master)
       .setAppName(appName)
     sc = new SparkContext(conf)
     tempDir = Files.createTempDir()
     LOG.info("Output can be found in " + tempDir.getPath)
   }
 
   test("extract entities") {
     val e = ExtractEntities.extractFromScrapeText(iNerClassifierFile, scrapePath,
       tempDir + "/scrapeTextEntities", sc).take(3).last
     val expectedEntityMap = mutable.Map[NERClassType.Value, List[String]]()
     expectedEntityMap.put(NERClassType.PERSON, List())
     expectedEntityMap.put(NERClassType.LOCATION, List("Teoma"))
     expectedEntityMap.put(NERClassType.ORGANIZATION, List())
     assert(e._1 == "20080430")
     assert(e._2 == "http://www.archive.org/robots.txt")
     val actual = mapper.readValue(e._3, classOf[Map[String, List[String]]])
     expectedEntityMap.toStream.foreach(f => {
       assert(f._2 == actual.get(f._1.toString).get)
     })
   }
 
   after {
     FileUtils.deleteDirectory(tempDir)
     LOG.info("Removing tmp files in " + tempDir.getPath)
     if (sc != null) {
       sc.stop()
     }
   }
 }
diff --git a/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala b/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala
index 1b3875f..024d9da 100644
--- a/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala
+++ b/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala
@@ -1,28 +1,44 @@
+/*
+ * Warcbase: an open-source platform for managing web archives
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.warcbase.spark.matchbox
 
 import org.junit.runner.RunWith
 import org.scalatest.FunSuite
 import org.scalatest.junit.JUnitRunner
 
 @RunWith(classOf[JUnitRunner])
 class ExtractLinksTest extends FunSuite {
   test("simple") {
     val fragment: String = "Here is <a href=\"http://www.google.com\">a search engine</a>.\n" +
       "Here is <a href=\"http://www.twitter.com/\">Twitter</a>.\n"
     val extracted: Seq[(String, String, String)] = ExtractLinks("", fragment)
     assert(extracted.size == 2)
     assert("http://www.google.com" == extracted.head._2)
     assert("a search engine" == extracted.head._3)
     assert("http://www.twitter.com/" == extracted.last._2)
     assert("Twitter" == extracted.last._3)
   }
 
   test("relative") {
     val fragment: String = "Here is <a href=\"http://www.google.com\">a search engine</a>.\n" +
       "Here is <a href=\"page.html\">a relative URL</a>.\n"
     val extracted: Seq[(String, String, String)] = ExtractLinks("", fragment, "http://www.foobar.org/index.html")
     assert(extracted.size == 2)
     assert("http://www.google.com" == extracted.head._2)
     assert("a search engine" == extracted.head._3)
     assert("http://www.foobar.org/page.html" == extracted.last._2)
     assert("a relative URL" == extracted.last._3)
   }
 }
diff --git a/src/test/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomainTest.scala b/src/test/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomainTest.scala
index e14d980..f02dcbf 100644
--- a/src/test/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomainTest.scala
+++ b/src/test/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomainTest.scala
@@ -1,30 +1,46 @@
+/*
+ * Warcbase: an open-source platform for managing web archives
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.warcbase.spark.matchbox
 
 import org.junit.runner.RunWith
 import org.scalatest.FunSuite
 import org.scalatest.junit.JUnitRunner
 
 @RunWith(classOf[JUnitRunner])
 class ExtractTopLevelDomainTest extends FunSuite {
   private val data1: Seq[(String, String)] = Seq.newBuilder.+=(
     ("http://www.umiacs.umd.edu/~jimmylin/", "www.umiacs.umd.edu"),
     ("https://github.com/lintool", "github.com"),
     ("http://ianmilligan.ca/2015/05/04/iipc-2015-slides-for-warcs-wats-and-wgets-presentation/", "ianmilligan.ca"),
     ("index.html", null)).result()
 
   private val data2 = Seq.newBuilder.+=(
     ("index.html", "http://www.umiacs.umd.edu/~jimmylin/", "www.umiacs.umd.edu"),
     ("index.html", "lintool/", null)).result()
 
   test("simple") {
     data1.foreach {
       case (link, domain) => assert(ExtractTopLevelDomain(link) == domain)
     }
   }
 
   test("withBase") {
     data2.foreach {
       case (link, base, domain) => assert(ExtractTopLevelDomain(link, base) == domain)
     }
   }
 }
diff --git a/src/test/scala/org/warcbase/spark/rdd/CountableRDDTest.scala b/src/test/scala/org/warcbase/spark/rdd/CountableRDDTest.scala
index e5691c3..36bf7b2 100644
--- a/src/test/scala/org/warcbase/spark/rdd/CountableRDDTest.scala
+++ b/src/test/scala/org/warcbase/spark/rdd/CountableRDDTest.scala
@@ -1,45 +1,61 @@
+/*
+ * Warcbase: an open-source platform for managing web archives
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.warcbase.spark.rdd
 
 import com.google.common.io.Resources
 import org.apache.spark.{SparkConf, SparkContext}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.warcbase.spark.matchbox._
 import org.warcbase.spark.rdd.RecordRDD._
 
 @RunWith(classOf[JUnitRunner])
 class CountableRDDTest extends FunSuite with BeforeAndAfter {
   private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
   private val master = "local[4]"
   private val appName = "example-spark"
   private var sc: SparkContext = _
 
   before {
     val conf = new SparkConf()
       .setMaster(master)
       .setAppName(appName)
     sc = new SparkContext(conf)
   }
 
   test("count records") {
     val base = RecordLoader.loadArc(arcPath, sc)
       .keepValidPages()
       .map(r => ExtractTopLevelDomain(r.getUrl))
     val r = base
       .map(r => (r, 1))
       .reduceByKey(_ + _)
       .map(_.swap)
       .sortByKey(ascending = false)
       .map(_.swap)
       .collect()
     val r2 = base.countItems().collect()
     assert(r sameElements r2)
   }
 
   after {
     if (sc != null) {
       sc.stop()
     }
   }
 }