diff --git a/README.md b/README.md
index 75d6310..cb03f75 100644
--- a/README.md
+++ b/README.md
@@ -1,129 +1,129 @@
Warcbase
========

Warcbase is an open-source platform for managing web archives built on Hadoop and HBase. The platform provides a flexible data model for storing and managing raw content as well as metadata and extracted knowledge. Tight integration with Hadoop provides powerful tools for analytics and data processing via Spark.

There are two main ways of using Warcbase:

+ The first and most common is to analyze web archives using [Spark](http://spark.apache.org/).
+ The second is to take advantage of HBase to provide random access as well as analytics capabilities. Random access allows Warcbase to provide temporal browsing of archived content (i.e., "wayback" functionality).

You can use Warcbase without HBase, and since HBase requires more extensive setup, we recommend that if you're just starting out, you play with the Spark analytics and don't worry about HBase.

Warcbase is built against CDH 5.4.1:

+ Hadoop version: 2.6.0-cdh5.4.1
+ HBase version: 1.0.0-cdh5.4.1
+ Spark version: 1.3.0-cdh5.4.1

The Hadoop ecosystem is evolving rapidly, so there may be incompatibilities with other versions.

Detailed documentation is available [here](http://lintool.github.io/warcbase-docs/). Supporting files can be found in the [warcbase-resources repository](https://github.com/lintool/warcbase-resources).

Getting Started
---------------

Clone the repo:

```
$ git clone http://github.com/lintool/warcbase.git
```

You can then build Warcbase:

```
$ mvn clean package appassembler:assemble
```

For the impatient, to skip tests:

```
$ mvn clean package appassembler:assemble -DskipTests
```

To create Eclipse project files:

```
$ mvn eclipse:clean
$ mvn eclipse:eclipse
```

You can then import the project into Eclipse.

To generate Scaladocs:

```
$ mvn scala:doc
```

Generated Scaladocs will be under the `target/site` directory.

Spark Quickstart
----------------

For the impatient, let's do a simple analysis with Spark. Within the repo there's already a sample ARC file stored at `src/test/resources/arc/example.arc.gz`. Our supporting resources repository also has [larger ARC and WARC files as real-world examples](https://github.com/lintool/warcbase-resources/tree/master/Sample-Data).

If you need to install Spark, [we have a walkthrough here for installation on OS X](http://lintool.github.io/warcbase-docs/Installing-and-Running-Spark-under-OS-X/). This page also has instructions on how to get Spark Notebook, an interactive web-based editor, running.

Once you've got Spark installed, you can go ahead and fire up the Spark shell:

```
$ spark-shell --jars target/warcbase-0.1.0-SNAPSHOT-fatjar.jar
```

Here's a simple script that extracts and counts domains (i.e., the number of pages for each domain) in the sample ARC data:

```
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._

val r = RecordLoader.loadArchives("src/test/resources/arc/example.arc.gz", sc)
  .keepValidPages()
-  .map(r => ExtractTopLevelDomain(r.getUrl))
+  .map(r => ExtractDomain(r.getUrl))
  .countItems()
  .take(10)
```

**Tip:** By default, commands in the Spark shell must fit on one line. To run multi-line commands, type `:paste` in the Spark shell; you can then copy and paste the script above directly into the shell. Press Ctrl-D to finish the command.

Want to learn more? Check out the [detailed documentation on analyzing web archives with Spark](http://lintool.github.io/warcbase-docs/).

What About Pig?
---------------

Warcbase was originally conceived with Pig for analytics, but we have transitioned to Spark as the platform of choice for scholarly interactions with web archive data. Spark has several advantages, including a cleaner interface, easier-to-write user-defined functions (UDFs), and integration with different "notebook" frontends.

Visualizations
--------------

The results of analyses using Warcbase can serve as input to visualizations that help scholars interactively explore the data. Examples include:

+ [Basic crawl statistics](http://lintool.github.io/warcbase/vis/crawl-sites/index.html) from the Canadian Political Parties and Political Interest Groups collection.
+ [Interactive graph visualization](http://lintool.github.io/warcbase-docs/Gephi-Converting-Site-Link-Structure-into-Dynamic-Visualization/) using Gephi.
+ [Shine interface](http://webarchives.ca/) for faceted full-text search.

Next Steps
----------

+ [Ingesting content into HBase](http://lintool.github.io/warcbase-docs/Ingesting-Content-into-HBase/): loading ARC and WARC data into HBase
+ [Warcbase/Wayback integration](http://lintool.github.io/warcbase-docs/Warcbase-Wayback-Integration/): a guide to providing temporal browsing capabilities
+ [Warcbase Java tools](http://lintool.github.io/warcbase-docs/Warcbase-Java-Tools/): building the URL mapping and extracting the webgraph

License
-------

Licensed under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0).

Acknowledgments
---------------

This work is supported in part by the National Science Foundation and by the Mellon Foundation (via Columbia University). Any opinions, findings, and conclusions or recommendations expressed are those of the researchers and do not necessarily reflect the views of the sponsors.
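Apropos of the UDF point in the "What About Pig?" section above: in Spark, an ad hoc transformation is just an inline Scala function dropped into the pipeline, with no separate UDF registration step. A minimal sketch for the Spark shell, reusing the sample data path from the quickstart (the year bucketing is illustrative):

```
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._

// Any plain Scala function can serve as a "UDF" inside the pipeline.
val pagesPerYear = RecordLoader.loadArchives("src/test/resources/arc/example.arc.gz", sc)
  .keepValidPages()
  .map(r => r.getCrawldate.substring(0, 4)) // crawl dates are YYYYMMDD strings
  .countItems()
  .take(10)
```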
diff --git a/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala b/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala
index 6542fe3..c2cca3f 100644
--- a/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala
+++ b/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala
@@ -1,22 +1,22 @@
package org.warcbase.spark.archive.io

import org.apache.spark.SerializableWritable
import org.warcbase.data.ArcRecordUtils
import org.warcbase.io.ArcRecordWritable
import org.warcbase.spark.matchbox.ExtractDate.DateComponent
-import org.warcbase.spark.matchbox.{ExtractDate, ExtractTopLevelDomain}
+import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}

class ArcRecord(r: SerializableWritable[ArcRecordWritable]) extends ArchiveRecord {
  val getCrawldate: String = ExtractDate(r.t.getRecord.getMetaData.getDate, DateComponent.YYYYMMDD)

  val getMimeType: String = r.t.getRecord.getMetaData.getMimetype

  val getUrl: String = r.t.getRecord.getMetaData.getUrl

-  val getDomain: String = ExtractTopLevelDomain(r.t.getRecord.getMetaData.getUrl)
+  val getDomain: String = ExtractDomain(r.t.getRecord.getMetaData.getUrl)

  val getContentBytes: Array[Byte] = ArcRecordUtils.getBodyContent(r.t.getRecord)

  val getContentString: String = new String(getContentBytes)
}
diff --git a/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala b/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala
index 2d6a1cb..3d37d12 100644
--- a/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala
+++ b/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala
@@ -1,63 +1,63 @@
package org.warcbase.spark.archive.io

import java.text.SimpleDateFormat

import org.apache.spark.SerializableWritable
import org.archive.io.arc.ARCRecord
import org.archive.io.warc.WARCRecord
import org.archive.util.ArchiveUtils
import org.warcbase.data.{ArcRecordUtils, WarcRecordUtils}
import org.warcbase.io.GenericArchiveRecordWritable
import org.warcbase.io.GenericArchiveRecordWritable.ArchiveFormat
import org.warcbase.spark.matchbox.ExtractDate.DateComponent
-import org.warcbase.spark.matchbox.{ExtractDate, ExtractTopLevelDomain}
+import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}

class GenericArchiveRecord(r: SerializableWritable[GenericArchiveRecordWritable]) extends ArchiveRecord {
  var arcRecord: ARCRecord = null
  var warcRecord: WARCRecord = null

  if (r.t.getFormat == ArchiveFormat.ARC)
    arcRecord = r.t.getRecord.asInstanceOf[ARCRecord]
  else if (r.t.getFormat == ArchiveFormat.WARC)
    warcRecord = r.t.getRecord.asInstanceOf[WARCRecord]

  val ISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX")

  val getCrawldate: String = {
    if (r.t.getFormat == ArchiveFormat.ARC) {
      ExtractDate(arcRecord.getMetaData.getDate, DateComponent.YYYYMMDD)
    } else {
      ExtractDate(ArchiveUtils.get14DigitDate(ISO8601.parse(warcRecord.getHeader.getDate)), DateComponent.YYYYMMDD)
    }
  }

  val getContentBytes: Array[Byte] = {
    if (r.t.getFormat == ArchiveFormat.ARC) {
      ArcRecordUtils.getBodyContent(arcRecord)
    } else {
      WarcRecordUtils.getContent(warcRecord)
    }
  }

  val getContentString: String = new String(getContentBytes)

  val getMimeType = {
    if (r.t.getFormat == ArchiveFormat.ARC) {
      arcRecord.getMetaData.getMimetype
    } else {
      WarcRecordUtils.getWarcResponseMimeType(getContentBytes)
    }
  }

  val getUrl = {
    if (r.t.getFormat == ArchiveFormat.ARC) {
      arcRecord.getMetaData.getUrl
    } else {
      warcRecord.getHeader.getUrl
    }
  }

-  val getDomain: String = ExtractTopLevelDomain(getUrl)
+  val getDomain: String = ExtractDomain(getUrl)
}
diff --git a/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala b/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala
index 22e67d2..269910b 100644
--- a/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala
+++ b/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala
@@ -1,26 +1,26 @@
package org.warcbase.spark.archive.io

import java.text.SimpleDateFormat

import org.apache.spark.SerializableWritable
import org.archive.util.ArchiveUtils
import org.warcbase.data.WarcRecordUtils
import org.warcbase.io.WarcRecordWritable
import org.warcbase.spark.matchbox.ExtractDate.DateComponent
-import org.warcbase.spark.matchbox.{ExtractDate, ExtractTopLevelDomain}
+import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}

class WarcRecord(r: SerializableWritable[WarcRecordWritable]) extends ArchiveRecord {
  val ISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX")

  val getCrawldate: String = ExtractDate(ArchiveUtils.get14DigitDate(ISO8601.parse(r.t.getRecord.getHeader.getDate)), DateComponent.YYYYMMDD)

  val getContentBytes: Array[Byte] = WarcRecordUtils.getContent(r.t.getRecord)

  val getContentString: String = new String(getContentBytes)

  val getMimeType = WarcRecordUtils.getWarcResponseMimeType(getContentBytes)

  val getUrl = r.t.getRecord.getHeader.getUrl

-  val getDomain = ExtractTopLevelDomain(getUrl)
+  val getDomain = ExtractDomain(getUrl)
}
diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomain.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractDomain.scala
similarity index 97%
rename from src/main/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomain.scala
rename to src/main/scala/org/warcbase/spark/matchbox/ExtractDomain.scala
index 7320a9e..c3eaecd 100644
--- a/src/main/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomain.scala
+++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractDomain.scala
@@ -1,37 +1,37 @@
/*
 * Warcbase: an open-source platform for managing web archives
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.warcbase.spark.matchbox

import java.net.URL

-object ExtractTopLevelDomain {
+object ExtractDomain {
  def apply(url: String, source: String = ""): String = {
    if (url == null) return null
    var host: String = null
    try {
      host = new URL(url).getHost
    } catch {
      case e: Exception => // it's okay
    }
    if (host != null || source == null) return host
    try {
      new URL(source).getHost
    } catch {
      case e: Exception => null
    }
  }
}
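For reviewers, a minimal sketch of the renamed object's contract, written as Spark-shell (or Scala REPL) expressions; the expected values are taken from `ExtractDomainTest` later in this diff, not invented:

```
import org.warcbase.spark.matchbox.ExtractDomain

// An absolute URL resolves to its full host, which is why the rename fits:
// the result is a domain, not a top-level domain.
println(ExtractDomain("https://github.com/lintool"))
// github.com

// A relative link falls back to the optional second argument (the source page).
println(ExtractDomain("index.html", "http://www.umiacs.umd.edu/~jimmylin/"))
// www.umiacs.umd.edu

// If neither argument parses as a URL, the result is null.
println(ExtractDomain("index.html"))
// null
```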
diff --git a/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala b/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
index 87c68f3..20a7389 100644
--- a/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
+++ b/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
@@ -1,115 +1,115 @@
/*
 * Warcbase: an open-source platform for managing web archives
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.warcbase.spark.rdd

import org.apache.spark.rdd.RDD
import org.warcbase.spark.archive.io.ArchiveRecord
-import org.warcbase.spark.matchbox.{DetectLanguage, ExtractDate, ExtractTopLevelDomain, RemoveHTML}
+import org.warcbase.spark.matchbox.{DetectLanguage, ExtractDate, ExtractDomain, RemoveHTML}
import org.warcbase.spark.matchbox.ExtractDate.DateComponent
import org.warcbase.spark.matchbox.ExtractDate.DateComponent.DateComponent

import scala.reflect.ClassTag
import scala.util.matching.Regex

/**
  * RDD wrappers for working with Records
  */
object RecordRDD extends java.io.Serializable {

  /**
    * A Wrapper class around RDD to simplify counting
    */
  implicit class CountableRDD[T: ClassTag](rdd: RDD[T]) extends java.io.Serializable {
    def countItems(): RDD[(T, Int)] = {
      rdd.map(r => (r, 1))
        .reduceByKey((c1, c2) => c1 + c2)
        .sortBy(f => f._2, ascending = false)
    }
  }

  /**
    * A Wrapper class around RDD to allow RDDs of type ARCRecord and WARCRecord to be queried via a fluent API.
    *
    * To load such an RDD, please see [[org.warcbase.spark.matchbox.RecordLoader]]
    */
  implicit class WARecordRDD(rdd: RDD[ArchiveRecord]) extends java.io.Serializable {

    def keepValidPages(): RDD[ArchiveRecord] = {
      rdd.filter(r =>
        r.getCrawldate != null
          && (r.getMimeType == "text/html"
          || r.getUrl.endsWith("htm")
          || r.getUrl.endsWith("html"))
          && !r.getUrl.endsWith("robots.txt"))
    }

    def keepMimeTypes(mimeTypes: Set[String]) = {
      rdd.filter(r => mimeTypes.contains(r.getMimeType))
    }

    def keepDate(date: String, component: DateComponent = DateComponent.YYYYMMDD) = {
      rdd.filter(r => ExtractDate(r.getCrawldate, component) == date)
    }

    def keepUrls(urls: Set[String]) = {
      rdd.filter(r => urls.contains(r.getUrl))
    }

    def keepUrlPatterns(urlREs: Set[Regex]) = {
      rdd.filter(r =>
        urlREs.map(re =>
          r.getUrl match {
            case re() => true
            case _ => false
          }).exists(identity))
    }

    def keepDomains(urls: Set[String]) = {
-      rdd.filter(r => urls.contains(ExtractTopLevelDomain(r.getUrl).replace("^\\s*www\\.", "")))
+      rdd.filter(r => urls.contains(ExtractDomain(r.getUrl).replaceAll("^\\s*www\\.", "")))
    }

    def keepLanguages(lang: Set[String]) = {
      rdd.filter(r => lang.contains(DetectLanguage(RemoveHTML(r.getContentString))))
    }

    def discardMimeTypes(mimeTypes: Set[String]) = {
      rdd.filter(r => !mimeTypes.contains(r.getMimeType))
    }

    def discardDate(date: String) = {
      rdd.filter(r => r.getCrawldate != date)
    }

    def discardUrls(urls: Set[String]) = {
      rdd.filter(r => !urls.contains(r.getUrl))
    }

    def discardUrlPatterns(urlREs: Set[Regex]) = {
      rdd.filter(r =>
        !urlREs.map(re =>
          r.getUrl match {
            case re() => true
            case _ => false
          }).exists(identity))
    }

    def discardDomains(urls: Set[String]) = {
      rdd.filter(r => !urls.contains(r.getDomain))
    }
  }
}
\ No newline at end of file
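A quick sketch of how the fluent filters above compose in the Spark shell; the domain value is illustrative. Note that `keepDomains` compares against the host with any leading "www." stripped, which is why the `+` line in this hunk uses `replaceAll` (a regex replacement) rather than the literal-string `replace`:

```
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._

// Supply domains without the "www." prefix, since keepDomains strips it
// from each record's host before matching.
val urls = RecordLoader.loadArchives("src/test/resources/arc/example.arc.gz", sc)
  .keepValidPages()
  .keepDomains(Set("archive.org"))
  .map(r => r.getUrl)
  .take(10)
```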
diff --git a/src/main/scala/org/warcbase/spark/scripts/CrawlStatistics.scala b/src/main/scala/org/warcbase/spark/scripts/CrawlStatistics.scala
index 5d4cff3..26da0ac 100644
--- a/src/main/scala/org/warcbase/spark/scripts/CrawlStatistics.scala
+++ b/src/main/scala/org/warcbase/spark/scripts/CrawlStatistics.scala
@@ -1,89 +1,89 @@
package org.warcbase.spark.scripts

import com.google.common.io.Resources
import org.apache.spark.{SparkConf, SparkContext}
-import org.warcbase.spark.matchbox.{ExtractLinks, ExtractTopLevelDomain, RecordLoader}
+import org.warcbase.spark.matchbox.{ExtractLinks, ExtractDomain, RecordLoader}
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._

object CrawlStatistics {
  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
  private val warcPath = Resources.getResource("warc/example.warc.gz").getPath

  def numPagesPerCrawl(sc: SparkContext) = {
    val a = RecordLoader.loadArc(arcPath, sc)
    val b = RecordLoader.loadWarc(warcPath, sc)
    val pageCounts = a.union(b)
      .keepValidPages()
      .map(_.getCrawldate)
      .countItems()
    println(pageCounts.take(1).mkString)
  }

  def numLinksPerCrawl(sc: SparkContext) = {
    val a = RecordLoader.loadArc(arcPath, sc)
    val b = RecordLoader.loadWarc(warcPath, sc)
    val linkCounts = a.union(b)
      .keepValidPages()
      .map(r => (r.getCrawldate.substring(0, 6), ExtractLinks(r.getUrl, r.getContentString)))
      .flatMap(r => r._2.map(f => (r._1, f._1, f._2)))
      .filter(r => r._2 != null && r._3 != null)
      .map(_._1)
      .countItems()
    println(linkCounts.take(1).mkString)
  }

  def numPagesByDomain(sc: SparkContext): Unit = {
    val a = RecordLoader.loadArc(arcPath, sc)
    val b = RecordLoader.loadWarc(warcPath, sc)
    val domainCounts = a.union(b)
      .keepValidPages()
      .map(r => (r.getCrawldate.substring(0, 6), r.getDomain.replaceAll("^\\s*www\\.", "")))
      .countItems()
      .filter(f => f._2 > 10)
      .sortBy(f => (f._1, f._2))
      .collect()
    println(domainCounts.take(1).mkString)
  }

  def linkStructure(sc: SparkContext) = {
    val linkStructure = RecordLoader.loadArc(arcPath, sc)
      .keepValidPages()
      .map(r => (r.getCrawldate.substring(0, 6), ExtractLinks(r.getUrl, r.getContentString)))
-      .flatMap(r => r._2.map(f => (r._1, ExtractTopLevelDomain(f._1).replaceAll("^\\s*www\\.", ""), ExtractTopLevelDomain(f._2).replaceAll("^\\s*www\\.", ""))))
+      .flatMap(r => r._2.map(f => (r._1, ExtractDomain(f._1).replaceAll("^\\s*www\\.", ""), ExtractDomain(f._2).replaceAll("^\\s*www\\.", ""))))
      .filter(r => r._2 != null && r._3 != null)
      .countItems()
      .collect()
    println(linkStructure.take(1).mkString)
  }

  def warclinkStructure(sc: SparkContext) = {
    val linkStructure = RecordLoader.loadWarc(warcPath, sc)
      .keepValidPages()
      .map(r => (ExtractDate(r.getCrawldate, ExtractDate.DateComponent.YYYYMM), ExtractLinks(r.getUrl, r.getContentString)))
-      .flatMap(r => r._2.map(f => (r._1, ExtractTopLevelDomain(f._1).replaceAll("^\\s*www\\.", ""), ExtractTopLevelDomain(f._2).replaceAll("^\\s*www\\.", ""))))
+      .flatMap(r => r._2.map(f => (r._1, ExtractDomain(f._1).replaceAll("^\\s*www\\.", ""), ExtractDomain(f._2).replaceAll("^\\s*www\\.", ""))))
      .filter(r => r._2 != null && r._3 != null)
      .countItems()
      .map(r => TupleFormatter.tabDelimit(r))
      .collect()
    println(linkStructure.take(1).head)
  }

  def main(args: Array[String]) = {
    val master = "local[4]"
    val appName = "example-spark"
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)
    val sc = new SparkContext(conf)
    try {
      numPagesPerCrawl(sc)
      numLinksPerCrawl(sc)
      numPagesByDomain(sc)
      linkStructure(sc)
      warclinkStructure(sc)
    } finally {
      sc.stop()
    }
  }
}
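Since `CrawlStatistics` carries its own `main` with a local master, it can be run standalone once the fatjar is built. A sketch, assuming a local Spark installation and the fatjar name from the README's quickstart:

```
$ spark-submit --class org.warcbase.spark.scripts.CrawlStatistics \
    target/warcbase-0.1.0-SNAPSHOT-fatjar.jar
```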
diff --git a/src/test/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomainTest.scala b/src/test/scala/org/warcbase/spark/matchbox/ExtractDomainTest.scala
similarity index 86%
rename from src/test/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomainTest.scala
rename to src/test/scala/org/warcbase/spark/matchbox/ExtractDomainTest.scala
index f02dcbf..0b1afd4 100644
--- a/src/test/scala/org/warcbase/spark/matchbox/ExtractTopLevelDomainTest.scala
+++ b/src/test/scala/org/warcbase/spark/matchbox/ExtractDomainTest.scala
@@ -1,46 +1,46 @@
/*
 * Warcbase: an open-source platform for managing web archives
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.warcbase.spark.matchbox

import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
-class ExtractTopLevelDomainTest extends FunSuite {
+class ExtractDomainTest extends FunSuite {
  private val data1: Seq[(String, String)] = Seq.newBuilder.+=(
    ("http://www.umiacs.umd.edu/~jimmylin/", "www.umiacs.umd.edu"),
    ("https://github.com/lintool", "github.com"),
    ("http://ianmilligan.ca/2015/05/04/iipc-2015-slides-for-warcs-wats-and-wgets-presentation/", "ianmilligan.ca"),
    ("index.html", null)).result()

  private val data2 = Seq.newBuilder.+=(
    ("index.html", "http://www.umiacs.umd.edu/~jimmylin/", "www.umiacs.umd.edu"),
    ("index.html", "lintool/", null)).result()

  test("simple") {
    data1.foreach {
-      case (link, domain) => assert(ExtractTopLevelDomain(link) == domain)
+      case (link, domain) => assert(ExtractDomain(link) == domain)
    }
  }

  test("withBase") {
    data2.foreach {
-      case (link, base, domain) => assert(ExtractTopLevelDomain(link, base) == domain)
+      case (link, base, domain) => assert(ExtractDomain(link, base) == domain)
    }
  }
}
diff --git a/src/test/scala/org/warcbase/spark/rdd/CountableRDDTest.scala b/src/test/scala/org/warcbase/spark/rdd/CountableRDDTest.scala
index 36bf7b2..e1710ec 100644
--- a/src/test/scala/org/warcbase/spark/rdd/CountableRDDTest.scala
+++ b/src/test/scala/org/warcbase/spark/rdd/CountableRDDTest.scala
@@ -1,61 +1,61 @@
/*
 * Warcbase: an open-source platform for managing web archives
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.warcbase.spark.rdd

import com.google.common.io.Resources
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._

@RunWith(classOf[JUnitRunner])
class CountableRDDTest extends FunSuite with BeforeAndAfter {
  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
  private val master = "local[4]"
  private val appName = "example-spark"
  private var sc: SparkContext = _

  before {
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)
    sc = new SparkContext(conf)
  }

  test("count records") {
    val base = RecordLoader.loadArc(arcPath, sc)
      .keepValidPages()
-      .map(r => ExtractTopLevelDomain(r.getUrl))
+      .map(r => ExtractDomain(r.getUrl))
    val r = base
      .map(r => (r, 1))
      .reduceByKey(_ + _)
      .map(_.swap)
      .sortByKey(ascending = false)
      .map(_.swap)
      .collect()
    val r2 = base.countItems().collect()
    assert(r sameElements r2)
  }

  after {
    if (sc != null) {
      sc.stop()
    }
  }
}
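To confirm that the renames compile and the suites still pass, the standard Maven test cycle should be enough; because the suites run under JUnit via the `@RunWith(classOf[JUnitRunner])` annotation shown above, Surefire's `-Dtest` flag can typically select a single suite (a sketch, not verified against this build):

```
$ mvn test
$ mvn test -Dtest=ExtractDomainTest
```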