diff --git a/src/main/java/org/warcbase/ingest/IngestFiles.java b/src/main/java/org/warcbase/ingest/IngestFiles.java
index a64fa29..6049b35 100755
--- a/src/main/java/org/warcbase/ingest/IngestFiles.java
+++ b/src/main/java/org/warcbase/ingest/IngestFiles.java
@@ -1,418 +1,418 @@
 package org.warcbase.ingest;

 import java.io.BufferedInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
 import java.util.zip.GZIPInputStream;

 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 import org.archive.io.ArchiveRecord;
 import org.archive.io.arc.ARCReader;
 import org.archive.io.arc.ARCReaderFactory;
 import org.archive.io.arc.ARCRecord;
 import org.archive.io.arc.ARCRecordMetaData;
 import org.archive.io.arc.ARCWriter;
 import org.archive.util.ArchiveUtils;
 import org.jwat.arc.ArcReaderFactory;
 import org.jwat.common.ByteCountingPushBackInputStream;
 import org.jwat.common.HttpHeader;
 import org.jwat.common.Payload;
 import org.jwat.common.UriProfile;
 import org.jwat.warc.WarcReader;
 import org.jwat.warc.WarcReaderFactory;
 import org.jwat.warc.WarcRecord;
 import org.warcbase.data.HbaseManager;
 import org.warcbase.data.UrlUtil;

 public class IngestFiles {
   private static final String CREATE_OPTION = "create";
   private static final String APPEND_OPTION = "append";
   private static final String NAME_OPTION = "name";
   private static final String DIR_OPTION = "dir";
   private static final String START_OPTION = "start";

   private static final Logger LOG = Logger.getLogger(IngestFiles.class);

   // TODO: rename to constants and make final
   private static final UriProfile uriProfile = UriProfile.RFC3986_ABS_16BIT_LAX;
   private static final boolean bBlockDigestEnabled = true;
   private static final boolean bPayloadDigestEnabled = true;
   private static final int recordHeaderMaxSize = 8192;
   private static final int payloadHeaderMaxSize = 32768;

   public static final int MAX_CONTENT_SIZE = 1024 * 1024;

   private int cnt = 0;
   private int skipped = 0;

   private final HbaseManager hbaseManager;

   public IngestFiles(String name, boolean create) throws Exception {
     hbaseManager = new HbaseManager(name, create);
     //hbaseManager = null;
   }

   protected final byte[] scratchbuffer = new byte[4 * 1024];

   protected long copyFrom(final InputStream is, final long recordLength, boolean enforceLength,
       DataOutputStream out) throws IOException {
     int read = scratchbuffer.length;
     long tot = 0;
     while ((tot < recordLength) && (read = is.read(scratchbuffer)) != -1) {
       int write = read;
       // never write more than enforced length
       write = (int) Math.min(write, recordLength - tot);
       tot += read;
       out.write(scratchbuffer, 0, write);
     }
     if (enforceLength && tot != recordLength) {
       // throw exception if desired for read vs. declared mismatches
       throw new IOException("Read " + tot + " but expected " + recordLength);
     }
     return tot;
   }

   private void ingestArcFile(File inputArcFile) throws Exception {
     //ArcRecordBase record = null;
     String url = null;
     String date = null;
     String type = null;
     String key = null;
     byte[] content = null;
     InputStream in = null;
     //ArcReader reader = null;

     ARCReader reader = ARCReaderFactory.get(inputArcFile);

     boolean firstRecord = true;
     ARCWriter writer = null;
     for (Iterator ii = reader.iterator(); ii.hasNext();) {
       ARCRecord r = (ARCRecord) ii.next();
       // We're to dump the arc on stdout.
       // Get the first record's data if any.
       ARCRecordMetaData meta = r.getMetaData();
       if (firstRecord) {
         firstRecord = false;
         // Get an ARCWriter.
         ByteArrayOutputStream baos = new ByteArrayOutputStream(r.available());
         // This is slow but done only once at top of ARC.
         while (r.available() > 0) {
           baos.write(r.read());
         }
         // List listOfMetadata = new ArrayList();
         // listOfMetadata.add(baos.toString(WriterPoolMember.UTF8));
         // // Assume getArc returns full path to file. ARCWriter
         // // or new File will complain if it is otherwise.
         // List outDirs = new ArrayList();
         // WriterPoolSettingsData settings =
         //     new WriterPoolSettingsData("", "", -1L, false, outDirs, listOfMetadata);
         // writer = new ARCWriter(new AtomicInteger(), System.out,
         //     new File(meta.getArc()), settings);
         continue;
       }

       String metaline = meta.getUrl() + " " + meta.getIp() + " " + meta.getDate() + " "
           + meta.getMimetype() + " " + (int) meta.getLength();

       //date = ArchiveUtils.parse14DigitDate(meta.getDate()).getTime() + "";
       date = meta.getDate();

       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       DataOutputStream dout = new DataOutputStream(baos);
       dout.write(metaline.getBytes());
       dout.write("\n".getBytes());
       copyFrom(r, (int) meta.getLength(), true, dout);

-      System.out.println("-----------");
-      System.out.println(metaline);
+      // System.out.println("-----------");
+      //System.out.println(metaline);
       // System.out.println(new String(bytes, "UTF8"));

       // writer.write(meta.getUrl(), meta.getMimetype(), meta.getIp(),
       //     ArchiveUtils.parse14DigitDate(meta.getDate()).getTime(),
       //     (int) meta.getLength(), r);

       key = UrlUtil.urlToKey(meta.getUrl());
       type = meta.getMimetype();

       if (key != null && type == null) {
         type = "text/plain";
       }

       if (key == null) {
         continue;
       }

-      if (baos.toByteArray().length > MAX_CONTENT_SIZE) {
-        skipped++;
-      } else {
-        System.out.println(key + " " + type + " " + date);
+      // if (baos.toByteArray().length > MAX_CONTENT_SIZE) {
+      //   skipped++;
+      // } else {
+      //System.out.println(key + " " + type + " " + date);
       if (hbaseManager.addRecord(key, date, baos.toByteArray(), type)) {
         cnt++;
       } else {
         skipped++;
       }
-      }
+      // }

       if (cnt % 10000 == 0 && cnt > 0) {
         LOG.info("Ingested " + cnt + " records into Hbase.");
       }
     }

     // for (Iterator ii = reader.iterator(); ii.hasNext();) {
     //   ARCRecord r = (ARCRecord) ii.next();
     //   ARCRecordMetaData meta = r.getMetaData();
     //
     //   ByteArrayOutputStream os = new ByteArrayOutputStream();
     //   PrintStream ps = new PrintStream(os);
     //   ...
     //   String output = os.toString("UTF8");
     //
     //   // Swriter.write(meta.getUrl(), meta.getMimetype(), meta.getIp(),
     //   //     ArchiveUtils.parse14DigitDate(meta.getDate()).getTime(),
     //   //     (int) meta.getLength(), r);
     //
     // }
     // reader.close();

     // try {
     //   // Per file trapping of exceptions.
     //   in = new FileInputStream(inputArcFile);
     //   reader = ArcReaderFactory.getReader(in);
     //
     //   while ((record = reader.getNextRecord()) != null) {
     //     try {
     //       // Per record trapping of exceptions.
     //       url = record.getUrlStr();
     //       date = record.getArchiveDateStr();
     //
     //       try {
     //         // This is prone to OOM errors when the underlying file is corrupt.
     //         content = IOUtils.toByteArray(record.getPayloadContent());
     //       } catch (OutOfMemoryError e) {
     //         // Yes, kinda sketchy... but try to move on.
     //         return;
     //       }
     //
     //       key = UrlUtil.urlToKey(url);
     //       type = record.getContentTypeStr();
     //
     //       if (key != null && type == null) {
     //         type = "text/plain";
     //       }
     //
     //       if (key == null) {
     //         continue;
     //       }
     //
     //       if (content.length > MAX_CONTENT_SIZE) {
     //         skipped++;
     //       } else {
     //         if (hbaseManager.addRecord(key, date, content, type)) {
     //           cnt++;
     //         } else {
     //           skipped++;
     //         }
     //       }
     //
     //       if (cnt % 10000 == 0 && cnt > 0) {
     //         LOG.info("Ingested " + cnt + " records into Hbase.");
     //       }
     //     } catch (Exception e) {
     //       LOG.error("Error ingesting record: " + e);
     //     }
     //   }
     // } catch (Exception e) {
     //   LOG.error("Error ingesting file: " + inputArcFile);
     // } finally {
     //   if (reader != null)
     //     reader.close();
     //   if (in != null)
     //     in.close();
     // }
   }

   private void ingestWarcFile(File inputWarcFile) throws IOException {
     WarcRecord warcRecord = null;
     String uri = null;
     String date = null;
     String type = null;
     byte[] content = null;
     String key = null;

     GZIPInputStream gzInputStream = new GZIPInputStream(new FileInputStream(inputWarcFile));
     ByteCountingPushBackInputStream pbin = new ByteCountingPushBackInputStream(
         new BufferedInputStream(gzInputStream, 8192), 32);
     WarcReader warcReader = WarcReaderFactory.getReaderUncompressed(pbin);

     if (warcReader == null) {
       LOG.info("Can't read warc file " + inputWarcFile.getName());
       return;
     }

     warcReader.setWarcTargetUriProfile(uriProfile);
     warcReader.setBlockDigestEnabled(bBlockDigestEnabled);
     warcReader.setPayloadDigestEnabled(bPayloadDigestEnabled);
     warcReader.setRecordHeaderMaxSize(recordHeaderMaxSize);
     warcReader.setPayloadHeaderMaxSize(payloadHeaderMaxSize);

     while ((warcRecord = warcReader.getNextRecord()) != null) {
       uri = warcRecord.header.warcTargetUriStr;
       key = UrlUtil.urlToKey(uri);
       Payload payload = warcRecord.getPayload();
       HttpHeader httpHeader = null;
       InputStream payloadStream = null;

       if (payload == null) {
         continue;
       }

       httpHeader = warcRecord.getHttpHeader();
       if (httpHeader != null) {
         payloadStream = httpHeader.getPayloadInputStream();
         type = httpHeader.contentType;
       } else {
         payloadStream = payload.getInputStreamComplete();
       }

       if (payloadStream == null) {
         skipped++;
         continue;
       }

       date = warcRecord.header.warcDateStr;

       if (payloadStream.available() > MAX_CONTENT_SIZE) {
         skipped++;
         continue;
       }
       content = IOUtils.toByteArray(payloadStream); // TODO: fix this

       if (key == null) {
         skipped++;
         continue;
       }

       if (type == null) {
         type = "text/plain";
       }

       if (warcRecord.getHeader("WARC-Type").value.toLowerCase().equals("response")) {
         if (content.length > MAX_CONTENT_SIZE) {
           skipped++;
           continue;
         }

         if (cnt % 10000 == 0 && cnt > 0) {
           LOG.info("Ingested " + cnt + " records into Hbase.");
         }

         if (hbaseManager.addRecord(key, date, content, type)) {
           cnt++;
         } else {
           skipped++;
         }
       }
     }

     warcReader.close();
     pbin.close();
     gzInputStream.close();
   }

   private void ingestFolder(File inputFolder, int i) throws Exception {
     long startTime = System.currentTimeMillis();
     cnt = 0;
     skipped = 0;
     GZIPInputStream gzInputStream = null;

     for (; i < inputFolder.listFiles().length; i++) {
       File inputFile = inputFolder.listFiles()[i];
       if (!(inputFile.getName().endsWith(".warc.gz") || inputFile.getName().endsWith(".arc.gz")
           || inputFile.getName().endsWith(".warc") || inputFile.getName().endsWith(".arc"))) {
         continue;
       }
LOG.info("processing file " + i + ": " + inputFile.getName()); if (inputFile.toString().toLowerCase().endsWith(".gz")) { gzInputStream = new GZIPInputStream(new FileInputStream(inputFile)); ByteCountingPushBackInputStream in = new ByteCountingPushBackInputStream(gzInputStream, 32); if (ArcReaderFactory.isArcFile(in)) { ingestArcFile(inputFile); } else if (WarcReaderFactory.isWarcFile(in)) { ingestWarcFile(inputFile); } } } long totalTime = System.currentTimeMillis() - startTime; LOG.info("Total " + cnt + " records inserted, " + skipped + " records skipped"); LOG.info("Total time: " + totalTime + "ms"); LOG.info("Ingest rate: " + cnt / (totalTime / 1000) + " records per second."); } @SuppressWarnings("static-access") public static void main(String[] args) throws Exception { Options options = new Options(); options.addOption(OptionBuilder.withArgName("name").hasArg() .withDescription("name of the archive").create(NAME_OPTION)); options.addOption(OptionBuilder.withArgName("path").hasArg() .withDescription("WARC files location").create(DIR_OPTION)); options.addOption(OptionBuilder.withArgName("n").hasArg() .withDescription("Start from the n-th WARC file").create(START_OPTION)); options.addOption("create", false, "create new table"); options.addOption("append", false, "append to existing table"); CommandLine cmdline = null; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { System.err.println("Error parsing command line: " + exp.getMessage()); System.exit(-1); } if (!cmdline.hasOption(DIR_OPTION) || !cmdline.hasOption(NAME_OPTION)) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(IngestFiles.class.getCanonicalName(), options); System.exit(-1); } if (!cmdline.hasOption(CREATE_OPTION) && !cmdline.hasOption(APPEND_OPTION)) { System.err.println(String.format("Must specify either -%s or -%s", CREATE_OPTION, APPEND_OPTION)); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(IngestFiles.class.getCanonicalName(), options); System.exit(-1); } String path = cmdline.getOptionValue(DIR_OPTION); File inputFolder = new File(path); int i = 0; if (cmdline.hasOption(START_OPTION)) { i = Integer.parseInt(cmdline.getOptionValue(START_OPTION)); } String name = cmdline.getOptionValue(NAME_OPTION); boolean create = cmdline.hasOption(CREATE_OPTION); IngestFiles load = new IngestFiles(name, create); load.ingestFolder(inputFolder, i); } }