diff --git a/pom.xml b/pom.xml index eb38d7a..99a9e19 100644 --- a/pom.xml +++ b/pom.xml @@ -1,270 +1,270 @@ 4.0.0 org.warcbase warcbase jar 0.1.0-SNAPSHOT warcbase WARC + HBase http://warcbase.org/ The Apache Software License, Version 2.0 http://www.apache.org/licenses/LICENSE-2.0.txt repo scm:git:git@github.com:lintool/warcbase.git scm:git:git@github.com:lintool/warcbase.git git@github.com:lintool/warcbase.git lintool Jimmy Lin jimmylin@umd.edu milad621 Milad Gholami mgholami@cs.umd.edu org.sonatype.oss oss-parent 7 UTF-8 UTF-8 8.1.11.v20130520 2.0.0-mr1-cdh4.3.0 2.0.0-cdh4.3.0 3.4.5-cdh4.3.0 org.codehaus.mojo appassembler-maven-plugin 1.3.1 - org.warcbase.LoadWARC - LoadWARC + org.warcbase.ingest.IngestWarcFiles + IngestWarcFiles org.warcbase.WarcBrowser WarcBrowser org.warcbase.testHbase testHbase org.warcbase.PrintURLS PrintURLS org.warcbase.Dashboard Dashboard org.warcbase.DuplicatesHbase DuplicatesHbase org.mortbay.jetty jetty-maven-plugin ${jettyVersion} junit junit 4.11 test com.google.guava guava 14.0.1 org.apache.hbase hbase slf4j-api slf4j-api org.slf4j slf4j-api org.slf4j slf4j-api-1.4.3 jsp-api jsp-api org.mortbay.jetty jsp-api org.mortbay.jetty jsp-api-2.1 org.mortbay.jetty servlet-api-2.5 org.mortbay.jetty servlet-api servlet-api servlet-api org.apache.hadoop hadoop-core ${hadoop.version} javax.servlet servlet-api org.apache.hadoop hadoop-common ${hadoop.version2} javax.servlet servlet-api org.apache.zookeeper zookeeper ${zookeeper.version} org.archive.heritrix heritrix-commons 3.1.2-SNAPSHOT org.archive.wayback wayback-core 1.7.0 org.eclipse.jetty jetty-server 8.1.11.v20130520 org.eclipse.jetty jetty-webapp ${jettyVersion} true org.slf4j slf4j-log4j12 1.6.4 internetarchive Internet Archive Maven Repository http://builds.archive.org:8080/maven2 cloudera https://repository.cloudera.com/artifactory/cloudera-repos/ org.apache.hbase hbase 0.94.6-cdh4.3.0 diff --git a/src/main/java/org/warcbase/Constants.java b/src/main/java/org/warcbase/Constants.java index e1023e9..3eff54a 100644 --- a/src/main/java/org/warcbase/Constants.java +++ b/src/main/java/org/warcbase/Constants.java @@ -1,41 +1,41 @@ package org.warcbase; //import iu.pti.hbaseapp.clueweb09.HTMLTextParser; import org.apache.hadoop.hbase.util.Bytes; //import org.apache.lucene.analysis.Analyzer; //import org.apache.lucene.analysis.standard.StandardAnalyzer; //import org.apache.lucene.util.Version; public class Constants { public static final String TABLE_NAME = "Web_Archive"; //public static final String[] FAMILYS = {"date", "content"}; - public static final String[] FAMILYS = {"File"}; + public static final String[] FAMILIES = {"File"}; public static final String CF_COUNTS = "counts"; public static final byte[] CF_COUNTS_BYTES = Bytes.toBytes(CF_COUNTS); // Specially for the ClueWeb09 data set: public static final String CLUEWEB09_DATA_TABLE_NAME = "clueWeb09DataTable"; public static final String WORD_COUNT_TABLE_NAME = "WordCountTable"; public static final byte[] CW09_DATA_TABLE_BYTES = Bytes.toBytes(CLUEWEB09_DATA_TABLE_NAME); public static final byte[] WORD_COUNT_TABLE_BYTES = Bytes.toBytes(WORD_COUNT_TABLE_NAME); public static final String CF_DETAILS = "details"; public static final byte[] CF_DETAILS_BYTES = Bytes.toBytes(CF_DETAILS); public static final String QUALIFIER_URI = "URI"; public static final String QUALIFIER_CONTENT = "content"; public static final byte[] QUAL_CONTENT_BYTES = Bytes.toBytes(QUALIFIER_CONTENT); public static final byte[] QUAL_URI_BYTES = Bytes.toBytes(QUALIFIER_URI); //public static final Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_31); //public static final HTMLTextParser txtExtractor = new HTMLTextParser(); // data types public enum DataType {INT, STRING, DOUBLE, UNKNOWN}; } \ No newline at end of file diff --git a/src/main/java/org/warcbase/Dashboard.java b/src/main/java/org/warcbase/Dashboard.java index 0145364..d11b0d4 100644 --- a/src/main/java/org/warcbase/Dashboard.java +++ b/src/main/java/org/warcbase/Dashboard.java @@ -1,169 +1,170 @@ package org.warcbase; import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.mapreduce.RowCounter; +import org.warcbase.ingest.IngestWarcFiles; public class Dashboard { private static final String N_OPTION = "n"; public static Configuration hbaseConfig = null; public static HTable table = null; static { hbaseConfig = HBaseConfiguration.create(); } public static String getFileType(String url){ //System.out.println(url); if(url.length() > 0 && url.charAt(url.length() - 1) == '/') return ""; String[] splits = url.split("\\/"); if(splits.length == 0) return ""; splits = splits[splits.length - 1].split("\\."); //System.out.println(splits.length); if(splits.length <= 1) return ""; String type = splits[splits.length - 1]; if(type.length() > 8) return ""; if(type.length() == 1 && Character.isDigit(type.charAt(0))) return ""; return type; } public static String getDomain(String url){ String[] splits = url.split("\\/"); return splits[0]; } public static Map sortByValue(HashMap map) { List> list = new LinkedList>(map.entrySet()); Collections.sort(list, new Comparator>() { public int compare(Map.Entry m1, Map.Entry m2) { return (m2.getValue()).compareTo(m1.getValue()); } }); Map result = new LinkedHashMap(); for (Map.Entry entry : list) { result.put(entry.getKey(), entry.getValue()); } return result; } public static void main(String[] args) throws IOException { /*String testString = "com.89north.www/wp-content/plugins/jquery-drop-down-menu-plugin/noConflict.js?ver=3.5.1"; System.out.println(getDomain(testString)); if(true) return;*/ Options options = new Options(); options.addOption(OptionBuilder.withArgName("n").hasArg() .withDescription("Num").create(N_OPTION)); CommandLine cmdline = null; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { System.err.println("Error parsing command line: " + exp.getMessage()); System.exit(-1); } if (!cmdline.hasOption(N_OPTION)) { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(LoadWARC.class.getCanonicalName(), options); + formatter.printHelp(IngestWarcFiles.class.getCanonicalName(), options); System.exit(-1); } int num = Integer.parseInt(cmdline.getOptionValue(N_OPTION)); int count = 0; try { table = new HTable(hbaseConfig, Constants.TABLE_NAME); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("scanning full table:"); Scan scan = new Scan(); scan.setFilter(new FirstKeyOnlyFilter()); ResultScanner scanner = null; try { scanner = table.getScanner(scan); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } //RowCounter rCounter = new RowCounter(); //rCounter.se HashMap fileTypeCounter = new HashMap(); HashMap domainCounter = new HashMap(); for (Result rr = scanner.next(); rr != null; rr = scanner.next()) { byte[] key = rr.getRow(); String url = new String(key, "UTF8"); count++; String domain = getDomain(url); if(domainCounter.containsKey(domain)) domainCounter.put(domain, domainCounter.get(domain) + 1); else domainCounter.put(domain, 1); String fileType = getFileType(url); if(fileType.equals("")) continue; if(fileTypeCounter.containsKey(fileType)) fileTypeCounter.put(fileType, fileTypeCounter.get(fileType) + 1); else fileTypeCounter.put(fileType, 1); //System.out.println(new String(key, "UTF8") + " " + getFileType(url)); } System.out.println("Number of rows in the table overall: " + count); System.out.println("\nBreakdown by file type:"); Map sortedMap = sortByValue(fileTypeCounter); int i = 0; for(Map.Entry entry: sortedMap.entrySet()){ System.out.println(entry.getKey() + " " + entry.getValue()); if(i > num) break; i++; } System.out.println("\nBreakdown by domain:"); Map sortedDomain = sortByValue(domainCounter); i = 0; for(Map.Entry entry: sortedDomain.entrySet()){ System.out.println(entry.getKey() + " " + entry.getValue()); if(i > num) break; i++; } } } diff --git a/src/main/java/org/warcbase/Fetch.java b/src/main/java/org/warcbase/Fetch.java index 29ed317..233ca61 100755 --- a/src/main/java/org/warcbase/Fetch.java +++ b/src/main/java/org/warcbase/Fetch.java @@ -1,133 +1,134 @@ package org.warcbase; import java.io.*; import java.net.URL; import java.net.MalformedURLException; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.util.ArrayList; import java.util.Collections; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import org.warcbase.ingest.IngestWarcFiles; public class Fetch { /** Write a byte array to the given file. Writing binary data is significantly simpler than reading it. */ public static void write(byte[] aInput, String aOutputFileName){ try { OutputStream output = null; try { output = new BufferedOutputStream(new FileOutputStream(aOutputFileName)); output.write(aInput); } finally { output.close(); } } catch(FileNotFoundException ex){ } catch(IOException ex){ } } public static boolean isPureAscii(byte bytearray []) { CharsetDecoder d = Charset.forName("US-ASCII").newDecoder(); try { CharBuffer r = d.decode(ByteBuffer.wrap(bytearray)); r.toString(); } catch(CharacterCodingException e) { return false; } return true; } public static void main(String[] args) throws IOException { if(args.length < 1){ System.err.println("Should have at least one argument."); return; } String query = args[0]; /*String date = null; if(args.length > 1){ date = args[1]; }*/ //System.out.println(query); //if(true) //return; Configuration config = HBaseConfiguration.create(); - HTable table = new HTable(LoadWARC.hbaseConfig, Constants.TABLE_NAME); + HTable table = new HTable(config, Constants.TABLE_NAME); //String query = "com.nytimes.topics/top/reference/timestopics/subjects/a/agriculture/urban_agriculture/index.html/Microsoft.XMLHTTP?offset=10&s=newest&query=ANTIPOVERTY+PROGRAMS&field=des&match=exact"; //String query = "http://topics.nytimes.com/top/reference/timestopics/subjects/a/agriculture/urban_agriculture/index.html/2.1_120516.0?query=OAKLAND%20(CALIF)&field=geo&match=exact"; //String query = "http://www.boxer.senate.gov/"; //String query = "http://www.boxer.senate.gov/"; - query = Util.reverse_hostname(query); + query = Util.reverseHostname(query); Get get = new Get(Bytes.toBytes(query)); Result rs = table.get(get); if(rs.raw().length == 0){ System.out.println("Not found."); return; } byte[] data = rs.raw()[0].getValue(); String content = new String(rs.raw()[0].getValue()); System.out.println(data.length); System.out.println(content); Fetch.write(rs.raw()[0].getValue(), "pic.jpg"); System.out.println(Fetch.isPureAscii(rs.raw()[0].getValue())); FileOutputStream fileOut = new FileOutputStream("test.jpeg"); DataOutputStream os = new DataOutputStream(fileOut); os.write(data); os.close(); if(true) return; while(content.charAt(0) != '<' && content.contains("\n")) content = content.substring(content.indexOf('\n')+1); TextDocument2 t2 = new TextDocument2(null, null, null); System.out.println(t2.fixURLs(content, "http://www.boxer.senate.gov/", "2013-02-12T20:42:22Z")); if(true) return; //ArrayList dates = new ArrayList(rs.raw().length); ArrayList dates = new ArrayList(10); for(int i=0;i get_headers(String doc) { - HashMap hdr = new HashMap(20); - try { - BufferedReader in = new BufferedReader(new StringReader(doc)); - int nl = 0; - String line = null; - while ((line = in.readLine()) != null) { - if (line.length() == 0) - nl++; - if (nl == 2) - break; - int i = line.indexOf(':'); - if (i == -1) - continue; - try { - hdr.put(line.substring(0, i), line.substring(i+2)); - } catch (Exception e) {} - } - StringBuilder buf = new StringBuilder(); - while ((line = in.readLine()) != null) { - buf.append(line).append('\n'); - } - hdr.put("document", buf.toString()); - } catch (IOException e) {} - return hdr; - } - - public static void creatTable(){ - HBaseAdmin admin = null; - try { - admin = new HBaseAdmin(hbaseConfig); - } catch (MasterNotRunningException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (ZooKeeperConnectionException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - try { - if (admin.tableExists(Constants.TABLE_NAME)) { - System.out.println("table already exists!"); - } else { - HTableDescriptor tableDesc = new HTableDescriptor(Constants.TABLE_NAME); - for (int i = 0; i < Constants.FAMILYS.length; i++) { - tableDesc.addFamily(new HColumnDescriptor(Constants.FAMILYS[i])); - } - admin.createTable(tableDesc); - System.out.println("create table " + Constants.TABLE_NAME + " ok."); - } - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - public static void addRecord(String key, String date, byte[] data){ - if(table == null){ - try { - table = new HTable(hbaseConfig, Constants.TABLE_NAME); - } catch (IOException e) { - System.out.println("addRecord exception: error in setting table"); - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - Put put = new Put(Bytes.toBytes(key)); - put.add(Bytes.toBytes(Constants.FAMILYS[0]), Bytes.toBytes(date), data); - //put.add(Bytes.toBytes(Constants.FAMILYS[1]), Bytes.toBytes(""), Bytes.toBytes(content)); - try { - table.put(put); - } catch (IOException e) { - System.out.println("addRecord exception: Couldn't insert key: " + key); - System.out.println("File Size: " + data.length); - // TODO Auto-generated catch block - e.printStackTrace(); - } - //System.out.println("insert recored " + key + " to table " - // + Constants.TABLE_NAME + " ok."); - } - - /** - * @param args - */ - public static void main(String[] args) { - Options options = new Options(); - options.addOption(OptionBuilder.withArgName("dir").hasArg() - .withDescription("WARC files location").create(DIR_OPTION)); - options.addOption(OptionBuilder.withArgName("start").hasArg() - .withDescription("Start from WARC file").create(START_OPTION)); - - CommandLine cmdline = null; - CommandLineParser parser = new GnuParser(); - try { - cmdline = parser.parse(options, args); - } catch (ParseException exp) { - System.err.println("Error parsing command line: " + exp.getMessage()); - System.exit(-1); - } - - if (!cmdline.hasOption(DIR_OPTION)) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(LoadWARC.class.getCanonicalName(), options); - System.exit(-1); - } - String path = cmdline.getOptionValue(DIR_OPTION); - //System.out.println(path); - //if(true) - //return; - - //Create hbase table - creatTable(); - if(table == null){ - try { - table = new HTable(hbaseConfig, Constants.TABLE_NAME); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - Field maxKeyValueSizeField = null; - try { - maxKeyValueSizeField = HTable.class.getDeclaredField("maxKeyValueSize"); - } catch (SecurityException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } catch (NoSuchFieldException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } - maxKeyValueSizeField.setAccessible(true); - try { - maxKeyValueSizeField.set(table, 0); - } catch (IllegalArgumentException e3) { - // TODO Auto-generated catch block - e3.printStackTrace(); - } catch (IllegalAccessException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - try { - System.out.println(maxKeyValueSizeField.get(table)); - } catch (IllegalArgumentException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } catch (IllegalAccessException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - File inputWarcFolder = new File(path); - //File inputWarcFolder = new File("/Users/milad/UMD/CL/US_Congress/archive/partner.archive-it.org/cgi-bin/getarcs.pl"); - //File inputWarcFolder = new File("/scratch0/milad/partner.archive-it.org/cgi-bin/getarcs.pl"); - //String inputWarcFile = "/Users/milad/workspace/java/Senate/ARCHIVEIT-3395-NONE-KQSJEZ-20121203173122-00012-wbgrp-crawl068.us.archive.org-6680.warc.gz"; - //File inputWarcFile = new File("/Users/milad/UMD/CL/US_Congress/archive/partner.archive-it.org/cgi-bin/getarcs.pl/ARCHIVEIT-3566-DAILY-13015-20130222204213709-00000-wbgrp-crawl062.us.archive.org-6443.warc.gz"); - //for(File inputWarcFile: inputWarcFolder.listFiles()){ - System.out.println(inputWarcFolder.listFiles().length + " Files in total."); - int i = 0; - if(cmdline.hasOption(START_OPTION)) { - i = Integer.parseInt(cmdline.getOptionValue(START_OPTION)); - } - - for(;i 3 && thisTargetURI.substring(thisTargetURI.length() - 3, thisTargetURI.length()).equals("mp3")) - continue; - //if(thisTargetURI.length() > 3 && thisTargetURI.substring(thisTargetURI.length() - 3, thisTargetURI.length()).equals("flv")) - //continue; - if(thisTargetURI.length() > 3 && thisTargetURI.substring(thisTargetURI.length() - 3, thisTargetURI.length()).equals("mov")) - continue; - if(thisTargetURI.length() > 3 && thisTargetURI.substring(thisTargetURI.length() - 3, thisTargetURI.length()).equals("wmv")) - continue; - if(thisTargetURI.length() > 3 && thisTargetURI.substring(thisTargetURI.length() - 3, thisTargetURI.length()).equals("mp4")) - continue; - if(thisTargetURI.length() > 3 && thisTargetURI.substring(thisTargetURI.length() - 3, thisTargetURI.length()).equals("MP4")) - continue; - String content = thisWarcRecord.getContentUTF8(); - content = thisWarcRecord.toString(); - parse = get_headers(content); - String key = Util.reverse_hostname(thisTargetURI); - if(key == null) - continue; - //System.out.println(key); - addRecord(key, parse.get("WARC-Date"), thisWarcRecord.getByteContent());//parse.get("document")); - } - } - } catch (IOException e2) { - // TODO Auto-generated catch block - System.out.println("exception2: " + thisTargetURI); - System.out.println(Util.reverse_hostname(thisTargetURI)); - e2.printStackTrace(); - } - catch(Exception e){ - System.out.println("exception: " + thisTargetURI); - System.out.println(parse.get("document").length()); - //System.out.println(thisTargetURI.substring(thisTargetURI.length() - 3, thisTargetURI.length()).equals("mp3")); - e.printStackTrace(); - } - - try { - inStream.close(); - } catch (IOException e2) { - // TODO Auto-generated catch block - e2.printStackTrace(); - } - - System.out.println("Added file: " + inputWarcFile.getName()); - } - - - } - -} diff --git a/src/main/java/org/warcbase/PrintURLS.java b/src/main/java/org/warcbase/PrintURLS.java index 7ab59ed..8132baa 100644 --- a/src/main/java/org/warcbase/PrintURLS.java +++ b/src/main/java/org/warcbase/PrintURLS.java @@ -1,115 +1,116 @@ package org.warcbase; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.util.HashMap; import java.util.zip.GZIPInputStream; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.warcbase.ingest.IngestWarcFiles; public class PrintURLS { private static final String DIR_OPTION = "dir"; private static final String O_OPTION = "o"; public static void main(String[] args) { Options options = new Options(); options.addOption(OptionBuilder.withArgName("dir").hasArg() .withDescription("WARC files location").create(DIR_OPTION)); options.addOption(OptionBuilder.withArgName("o").hasArg() .withDescription("Output file name").create(O_OPTION)); CommandLine cmdline = null; CommandLineParser parser = new GnuParser(); try { cmdline = parser.parse(options, args); } catch (ParseException exp) { System.err.println("Error parsing command line: " + exp.getMessage()); System.exit(-1); } if (!cmdline.hasOption(DIR_OPTION) || !cmdline.hasOption(O_OPTION)) { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(LoadWARC.class.getCanonicalName(), options); + formatter.printHelp(IngestWarcFiles.class.getCanonicalName(), options); System.exit(-1); } String path = cmdline.getOptionValue(DIR_OPTION); String outFile = cmdline.getOptionValue(O_OPTION); PrintStream out = null; try { out = new PrintStream(new FileOutputStream(outFile)); } catch (FileNotFoundException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } File inputWarcFolder = new File(path); for(int i = 0;i 0; i--) { if (i > 0) newhost.append(parts[i]).append("."); } newhost.append(parts[0]); int port = url.getPort(); if (port != -1) newhost.append(":").append(port); newhost.append(url.getFile()); return newhost.toString(); } public static void main(String[] args) { - System.out.println(Util.reverse_hostname("http://www.boxer.senate.gov/"));//http://www.ayotte.senate.gov/ + System.out.println(Util.reverseHostname("http://www.boxer.senate.gov/"));//http://www.ayotte.senate.gov/ } + + public static String getUriExtension(String thisTargetURI) { + if (thisTargetURI.length() > 3) { + return thisTargetURI.substring(thisTargetURI.length() - 3, thisTargetURI.length()); + } + + return ""; + } + } diff --git a/src/main/java/org/warcbase/WarcbaseServlet.java b/src/main/java/org/warcbase/WarcbaseServlet.java index 85da1e2..f7c7cb1 100644 --- a/src/main/java/org/warcbase/WarcbaseServlet.java +++ b/src/main/java/org/warcbase/WarcbaseServlet.java @@ -1,167 +1,171 @@ package org.warcbase; import java.io.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import javax.servlet.*; import javax.servlet.http.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; +import org.warcbase.ingest.IngestWarcFiles; -public class WarcbaseServlet extends HttpServlet -{ - +public class WarcbaseServlet extends HttpServlet { + private static Configuration hbaseConfig = null; + static { + hbaseConfig = HBaseConfiguration.create(); + } + private void writeResponse(HttpServletResponse resp, byte[] data, String query, String d) throws IOException{ String content = new String(data, "UTF8"); //System.out.println(content); if(!warcRecordParser.getType(content).startsWith("text")){ resp.setHeader("Content-Type", ResponseRecord.getType(content)); resp.setContentLength(ResponseRecord.getBodyByte(data).length); resp.getOutputStream().write(ResponseRecord.getBodyByte(data)); //resp.getOutputStream().write(ResponseRecord.getBodyByte(data)); //IOUtils.write(ResponseRecord.getBodyByte(data), resp.getOutputStream()); } else{ System.setProperty("file.encoding", "UTF8"); //resp.setContentType("text/html;charset=UTF-8"); resp.setHeader("Content-Type", ResponseRecord.getType(content)); resp.setCharacterEncoding("UTF-8"); PrintWriter out = resp.getWriter(); //out. TextDocument2 t2 = new TextDocument2(null, null, null); String bodyContent = new String(ResponseRecord.getBodyByte(data), "UTF8"); bodyContent = t2.fixURLs(bodyContent, query, d); //System.out.println(bodyContent); out.println(bodyContent); //resp.getOutputStream().write(bodyContent.getBytes("UTF-8")); } } protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { String query = req.getParameter("query"); String d = req.getParameter("date"); - String q = Util.reverse_hostname(query); + String q = Util.reverseHostname(query); Configuration config = HBaseConfiguration.create(); - HTable table = new HTable(LoadWARC.hbaseConfig, Constants.TABLE_NAME); + HTable table = new HTable(hbaseConfig, Constants.TABLE_NAME); Get get = new Get(Bytes.toBytes(q)); Result rs = table.get(get); byte[] data = null; String imagePath = null; File imageFile = null; for(int i=1;i dates = new ArrayList(10); for(int i=0;i 0){//d < i data = rs.raw()[i - 1].getValue(); writeResponse(resp, data, query, d); /*String content = new String(rs.raw()[i - 1].getValue()); if(!warcRecordParser.getType(content).equals("text/html")){ resp.setHeader("Content-Type", "image/jpg"); resp.setContentLength(ResponseRecord.getBodyByte(data).length); resp.getOutputStream().write(ResponseRecord.getBodyByte(data)); } else{ PrintWriter out = resp.getWriter(); TextDocument2 t2 = new TextDocument2(null, null, null); out.println(t2.fixURLs(ResponseRecord.getBodyText(content), query, d)); } //out.close(); table.close();*/ return; } int i = dates.size(); data = rs.raw()[i - 1].getValue(); writeResponse(resp, data, query, d); /*String content = new String(rs.raw()[i - 1].getValue()); if(!warcRecordParser.getType(content).equals("text/html")){ resp.setHeader("Content-Type", "image/jpg"); resp.setContentLength(ResponseRecord.getBodyByte(data).length); resp.getOutputStream().write(ResponseRecord.getBodyByte(data)); } else{ PrintWriter out = resp.getWriter(); TextDocument2 t2 = new TextDocument2(null, null, null); out.println(t2.fixURLs(ResponseRecord.getBodyText(content), query, d)); } //out.close(); table.close();*/ return; //return; } PrintWriter out = resp.getWriter(); out.println(""); out.println(""); for(int i=0;i " + date + "");//Link text } out.println(""); out.println(""); table.close(); } protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { String field = req.getParameter("field"); PrintWriter out = resp.getWriter(); out.println(""); out.println(""); out.println("You entered \"" + field + "\" into the text box."); out.println(""); out.println(""); } } diff --git a/src/main/java/org/warcbase/ingest/IngestWarcFiles.java b/src/main/java/org/warcbase/ingest/IngestWarcFiles.java new file mode 100644 index 0000000..2001f9e --- /dev/null +++ b/src/main/java/org/warcbase/ingest/IngestWarcFiles.java @@ -0,0 +1,217 @@ +package org.warcbase.ingest; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.StringReader; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.Set; +import java.util.zip.GZIPInputStream; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; +import org.warcbase.Constants; +import org.warcbase.Util; +import org.warcbase.WarcHTMLResponseRecord; +import org.warcbase.WarcRecord; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +public class IngestWarcFiles { + private static final Logger LOG = Logger.getLogger(IngestWarcFiles.class); + private static final int MAX_SIZE = 1024 * 1024; + private static final Set SKIP = ImmutableSet.of("mp3", "mov", "wmv", "mp4", "MP4"); + + private final HTable table; + + public IngestWarcFiles() throws Exception { + Configuration hbaseConfig = HBaseConfiguration.create(); + HBaseAdmin admin = new HBaseAdmin(hbaseConfig); + + if (admin.tableExists(Constants.TABLE_NAME)) { + LOG.info("Table " + Constants.TABLE_NAME + "already exists: doing nothing"); + } else { + HTableDescriptor tableDesc = new HTableDescriptor(Constants.TABLE_NAME); + for (int i = 0; i < Constants.FAMILIES.length; i++) { + tableDesc.addFamily(new HColumnDescriptor(Constants.FAMILIES[i])); + } + admin.createTable(tableDesc); + System.out.println("Successfully created table " + Constants.TABLE_NAME + "."); + } + + table = new HTable(hbaseConfig, Constants.TABLE_NAME); + Field maxKeyValueSizeField = HTable.class.getDeclaredField("maxKeyValueSize"); + maxKeyValueSizeField.setAccessible(true); + maxKeyValueSizeField.set(table, MAX_SIZE); + + LOG.info("maxKeyValueSize: " + maxKeyValueSizeField.get(table)); + admin.close(); + } + + public void ingestFolder(File inputWarcFolder, int i) { + long startTime = System.currentTimeMillis(); + int cnt = 0; + int skipped = 0; + for (; i < inputWarcFolder.listFiles().length; i++) { + File inputWarcFile = inputWarcFolder.listFiles()[i]; + if (inputWarcFile.getName().charAt(0) == '.') + continue; + GZIPInputStream gzInputStream = null; + try { + LOG.info("Processing File: " + i + " = " + inputWarcFile.getName()); + gzInputStream = new GZIPInputStream(new FileInputStream(inputWarcFile)); + } catch (Exception e) { + e.printStackTrace(); + } + // cast to a data input stream + DataInputStream inStream = new DataInputStream(gzInputStream); + + // iterate through our stream + WarcRecord record; + Map parse = null; + String thisTargetURI = null; + try { + while ((record = WarcRecord.readNextWarcRecord(inStream)) != null) { + // see if it's a response record + if (record.getHeaderRecordType().equals("response")) { + // it is - create a WarcHTML record + WarcHTMLResponseRecord htmlRecord = new WarcHTMLResponseRecord(record); + // get our TREC ID and target URI + thisTargetURI = htmlRecord.getTargetURI(); + if (SKIP.contains(Util.getUriExtension(thisTargetURI))) { + skipped++; + } + + String content = record.getContentUTF8(); + content = record.toString(); + parse = getHeaders(content); + String key = Util.reverseHostname(thisTargetURI); + if (key == null) { + continue; + } + byte[] data = record.getByteContent(); + if ( data.length > MAX_SIZE) { + LOG.info("Skipping " + key + " with " + data.length + " byte record"); + skipped++; + } else { + addRecord(key, parse.get("WARC-Date"), record.getByteContent()); + cnt++; + } + + if (cnt % 1000 == 0) { + LOG.info(cnt + " records ingested"); + } + } + } + inStream.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + long totalTime = System.currentTimeMillis() - startTime; + LOG.info("Total " + cnt + " records inserted, " + skipped + " records skipped"); + LOG.info("Total time: " + totalTime + "ms"); + LOG.info("Ingest rate: " + cnt / (totalTime/1000) + " records per second."); + } + + private Map getHeaders(String doc) { + Map hdr = Maps.newHashMapWithExpectedSize(20); + try { + BufferedReader in = new BufferedReader(new StringReader(doc)); + int nl = 0; + String line = null; + while ((line = in.readLine()) != null) { + if (line.length() == 0) + nl++; + if (nl == 2) + break; + int i = line.indexOf(':'); + if (i == -1) + continue; + try { + hdr.put(line.substring(0, i), line.substring(i + 2)); + } catch (Exception e) { + e.printStackTrace(); + } + } + StringBuilder buf = new StringBuilder(); + while ((line = in.readLine()) != null) { + buf.append(line).append('\n'); + } + hdr.put("document", buf.toString()); + } catch (IOException e) { + e.printStackTrace(); + } + return hdr; + } + + private void addRecord(String key, String date, byte[] data) { + try { + Put put = new Put(Bytes.toBytes(key)); + put.add(Bytes.toBytes(Constants.FAMILIES[0]), Bytes.toBytes(date), data); + table.put(put); + } catch (IOException e) { + LOG.error("Couldn't insert key: " + key); + LOG.error("File Size: " + data.length); + e.printStackTrace(); + } + } + + private static final String DIR_OPTION = "dir"; + private static final String START_OPTION = "start"; + + @SuppressWarnings("static-access") + public static void main(String[] args) throws Exception { + Options options = new Options(); + options.addOption(OptionBuilder.withArgName("dir").hasArg() + .withDescription("WARC files location").create(DIR_OPTION)); + options.addOption(OptionBuilder.withArgName("start").hasArg() + .withDescription("Start from WARC file").create(START_OPTION)); + + CommandLine cmdline = null; + CommandLineParser parser = new GnuParser(); + try { + cmdline = parser.parse(options, args); + } catch (ParseException exp) { + System.err.println("Error parsing command line: " + exp.getMessage()); + System.exit(-1); + } + + if (!cmdline.hasOption(DIR_OPTION)) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(IngestWarcFiles.class.getCanonicalName(), options); + System.exit(-1); + } + String path = cmdline.getOptionValue(DIR_OPTION); + File inputWarcFolder = new File(path); + + int i = 0; + if (cmdline.hasOption(START_OPTION)) { + i = Integer.parseInt(cmdline.getOptionValue(START_OPTION)); + } + + IngestWarcFiles load = new IngestWarcFiles(); + + load.ingestFolder(inputWarcFolder, i); + } +} diff --git a/src/main/java/org/warcbase/testHbase.java b/src/main/java/org/warcbase/testHbase.java deleted file mode 100644 index eae26fc..0000000 --- a/src/main/java/org/warcbase/testHbase.java +++ /dev/null @@ -1,75 +0,0 @@ -package org.warcbase; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.util.Bytes; - - -public class testHbase { - - public static Configuration hbaseConfig = null; - public static HTable table = null; - - static { - hbaseConfig = HBaseConfiguration.create(); - } - - public static void addRecord(String key, String date, String content){ - if(table == null){ - try { - table = new HTable(hbaseConfig, Constants.TABLE_NAME); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - Put put = new Put(Bytes.toBytes(key)); - //put.add(family, qualifier, ts, value) - //put.setAttribute("name", Bytes.toBytes("")); - //put.add(Bytes.toBytes(Constants.FAMILYS[0]), Bytes.toBytes(""), Bytes.toBytes(date)); - put.add(Bytes.toBytes(Constants.FAMILYS[0]), Bytes.toBytes(date), Bytes.toBytes(content)); - try { - table.put(put); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - System.out.println("insert recored " + key + " to table " - + Constants.TABLE_NAME + " ok."); - } - - - /** - * @param args - */ - public static void main(String[] args) { - // TODO Auto-generated method stub - LoadWARC.creatTable(); - String testId = "2"; - addRecord(testId, "2012", "html1"); - addRecord(testId, "2013", "html2"); - addRecord(testId, "2011", "html3"); - Get get = new Get(Bytes.toBytes(testId)); - Result rs = null; - try { - rs = table.get(get); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - System.out.println(rs.raw().length); - for(int i=0;i