// --- src/main/scala/org/warcbase/spark/matchbox/Formatter.scala ---
package org.warcbase.spark.matchbox

/** Utilities for rendering record tuples as flat, tab-delimited text. */
object Formatter {
  /**
   * Recursively flattens the elements of `it` into a single tab-delimited string.
   *
   * Strings and Ints are emitted directly; nested iterators and tuples of arity
   * 2 to 4 are flattened recursively via their `productIterator`. Any other
   * element type falls through unchanged (its `toString` is used by `mkString`)
   * instead of throwing a `scala.MatchError` as the previous non-exhaustive
   * match did for e.g. Long or Double values.
   *
   * NOTE: the `Iterator[Any]` and tuple element-type checks are unchecked at
   * runtime due to type erasure; only the container class is matched.
   *
   * @param it elements to flatten
   * @return the elements joined with tab characters, depth-first
   */
  def tabDelimit(it: Iterator[Any]): String = {
    it.map {
      case s: String => s
      case d: Int => d
      case nested: Iterator[Any] => tabDelimit(nested)
      case t2: (Any, Any) => tabDelimit(t2.productIterator)
      case t3: (Any, Any, Any) => tabDelimit(t3.productIterator)
      case t4: (Any, Any, Any, Any) => tabDelimit(t4.productIterator)
      // Fallback: previously any other element type crashed with MatchError.
      case other => other
    }.mkString("\t")
  }
}

// --- src/main/scala/org/warcbase/spark/scripts/CrawlStatistics.scala ---
package org.warcbase.spark.scripts

import com.google.common.io.Resources
import org.apache.spark.{SparkConf, SparkContext}
// The wildcard import covers ExtractLinks, ExtractTopLevelDomain, RecordLoader
// and Formatter; the previous explicit import of the same package was redundant.
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._

/** Example crawl-statistics jobs over the bundled ARC/WARC test fixtures. */
object CrawlStatistics {
  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
  private val warcPath = Resources.getResource("warc/example.warc.gz").getPath

  /** Counts valid pages per crawl date across the ARC and WARC inputs. */
  def numPagesPerCrawl(sc: SparkContext) = {
    val a = RecordLoader.loadArc(arcPath, sc)
    val b = RecordLoader.loadWarc(warcPath, sc)
    val pageCounts = a.union(b)
      .keepValidPages()
      .map(_.getCrawldate)
      .countItems()
    println(pageCounts.take(1).mkString)
  }

  /** Counts extracted links per crawl month (yyyyMM) across both inputs. */
  def numLinksPerCrawl(sc: SparkContext) = {
    val a = RecordLoader.loadArc(arcPath, sc)
    val b = RecordLoader.loadWarc(warcPath, sc)
    val linkCounts = a.union(b)
      .keepValidPages()
      .map(r => (r.getCrawldate.substring(0, 6), ExtractLinks(r.getUrl, r.getContentString)))
      .flatMap(r => r._2.map(f => (r._1, f._1, f._2)))
      .filter(r => r._2 != null && r._3 != null)
      .map(_._1)
      .countItems()
    println(linkCounts.take(1).mkString)
  }

  /** Counts pages per (crawl month, domain), keeping domains seen > 10 times. */
  def numPagesByDomain(sc: SparkContext): Unit = {
    val a = RecordLoader.loadArc(arcPath, sc)
    val b = RecordLoader.loadWarc(warcPath, sc)
    val domainCounts = a.union(b)
      .keepValidPages()
      .map(r => (r.getCrawldate.substring(0, 6), r.getDomain.replaceAll("^\\s*www\\.", "")))
      .countItems()
      .filter(f => f._2 > 10)
      .sortBy(f => (f._1, f._2))
      .collect()
    println(domainCounts.take(1).mkString)
  }

  /** Builds the (month, source domain, target domain) link structure from the ARC input. */
  def linkStructure(sc: SparkContext) = {
    val linkStructure = RecordLoader.loadArc(arcPath, sc)
      .keepValidPages()
      .map(r => (r.getCrawldate.substring(0, 6), ExtractLinks(r.getUrl, r.getContentString)))
      .flatMap(r => r._2.map(f => (r._1,
        ExtractTopLevelDomain(f._1).replaceAll("^\\s*www\\.", ""),
        ExtractTopLevelDomain(f._2).replaceAll("^\\s*www\\.", ""))))
      .filter(r => r._2 != null && r._3 != null)
      .countItems()
      .collect()
    println(linkStructure.take(1).mkString)
  }

  /** Same link structure over the WARC input, rendered tab-delimited. */
  def warclinkStructure(sc: SparkContext) = {
    val linkStructure = RecordLoader.loadWarc(warcPath, sc)
      .keepValidPages()
      .map(r => (r.getCrawldate.substring(0, 6), ExtractLinks(r.getUrl, r.getContentString)))
      .flatMap(r => r._2.map(f => (r._1,
        ExtractTopLevelDomain(f._1).replaceAll("^\\s*www\\.", ""),
        ExtractTopLevelDomain(f._2).replaceAll("^\\s*www\\.", ""))))
      .filter(r => r._2 != null && r._3 != null)
      .countItems()
      .map(r => Formatter.tabDelimit(r.productIterator))
      .collect()
    // headOption avoids the NoSuchElementException that `.take(1).head`
    // threw when the collected result was empty.
    linkStructure.headOption.foreach(println)
  }

  def main(args: Array[String]) = {
    val master = "local[4]"
    val appName = "example-spark"
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)
    val sc = new SparkContext(conf)
    try {
      numPagesPerCrawl(sc)
      numLinksPerCrawl(sc)
      numPagesByDomain(sc)
      linkStructure(sc)
      warclinkStructure(sc)
    } finally {
      // Always release the SparkContext, even if a job fails.
      sc.stop()
    }
  }
}
// --- src/test/scala/org/warcbase/spark/matchbox/FormatterTest.scala ---
package org.warcbase.spark.matchbox

import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner

/** Verifies that nested tuples are flattened into tab-delimited text. */
@RunWith(classOf[JUnitRunner])
class FormatterTest extends FunSuite {
  test("tab delimit") {
    // Mixed nesting: a 3-tuple, a bare String, an Int, and a 2-tuple.
    val nested = (("a", "b", "c"), "d", 5, ("hi", 1))
    val expected = "a\tb\tc\td\t5\thi\t1"
    assert(Formatter.tabDelimit(nested.productIterator) == expected)
  }
}