diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala index 2940634..5f75beb 100644 --- a/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala +++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala @@ -1,40 +1,43 @@ package org.warcbase.spark.matchbox import java.io.IOException import org.jsoup.Jsoup import org.jsoup.select.Elements import scala.collection.mutable /** * UDF for extracting links from a webpage given the HTML content (using Jsoup). * */ object ExtractLinks { /** - * @param html the content from which links are to be extracted - * Returns a sequence of links + * @param src the src link. + * @param html the content from which links are to be extracted. + * @param base an optional base domain. + * + * Returns a sequence of (source, target, anchortext) */ - def apply(html: String, base: String = ""): Seq[(String, String)] = { + def apply(src: String, html: String, base: String = ""): Seq[(String, String, String)] = { if (html.isEmpty) return Nil try { - val output = mutable.MutableList[(String, String)]() + val output = mutable.MutableList[(String, String, String)]() val doc = Jsoup.parse(html) val links: Elements = doc.select("a[href]") val it = links.iterator() while (it.hasNext) { val link = it.next() if (base.nonEmpty) link.setBaseUri(base) val target = link.attr("abs:href") if (target.nonEmpty) { - output += ((target, link.text)) + output += ((src, target, link.text)) } } output } catch { case e: Exception => throw new IOException("Caught exception processing input row ", e); } } } \ No newline at end of file diff --git a/src/test/scala/org/warcbase/spark/ArcTest.scala b/src/test/scala/org/warcbase/spark/ArcTest.scala index e6db3a9..01a6b6e 100644 --- a/src/test/scala/org/warcbase/spark/ArcTest.scala +++ b/src/test/scala/org/warcbase/spark/ArcTest.scala @@ -1,87 +1,87 @@ package org.warcbase.spark import com.google.common.io.Resources import 
org.apache.spark.{SparkConf, SparkContext} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfter, FunSuite} import org.warcbase.spark.matchbox._ import org.warcbase.spark.rdd.RecordRDD._ @RunWith(classOf[JUnitRunner]) class ArcTest extends FunSuite with BeforeAndAfter { private val arcPath = Resources.getResource("arc/example.arc.gz").getPath private val master = "local[4]" private val appName = "example-spark" private var sc: SparkContext = _ before { val conf = new SparkConf() .setMaster(master) .setAppName(appName) sc = new SparkContext(conf) } test("count records") { assert(RecordLoader.loadArc(arcPath, sc).count == 300L) } test("count links") { val links = RecordLoader.loadArc(arcPath, sc) - .map(r => ExtractLinks(r.getBodyContent)) + .map(r => ExtractLinks(r.getUrl, r.getBodyContent)) .reduce((a, b) => a ++ b) assert(links.size == 664) } test("detect language") { val languageCounts = RecordLoader.loadArc(arcPath, sc) .keepMimeTypes(Set("text/html")) .map(r => ExtractRawText(r.getBodyContent)) .groupBy(content => DetectLanguage(content)) .map(f => { (f._1, f._2.size) }) .collect languageCounts.foreach { case ("en", count) => assert(57L == count) case ("et", count) => assert(6L == count) case ("it", count) => assert(1L == count) case ("lt", count) => assert(61L == count) case ("no", count) => assert(6L == count) case ("ro", count) => assert(4L == count) case (_, count) => print(_) } } test("detect mime type tika") { val mimeTypeCounts = RecordLoader.loadArc(arcPath, sc) .map(r => ExtractRawText(r.getBodyContent)) .groupBy(content => DetectMimeTypeTika(content)) .map(f => { println(f._1 + " : " + f._2.size) (f._1, f._2.size) }).collect mimeTypeCounts.foreach { case ("image/gif", count) => assert(29L == count) case ("image/png", count) => assert(8L == count) case ("image/jpeg", count) => assert(18L == count) case ("text/html", count) => assert(132L == count) case ("text/plain", count) => assert(229L == count) 
case ("application/xml", count) => assert(1L == count) case ("application/rss+xml", count) => assert(9L == count) case ("application/xhtml+xml", count) => assert(1L == count) case ("application/octet-stream", count) => assert(26L == count) case ("application/x-shockwave-flash", count) => assert(8L == count) case (_, count) => print(_) } } after { if (sc != null) { sc.stop() } } } diff --git a/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala b/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala index 90cc1e2..1b3875f 100644 --- a/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala +++ b/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala @@ -1,28 +1,28 @@ package org.warcbase.spark.matchbox import org.junit.runner.RunWith import org.scalatest.FunSuite import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) class ExtractLinksTest extends FunSuite { test("simple") { val fragment: String = "Here is <a href=\"http://www.google.com\">a search engine</a>.\n" + "Here is <a href=\"http://www.twitter.com/\">Twitter</a>.\n" - val extracted: Seq[(String, String)] = ExtractLinks(fragment) + val extracted: Seq[(String, String, String)] = ExtractLinks("", fragment) assert(extracted.size == 2) - assert("http://www.google.com" == extracted.head._1) - assert("a search engine" == extracted.head._2) - assert("http://www.twitter.com/" == extracted.last._1) - assert("Twitter" == extracted.last._2) + assert("http://www.google.com" == extracted.head._2) + assert("a search engine" == extracted.head._3) + assert("http://www.twitter.com/" == extracted.last._2) + assert("Twitter" == extracted.last._3) } test("relative") { val fragment: String = "Here is <a href=\"http://www.google.com\">a search engine</a>.\n" + "Here is <a href=\"page.html\">a relative URL</a>.\n" - val extracted: Seq[(String, String)] = ExtractLinks(fragment, "http://www.foobar.org/index.html") + val extracted: Seq[(String, String, String)] = ExtractLinks("", fragment, "http://www.foobar.org/index.html") assert(extracted.size == 2) - assert("http://www.google.com" == extracted.head._1) - 
assert("a search engine" == extracted.head._2) - assert("http://www.foobar.org/page.html" == extracted.last._1) - assert("a relative URL" == extracted.last._2) + assert("http://www.google.com" == extracted.head._2) + assert("a search engine" == extracted.head._3) + assert("http://www.foobar.org/page.html" == extracted.last._2) + assert("a relative URL" == extracted.last._3) } }