diff --git a/pom.xml b/pom.xml
index 6505e1a..17f9822 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,533 +1,538 @@
     <zookeeper.version>3.4.5-cdh5.4.1</zookeeper.version>
-    <spark.version>1.3.0-cdh5.4.1</spark.version>
+    <spark.version>1.3.0-cdh5.4.1</spark.version>
     <scala.version>2.10.4</scala.version>
 
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>14.0.1</version>
+      <version>19.0</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-graphx_2.10</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <version>2.6.3</version>
+      <version>2.7.2</version>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <version>2.6.3</version>
+      <version>2.7.2</version>
     </dependency>
diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractGraph.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractGraph.scala
new file mode 100644
index 0000000..a14cbaf
--- /dev/null
+++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractGraph.scala
@@ -0,0 +1,89 @@
+/*
+ * Warcbase: an open-source platform for managing web archives
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.warcbase.spark.matchbox
+
+import org.apache.spark.graphx._
+import org.apache.spark.rdd.RDD
+import org.warcbase.spark.archive.io.ArchiveRecord
+import org.warcbase.spark.rdd.RecordRDD._
+import org.warcbase.spark.utils.JsonUtil
+
+/**
+ *
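+ * A minimal usage sketch from spark-shell (`records` is assumed to be an RDD[ArchiveRecord],
+ * e.g. as produced by this package's RecordLoader; importing ExtractGraph._ brings the
+ * writeAsJson implicit into scope):
+ *
+ *   import org.warcbase.spark.matchbox.ExtractGraph._
+ *   val graph = ExtractGraph(records)
+ *   graph.writeAsJson("nodes.partjson", "links.partjson")
+ *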
+ * e.g. when done:
+ * $ cat nodes.partjson/part-* > nodes.json && cat links.partjson/part-* > links.json
+ * $ jq -c -n --slurpfile nodes nodes.json --slurpfile links links.json '{nodes: $nodes, links: $links}' > graph.json
+ *
+ */
+
+object ExtractGraph {
+  def pageHash(url: String): VertexId = {
+    url.hashCode.toLong
+  }
+
+  case class VertexData(domain: String, pageRank: Double, inDegree: Int, outDegree: Int)
+  case class EdgeData(date: String, src: String, dst: String)
+
+  def apply(records: RDD[ArchiveRecord], dynamic: Boolean = false,
+            tolerance: Double = 0.001, numIter: Int = 3): Graph[VertexData, EdgeData] = {
+    val vertices: RDD[(VertexId, VertexData)] = records.keepValidPages()
+      .flatMap(r => ExtractLinks(r.getUrl, r.getContentString))
+      .flatMap(r => List(ExtractTopLevelDomain(r._1).replaceAll("^\\s*www\\.", ""), ExtractTopLevelDomain(r._2).replaceAll("^\\s*www\\.", "")))
+      .distinct
+      .map(r => (pageHash(r), VertexData(r, 0.0, 0, 0)))
+
+    val edges: RDD[Edge[EdgeData]] = records.keepValidPages()
+      .map(r => (r.getCrawldate, ExtractLinks(r.getUrl, r.getContentString)))
+      .flatMap(r => r._2.map(f => (r._1, ExtractTopLevelDomain(f._1).replaceAll("^\\s*www\\.", ""), ExtractTopLevelDomain(f._2).replaceAll("^\\s*www\\.", ""))))
+      .filter(r => r._2 != "" && r._3 != "")
+      .map(r => Edge(pageHash(r._2), pageHash(r._3), EdgeData(r._1, r._2, r._3)))
+
+    val graph = Graph(vertices, edges)
+
+    val graphInOut = graph.outerJoinVertices(graph.inDegrees) {
+      case (vid, rv, inDegOpt) => VertexData(rv.domain, rv.pageRank, inDegOpt.getOrElse(0), rv.outDegree)
+    }.outerJoinVertices(graph.outDegrees) {
+      case (vid, rv, outDegOpt) => VertexData(rv.domain, rv.pageRank, rv.inDegree, outDegOpt.getOrElse(0))
+    }
+
+    if (dynamic) {
+      graphInOut.outerJoinVertices(graph.pageRank(tolerance).vertices) {
+        case (vid, rv, pageRankOpt) => VertexData(rv.domain, pageRankOpt.getOrElse(0.0), rv.inDegree, rv.outDegree)
+      }
+    } else {
+      graphInOut.outerJoinVertices(graph.staticPageRank(numIter).vertices) {
+        case (vid, rv, pageRankOpt) => VertexData(rv.domain, pageRankOpt.getOrElse(0.0), rv.inDegree, rv.outDegree)
+      }
+    }
+  }
+
+  implicit class GraphWriter(graph: Graph[VertexData, EdgeData]) {
+    def writeAsJson(verticesPath: String, edgesPath: String) = {
+      // Combine edges of a given (date, src, dst) combination into single record with count value.
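+      // countItems() is assumed here (from this package's RecordRDD helpers) to yield an RDD of
+      // (item, count) pairs, so r._1 is the Edge[EdgeData] and r._2 its occurrence count.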
+      val edgesCounted = graph.edges.countItems().map {
+        r => Map("date" -> r._1.attr.date,
+          "src" -> r._1.attr.src,
+          "dst" -> r._1.attr.dst,
+          "count" -> r._2)
+      }
+
+      edgesCounted.map(r => JsonUtil.toJson(r)).saveAsTextFile(edgesPath)
+      graph.vertices.map(r => JsonUtil.toJson(r._2)).saveAsTextFile(verticesPath)
+    }
+  }
+}
+
diff --git a/src/main/scala/org/warcbase/spark/matchbox/NERCombinedJson.scala b/src/main/scala/org/warcbase/spark/matchbox/NERCombinedJson.scala
index c7421e2..180012a 100644
--- a/src/main/scala/org/warcbase/spark/matchbox/NERCombinedJson.scala
+++ b/src/main/scala/org/warcbase/spark/matchbox/NERCombinedJson.scala
@@ -1,149 +1,129 @@
 package org.warcbase.spark.matchbox
 
 import java.io.BufferedReader
 import java.io.BufferedWriter
 import java.io.InputStreamReader
 import java.io.OutputStreamWriter
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
-import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
-
+import org.warcbase.spark.utils.JsonUtil
 
 import scala.collection.mutable.MutableList
 import scala.util.Random
 
 /**
   * Classifies records using NER and stores results as JSON
   */
 class NERCombinedJson extends Serializable {
 
   def combineKeyCountLists (l1: List[(String, Int)], l2: List[(String, Int)]): List[(String, Int)] = {
     (l1 ++ l2).groupBy(_._1 ).map {
       case (key, tuples) => (key, tuples.map( _._2).sum)
     }.toList
   }
 
   /** Combines directory of part-files containing one JSON array per line
     * into a single file containing a single JSON array of arrays.
     *
     * @param srcDir name of directory holding files, also name that will
     *               be given to JSON file.
     */
   def partDirToFile(srcDir: String): Unit = {
     val hadoopConfig = new Configuration()
     val hdfs = FileSystem.get(hadoopConfig)
     val rnd = new Random
     val srcPath = new Path(srcDir)
     val tmpFile = rnd.alphanumeric.take(8).mkString + ".almostjson"
     val tmpPath = new Path(tmpFile)
 
     // Merge part-files into single file
     FileUtil.copyMerge(hdfs, srcPath, hdfs, tmpPath, false, hadoopConfig, null)
 
     // Read file of JSON arrays, write into single JSON array of arrays
     val fsInStream = hdfs.open(tmpPath)
     val inFile = new BufferedReader(new InputStreamReader(fsInStream))
     hdfs.delete(srcPath, true)  // Don't need part-files anymore
     val fsOutStream = hdfs.create(srcPath, true)  // path was dir of part-files,
                                                   // now is a file of JSON
     val outFile = new BufferedWriter(new OutputStreamWriter(fsOutStream))
 
     outFile.write("[")
     val line = inFile.readLine()
     if (line != null) outFile.write(line)
     Iterator.continually(inFile.readLine()).takeWhile(_ != null).foreach(s => {outFile.write(", " + s)})
     outFile.write("]")
     outFile.close()
     inFile.close()
 
     hdfs.delete(tmpPath, false)
   }
 
   /** Do NER classification on input path, output JSON.
     *
     * @param iNerClassifierFile path of classifier file
     * @param inputFile path of file with tuples (date: String, url: String, content: String)
     *                  from which to extract entities
     * @param outputFile path of output file (e.g., "entities.json")
     * @param sc Spark context object
     */
   def classify(iNerClassifierFile: String, inputFile: String, outputFile: String, sc: SparkContext) {
     val out = sc.textFile(inputFile)
       .mapPartitions(iter => {
         NER3Classifier.apply(iNerClassifierFile)
         iter.map(line => {
           val ind1 = line.indexOf(",")
           val ind2 = line.indexOf(",", ind1 + 1)
           (line.substring(1, ind1), line.substring(ind1 + 1, ind2), line.substring(ind2 + 1, line.length - 1))
         })
         .map(r => {
           val classifiedJson = NER3Classifier.classify(r._3)
-          val jUtl = new JsonUtil
-          val classifiedMap = jUtl.fromJson[Map[String,List[String]]](classifiedJson)
+          //val jUtl = new JsonUtil
+          //val classifiedMap = JsonUtil.fromJson[Map[String,List[String]]](classifiedJson)
+          val classifiedMap = JsonUtil.fromJson(classifiedJson)
           val classifiedMapCountTuples: Map[String, List[(String, Int)]] = classifiedMap.map {
-            case (nerType, entityList) => (nerType, entityList.groupBy(identity).mapValues(_.size).toList)
+            case (nerType, entityList: List[String]) => (nerType, entityList.groupBy(identity).mapValues(_.size).toList)
           }
           ((r._1, r._2), classifiedMapCountTuples)
         })
       })
       .reduceByKey( (a, b) => (a ++ b).keySet.map(r => (r, combineKeyCountLists(a(r), b(r)))).toMap)
       .mapPartitions(iter => {
-        val jUtl = new JsonUtil
         iter.map(r => {
           val nerRec = new NerRecord(r._1._1, r._1._2)
           r._2.foreach(entityMap => {
             // e.g., entityMap = "PERSON" -> List(("Jack", 1), ("Diane", 3))
             val ec = new EntityCounts(entityMap._1)
             entityMap._2.foreach(e => {
               ec.entities += new Entity(e._1, e._2)
             })
             nerRec.ner += ec
           })
-          jUtl.toJson(nerRec)
+          JsonUtil.toJson(nerRec)
         })
       })
       .saveAsTextFile(outputFile)
 
     partDirToFile(outputFile)
   }
 
   class Entity(iEntity: String, iFreq: Int) {
     var entity: String = iEntity
     var freq: Int = iFreq
   }
 
   class EntityCounts(iNerType: String) {
     var nerType: String = iNerType
     var entities = MutableList[Entity]()
   }
 
   class NerRecord(recDate: String, recDomain: String) {
     var date = recDate
     var domain = recDomain
     var ner = MutableList[EntityCounts]()
   }
 }
 
-class JsonUtil extends Serializable {
-  val mapper = new ObjectMapper() with ScalaObjectMapper
-  mapper.registerModule(DefaultScalaModule)
-  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
-
-  def toJson(value: Map[Symbol, Any]): String = {
-    toJson(value map { case (k,v) => k.name -> v})
-  }
-  def toJson(value: Any): String = {
-    mapper.writeValueAsString(value)
-  }
-
-  def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
-    mapper.readValue[T](json)
-  }
-}
diff --git a/src/main/scala/org/warcbase/spark/utils/JsonUtil.scala b/src/main/scala/org/warcbase/spark/utils/JsonUtil.scala
new file mode 100644
index 0000000..cf8b0d9
--- /dev/null
+++ b/src/main/scala/org/warcbase/spark/utils/JsonUtil.scala
@@ -0,0 +1,22 @@
+package org.warcbase.spark.utils
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+object JsonUtil extends Serializable {
+  val mapper = new ObjectMapper()
+  mapper.registerModule(DefaultScalaModule)
+  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+
+  def toJson(value: Map[Symbol, Any]): String = {
+    toJson(value map { case (k,v) => k.name -> v})
+  }
+
+  def toJson(value: Any): String = {
+    mapper.writeValueAsString(value)
+  }
+
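+  // Deserializes a JSON object into an untyped Scala Map. Callers expecting more specific value
+  // types (e.g. the Map[String, List[String]] handled in NERCombinedJson.classify) narrow them
+  // by pattern matching on the values.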
+  def fromJson(json: String): Map[String, Any] = {
+    mapper.readValue(json, classOf[Map[String, Any]])
+  }
+}