diff --git a/warcbase-core/src/main/java/org/warcbase/data/WarcRecordUtils.java b/warcbase-core/src/main/java/org/warcbase/data/WarcRecordUtils.java index c4c9c30..2106f6a 100644 --- a/warcbase-core/src/main/java/org/warcbase/data/WarcRecordUtils.java +++ b/warcbase-core/src/main/java/org/warcbase/data/WarcRecordUtils.java @@ -1,146 +1,152 @@ /* * Warcbase: an open-source platform for managing web archives * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.warcbase.data; import org.apache.commons.httpclient.HttpParser; import org.apache.log4j.Logger; import org.archive.io.warc.WARCConstants; import org.archive.io.warc.WARCReader; import org.archive.io.warc.WARCReaderFactory; import org.archive.io.warc.WARCRecord; import java.io.*; +import java.lang.Exception; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; /** * Utilities for working with {@code WARCRecord}s (from archive.org APIs). */ public class WarcRecordUtils implements WARCConstants { private static final Logger LOG = Logger.getLogger(WarcRecordUtils.class); // TODO: these methods work fine, but there's a lot of unnecessary buffer copying, which is // terrible from a performance perspective. /** * Converts raw bytes into an {@code WARCRecord}. * * @param bytes raw bytes * @return parsed {@code WARCRecord} * @throws IOException */ public static WARCRecord fromBytes(byte[] bytes) throws IOException { WARCReader reader = (WARCReader) WARCReaderFactory.get("", new BufferedInputStream(new ByteArrayInputStream(bytes)), false); return (WARCRecord) reader.get(); } public static byte[] toBytes(WARCRecord record) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(baos); dout.write("WARC/0.17\n".getBytes()); for (Map.Entry entry : record.getHeader().getHeaderFields().entrySet()) { dout.write((entry.getKey() + ": " + entry.getValue().toString() + "\n").getBytes()); } dout.write("\n".getBytes()); record.dump(dout); return baos.toByteArray(); } /** * Extracts the MIME type of WARC response records (i.e., "WARC-Type" is "response"). * Note that this is different from the "Content-Type" in the WARC header. * * @param contents raw contents of the WARC response record * @return MIME type */ public static String getWarcResponseMimeType(byte[] contents) { // This is a somewhat janky way to get the MIME type of the response. // Note that this is different from the "Content-Type" in the WARC header. Pattern pattern = Pattern.compile("Content-Type: ([^\\s]+)", Pattern.CASE_INSENSITIVE); Matcher matcher = pattern.matcher(new String(contents)); if (matcher.find()) { return matcher.group(1).replaceAll(";$", ""); } return null; } /** * Extracts raw contents from a {@code WARCRecord} (including HTTP headers). * * @param record the {@code WARCRecord} * @return raw contents * @throws IOException */ public static byte[] getContent(WARCRecord record) throws IOException { int len = (int) record.getHeader().getContentLength(); // If we have a corrupt record, quit and move on. if (len < 0) { return new byte[0]; } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dout = new DataOutputStream(baos); - copyStream(record, len, true, dout); + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dout = new DataOutputStream(baos); + copyStream(record, len, true, dout); - return baos.toByteArray(); + return baos.toByteArray(); + } catch (Exception e) { + // Catch exceptions related to any corrupt archive files. + return new byte[0]; + } } /** * Extracts contents of the body from a {@code WARCRecord} (excluding HTTP headers). * * @param record the {@code WARCRecord} * @return contents of the body * @throws IOException */ public static byte[] getBodyContent(WARCRecord record) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); String line = HttpParser.readLine(record, WARC_HEADER_ENCODING); if (line == null) { return null; } // Just using parseHeaders to move down input stream to body HttpParser.parseHeaders(record, WARC_HEADER_ENCODING); record.dump(baos); return baos.toByteArray(); } private static long copyStream(final InputStream is, final int recordLength, boolean enforceLength, final DataOutputStream out) throws IOException { byte [] scratchbuffer = new byte[recordLength]; int read = 0; long tot = 0; while ((tot < recordLength) && (read = is.read(scratchbuffer)) != -1) { int write = read; // never write more than enforced length write = (int) Math.min(write, recordLength - tot); tot += read; out.write(scratchbuffer, 0, write); } if (enforceLength && tot != recordLength) { LOG.error("Read " + tot + " bytes but expected " + recordLength + " bytes. Continuing..."); } return tot; } } diff --git a/warcbase-core/src/main/java/org/warcbase/mapreduce/WacGenericInputFormat.java b/warcbase-core/src/main/java/org/warcbase/mapreduce/WacGenericInputFormat.java index 2553a40..5687eab 100644 --- a/warcbase-core/src/main/java/org/warcbase/mapreduce/WacGenericInputFormat.java +++ b/warcbase-core/src/main/java/org/warcbase/mapreduce/WacGenericInputFormat.java @@ -1,161 +1,168 @@ /* * Warcbase: an open-source platform for managing web archives * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.warcbase.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.archive.io.ArchiveReader; import org.archive.io.ArchiveReaderFactory; import org.archive.io.ArchiveRecord; import org.archive.io.arc.ARCReader; import org.archive.io.arc.ARCReaderFactory.CompressedARCReader; import org.archive.io.warc.WARCReader; import org.archive.io.warc.WARCReaderFactory.CompressedWARCReader; import org.warcbase.io.GenericArchiveRecordWritable; import org.warcbase.io.GenericArchiveRecordWritable.ArchiveFormat; import java.io.BufferedInputStream; import java.io.IOException; +import java.lang.Exception; import java.util.Iterator; public class WacGenericInputFormat extends FileInputFormat { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new GenericArchiveRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } public class GenericArchiveRecordReader extends RecordReader { private ArchiveReader reader; private ArchiveFormat format; private long start; private long pos; private long end; private LongWritable key = null; private GenericArchiveRecordWritable value = null; private Seekable filePosition; private Iterator iter; @Override public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); reader = ArchiveReaderFactory.get(split.getPath().toString(), new BufferedInputStream(fileIn), true); if (reader instanceof ARCReader) { format = ArchiveFormat.ARC; iter = reader.iterator(); } if (reader instanceof WARCReader) { format = ArchiveFormat.WARC; iter = reader.iterator(); } this.pos = start; } private boolean isCompressedInput() { if (format == ArchiveFormat.ARC) { return reader instanceof CompressedARCReader; } else { return reader instanceof CompressedWARCReader; } } private long getFilePosition() throws IOException { long retVal; if (isCompressedInput() && null != filePosition) { retVal = filePosition.getPos(); } else { retVal = pos; } return retVal; } @Override public boolean nextKeyValue() throws IOException { if (!iter.hasNext()) { return false; } if (key == null) { key = new LongWritable(); } key.set(pos); - ArchiveRecord record = iter.next(); + ArchiveRecord record = null; + try { + record = iter.next(); + } catch (Exception e) { + return false; + } + if (record == null) { return false; } if (value == null) { value = new GenericArchiveRecordWritable(); } value.setRecord(record); return true; } @Override public LongWritable getCurrentKey() { return key; } @Override public GenericArchiveRecordWritable getCurrentValue() { return value; } @Override public float getProgress() throws IOException { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start)); } } @Override public synchronized void close() throws IOException { reader.close(); } } } \ No newline at end of file