diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractDate.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractDate.scala
index cf40a9a..f30dbcd 100644
--- a/src/main/scala/org/warcbase/spark/matchbox/ExtractDate.scala
+++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractDate.scala
@@ -1,30 +1,30 @@
 package org.warcbase.spark.matchbox
 
 /**
   * Simple wrapper for getting different parts of a date
   */
 object ExtractDate {
   object DateComponent extends Enumeration {
     type DateComponent = Value
     val YYYY, MM, DD, YYYYMM, YYYYMMDD = Value
   }
 
   import DateComponent._
 
   /**
     * Extracts the wanted component from a date
     *
-    * @param fullDate date returned by `WARecord.getCrawldate`, formatted as YYYYMMDD
+    * @param fullDate date returned by `WARecord.getCrawlDate`, formatted as YYYYMMDD
     * @param dateFormat an enum describing the portion of the date wanted
     */
   def apply(fullDate: String, dateFormat: DateComponent): String =
     if (fullDate == null) fullDate
     else dateFormat match {
       case YYYY => fullDate.substring(0, 4)
       case MM => fullDate.substring(4, 6)
       case DD => fullDate.substring(6, 8)
       case YYYYMM => fullDate.substring(0, 6)
       case YYYYMMDD => fullDate.substring(0, 8)
     }
 }
 
diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractEntities.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractEntities.scala
index ba8f2b8..db51633 100644
--- a/src/main/scala/org/warcbase/spark/matchbox/ExtractEntities.scala
+++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractEntities.scala
@@ -1,69 +1,69 @@
 /*
  * Warcbase: an open-source platform for managing web archives
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.warcbase.spark.matchbox
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
 /**
   * Extracts entities
   */
 object ExtractEntities {
 
   /**
     * @param iNerClassifierFile path of classifier file
     * @param inputRecordFile path of ARC or WARC file from which to extract entities
     * @param outputFile path of output directory
     */
   def extractFromRecords(iNerClassifierFile: String, inputRecordFile: String, outputFile: String, sc: SparkContext): RDD[(String, String, String)] = {
     val rdd = RecordLoader.loadArc(inputRecordFile, sc)
-      .map(r => (r.getCrawldate, r.getUrl, RemoveHTML(r.getContentString)))
+      .map(r => (r.getCrawlDate, r.getUrl, RemoveHTML(r.getContentString)))
     extractAndOutput(iNerClassifierFile, rdd, outputFile)
   }
 
   /**
     * @param iNerClassifierFile path of classifier file
     * @param inputFile path of file with tuples (date: String, url: String, content: String)
     *                  from which to extract entities
     * @param outputFile path of output directory
     */
   def extractFromScrapeText(iNerClassifierFile: String, inputFile: String, outputFile: String, sc: SparkContext): RDD[(String, String, String)] = {
     val rdd = sc.textFile(inputFile)
       .map(line => {
         val ind1 = line.indexOf(",")
         val ind2 = line.indexOf(",", ind1 + 1)
         (line.substring(1, ind1),
           line.substring(ind1 + 1, ind2),
           line.substring(ind2 + 1, line.length - 1))
       })
     extractAndOutput(iNerClassifierFile, rdd, outputFile)
   }
 
   /**
     * @param iNerClassifierFile path of classifier file
     * @param rdd with values (date, url, content)
     * @param outputFile path of output directory
     */
   def extractAndOutput(iNerClassifierFile: String, rdd: RDD[(String, String, String)], outputFile: String): RDD[(String, String, String)] = {
     val r = rdd.mapPartitions(iter => {
       NER3Classifier.apply(iNerClassifierFile)
       iter.map(r => (r._1, r._2, NER3Classifier.classify(r._3)))
     })
     r.saveAsTextFile(outputFile)
     r
   }
 }
diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractGraph.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractGraph.scala
index 786aa90..085113f 100644
--- a/src/main/scala/org/warcbase/spark/matchbox/ExtractGraph.scala
+++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractGraph.scala
@@ -1,90 +1,90 @@
 /*
  * Warcbase: an open-source platform for managing web archives
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.warcbase.spark.matchbox
 
 import org.apache.spark.graphx._
 import org.apache.spark.rdd.RDD
 import org.warcbase.spark.archive.io.ArchiveRecord
 import org.warcbase.spark.matchbox.StringUtils._
 import org.warcbase.spark.rdd.RecordRDD._
 import org.warcbase.spark.utils.JsonUtil
 
 /**
   *
   * e.g. when done:
   * $ cat nodes.partjson/part-* > nodes.json && cat links.partjson/part-* > links.json
   * $ jq -c -n --slurpfile nodes nodes.json --slurpfile links links.json '{nodes: $nodes, links: $links}' > graph.json
   *
   */
 object ExtractGraph {
   def pageHash(url: String): VertexId = {
     url.hashCode.toLong
   }
 
   case class VertexData(domain: String, pageRank: Double, inDegree: Int, outDegree: Int)
 
   case class EdgeData(date: String, src: String, dst: String)
 
   def apply(records: RDD[ArchiveRecord], dynamic: Boolean = false,
             tolerance: Double = 0.001, numIter: Int = 3): Graph[VertexData, EdgeData] = {
     val vertices: RDD[(VertexId, VertexData)] = records.keepValidPages()
       .flatMap(r => ExtractLinks(r.getUrl, r.getContentString))
       .flatMap(r => List(ExtractDomain(r._1).removePrefixWWW(), ExtractDomain(r._2).removePrefixWWW()))
       .distinct
       .map(r => (pageHash(r), VertexData(r, 0.0, 0, 0)))
 
     val edges: RDD[Edge[EdgeData]] = records.keepValidPages()
-      .map(r => (r.getCrawldate, ExtractLinks(r.getUrl, r.getContentString)))
+      .map(r => (r.getCrawlDate, ExtractLinks(r.getUrl, r.getContentString)))
       .flatMap(r => r._2.map(f => (r._1, ExtractDomain(f._1).removePrefixWWW(), ExtractDomain(f._2).removePrefixWWW())))
       .filter(r => r._2 != "" && r._3 != "")
       .map(r => Edge(pageHash(r._2), pageHash(r._3), EdgeData(r._1, r._2, r._3)))
 
     val graph = Graph(vertices, edges)
 
     val graphInOut = graph.outerJoinVertices(graph.inDegrees) {
       case (vid, rv, inDegOpt) => VertexData(rv.domain, rv.pageRank, inDegOpt.getOrElse(0), rv.outDegree)
     }.outerJoinVertices(graph.outDegrees) {
       case (vid, rv, outDegOpt) => VertexData(rv.domain, rv.pageRank, rv.inDegree, outDegOpt.getOrElse(0))
     }
 
     if (dynamic) {
       graphInOut.outerJoinVertices(graph.pageRank(tolerance).vertices) {
         case (vid, rv, pageRankOpt) => VertexData(rv.domain, pageRankOpt.getOrElse(0.0), rv.inDegree, rv.outDegree)
       }
     } else {
       graphInOut.outerJoinVertices(graph.staticPageRank(numIter).vertices) {
         case (vid, rv, pageRankOpt) => VertexData(rv.domain, pageRankOpt.getOrElse(0.0), rv.inDegree, rv.outDegree)
       }
     }
   }
 
   implicit class GraphWriter(graph: Graph[VertexData, EdgeData]) {
     def writeAsJson(verticesPath: String, edgesPath: String) = {
       // Combine edges of a given (date, src, dst) combination into single record with count value.
       val edgesCounted = graph.edges.countItems().map {
         r => Map("date" -> r._1.attr.date,
           "src" -> r._1.attr.src,
           "dst" -> r._1.attr.dst,
           "count" -> r._2)
       }
       edgesCounted.map(r => JsonUtil.toJson(r)).saveAsTextFile(edgesPath)
       graph.vertices.map(r => JsonUtil.toJson(r._2)).saveAsTextFile(verticesPath)
     }
   }
 }
diff --git a/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala b/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
index 20a7389..bf67b9e 100644
--- a/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
+++ b/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
@@ -1,115 +1,115 @@
 /*
  * Warcbase: an open-source platform for managing web archives
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.warcbase.spark.rdd
 
 import org.apache.spark.rdd.RDD
 import org.warcbase.spark.archive.io.ArchiveRecord
 import org.warcbase.spark.matchbox.{DetectLanguage, ExtractDate, ExtractDomain, RemoveHTML}
 import org.warcbase.spark.matchbox.ExtractDate.DateComponent
 import org.warcbase.spark.matchbox.ExtractDate.DateComponent.DateComponent
 
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
 
 /**
   * RDD wrappers for working with Records
   */
 object RecordRDD extends java.io.Serializable {
 
   /**
     * A Wrapper class around RDD to simplify counting
     */
   implicit class CountableRDD[T: ClassTag](rdd: RDD[T]) extends java.io.Serializable {
     def countItems(): RDD[(T, Int)] = {
       rdd.map(r => (r, 1))
         .reduceByKey((c1, c2) => c1 + c2)
         .sortBy(f => f._2, ascending = false)
     }
   }
 
   /**
     * A Wrapper class around RDD to allow RDDs of type ARCRecord and WARCRecord to be queried via a fluent API.
     *
     * To load such an RDD, please see [[org.warcbase.spark.matchbox.RecordLoader]]
     */
   implicit class WARecordRDD(rdd: RDD[ArchiveRecord]) extends java.io.Serializable {
 
     def keepValidPages(): RDD[ArchiveRecord] = {
       rdd.filter(r =>
-        r.getCrawldate != null
+        r.getCrawlDate != null
           && (r.getMimeType == "text/html"
           || r.getUrl.endsWith("htm")
           || r.getUrl.endsWith("html"))
           && !r.getUrl.endsWith("robots.txt"))
     }
 
     def keepMimeTypes(mimeTypes: Set[String]) = {
       rdd.filter(r => mimeTypes.contains(r.getMimeType))
     }
 
     def keepDate(date: String, component: DateComponent = DateComponent.YYYYMMDD) = {
-      rdd.filter(r => ExtractDate(r.getCrawldate, component) == date)
+      rdd.filter(r => ExtractDate(r.getCrawlDate, component) == date)
     }
 
     def keepUrls(urls: Set[String]) = {
       rdd.filter(r => urls.contains(r.getUrl))
     }
 
     def keepUrlPatterns(urlREs: Set[Regex]) = {
       rdd.filter(r =>
         urlREs.map(re =>
           r.getUrl match {
             case re() => true
             case _ => false
           }).exists(identity))
     }
 
     def keepDomains(urls: Set[String]) = {
       rdd.filter(r => urls.contains(ExtractDomain(r.getUrl).replace("^\\s*www\\.", "")))
     }
 
     def keepLanguages(lang: Set[String]) = {
       rdd.filter(r => lang.contains(DetectLanguage(RemoveHTML(r.getContentString))))
     }
 
     def discardMimeTypes(mimeTypes: Set[String]) = {
       rdd.filter(r => !mimeTypes.contains(r.getMimeType))
     }
 
     def discardDate(date: String) = {
-      rdd.filter(r => r.getCrawldate != date)
+      rdd.filter(r => r.getCrawlDate != date)
     }
 
     def discardUrls(urls: Set[String]) = {
       rdd.filter(r => !urls.contains(r.getUrl))
     }
 
     def discardUrlPatterns(urlREs: Set[Regex]) = {
       rdd.filter(r =>
         !urlREs.map(re =>
           r.getUrl match {
             case re() => true
             case _ => false
           }).exists(identity))
     }
 
     def discardDomains(urls: Set[String]) = {
       rdd.filter(r => !urls.contains(r.getDomain))
     }
   }
 }
\ No newline at end of file
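
Reviewer note (not part of the patch): a minimal sketch of how the renamed accessor surfaces through the fluent API touched above, assuming a spark-shell session with the warcbase jar on the classpath (so `sc` is predefined); the input/output paths below are hypothetical placeholders.

// Hypothetical spark-shell session exercising the getCrawldate -> getCrawlDate rename.
import org.warcbase.spark.matchbox.{ExtractDate, RecordLoader}
import org.warcbase.spark.matchbox.ExtractDate.DateComponent
import org.warcbase.spark.rdd.RecordRDD._

val records = RecordLoader.loadArc("/path/to/example.arc.gz", sc)

// keepValidPages() now filters on r.getCrawlDate != null, and
// keepDate delegates to ExtractDate(r.getCrawlDate, component).
val pages2015 = records
  .keepValidPages()
  .keepDate("2015", DateComponent.YYYY)

// The accessor still returns a YYYYMMDD string, so ExtractDate slices it as before.
pages2015
  .map(r => (ExtractDate(r.getCrawlDate, DateComponent.YYYYMM), r.getUrl))
  .countItems()
  .saveAsTextFile("/path/to/monthly-url-counts")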