diff --git a/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala b/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala
index 2940634..5f75beb 100644
--- a/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala
+++ b/src/main/scala/org/warcbase/spark/matchbox/ExtractLinks.scala
@@ -1,40 +1,43 @@
package org.warcbase.spark.matchbox
import java.io.IOException
import org.jsoup.Jsoup
import org.jsoup.select.Elements
import scala.collection.mutable
/**
* UDF for extracting links from a webpage given the HTML content (using Jsoup).
*
*/
object ExtractLinks {
/**
- * @param html the content from which links are to be extracted
- * Returns a sequence of links
+ * @param src the source URL of the page being processed; returned unchanged as the first element of each tuple.
+ * @param html the content from which links are to be extracted.
+ * @param base an optional base URI used to resolve relative hrefs into absolute URLs; ignored when empty.
+ *
+ * @return a sequence of (source, target, anchor text) tuples, one per link whose resolved target is non-empty.
*/
- def apply(html: String, base: String = ""): Seq[(String, String)] = {
+ def apply(src: String, html: String, base: String = ""): Seq[(String, String, String)] = {
if (html.isEmpty) return Nil
try {
- val output = mutable.MutableList[(String, String)]()
+ val output = mutable.MutableList[(String, String, String)]()
val doc = Jsoup.parse(html)
val links: Elements = doc.select("a[href]")
val it = links.iterator()
while (it.hasNext) {
val link = it.next()
if (base.nonEmpty) link.setBaseUri(base)
val target = link.attr("abs:href")
if (target.nonEmpty) {
- output += ((target, link.text))
+ output += ((src, target, link.text))
}
}
output
} catch {
case e: Exception =>
throw new IOException("Caught exception processing input row ", e);
}
}
}
\ No newline at end of file
diff --git a/src/test/scala/org/warcbase/spark/ArcTest.scala b/src/test/scala/org/warcbase/spark/ArcTest.scala
index e6db3a9..01a6b6e 100644
--- a/src/test/scala/org/warcbase/spark/ArcTest.scala
+++ b/src/test/scala/org/warcbase/spark/ArcTest.scala
@@ -1,87 +1,87 @@
package org.warcbase.spark
import com.google.common.io.Resources
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
@RunWith(classOf[JUnitRunner])
class ArcTest extends FunSuite with BeforeAndAfter {
private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
private val master = "local[4]"
private val appName = "example-spark"
private var sc: SparkContext = _
before {
val conf = new SparkConf()
.setMaster(master)
.setAppName(appName)
sc = new SparkContext(conf)
}
test("count records") {
assert(RecordLoader.loadArc(arcPath, sc).count == 300L)
}
test("count links") {
val links = RecordLoader.loadArc(arcPath, sc)
- .map(r => ExtractLinks(r.getBodyContent))
+ .map(r => ExtractLinks(r.getUrl, r.getBodyContent))
.reduce((a, b) => a ++ b)
assert(links.size == 664)
}
test("detect language") {
val languageCounts = RecordLoader.loadArc(arcPath, sc)
.keepMimeTypes(Set("text/html"))
.map(r => ExtractRawText(r.getBodyContent))
.groupBy(content => DetectLanguage(content))
.map(f => {
(f._1, f._2.size)
})
.collect
languageCounts.foreach {
case ("en", count) => assert(57L == count)
case ("et", count) => assert(6L == count)
case ("it", count) => assert(1L == count)
case ("lt", count) => assert(61L == count)
case ("no", count) => assert(6L == count)
case ("ro", count) => assert(4L == count)
case (_, count) => print(_)
}
}
test("detect mime type tika") {
val mimeTypeCounts = RecordLoader.loadArc(arcPath, sc)
.map(r => ExtractRawText(r.getBodyContent))
.groupBy(content => DetectMimeTypeTika(content))
.map(f => {
println(f._1 + " : " + f._2.size)
(f._1, f._2.size)
}).collect
mimeTypeCounts.foreach {
case ("image/gif", count) => assert(29L == count)
case ("image/png", count) => assert(8L == count)
case ("image/jpeg", count) => assert(18L == count)
case ("text/html", count) => assert(132L == count)
case ("text/plain", count) => assert(229L == count)
case ("application/xml", count) => assert(1L == count)
case ("application/rss+xml", count) => assert(9L == count)
case ("application/xhtml+xml", count) => assert(1L == count)
case ("application/octet-stream", count) => assert(26L == count)
case ("application/x-shockwave-flash", count) => assert(8L == count)
case (_, count) => print(_)
}
}
after {
if (sc != null) {
sc.stop()
}
}
}
diff --git a/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala b/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala
index 90cc1e2..1b3875f 100644
--- a/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala
+++ b/src/test/scala/org/warcbase/spark/matchbox/ExtractLinksTest.scala
@@ -1,28 +1,28 @@
package org.warcbase.spark.matchbox
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
class ExtractLinksTest extends FunSuite {
test("simple") {
val fragment: String = "Here is a search engine.\n" + "Here is Twitter.\n"
- val extracted: Seq[(String, String)] = ExtractLinks(fragment)
+ val extracted: Seq[(String, String, String)] = ExtractLinks("", fragment)
assert(extracted.size == 2)
- assert("http://www.google.com" == extracted.head._1)
- assert("a search engine" == extracted.head._2)
- assert("http://www.twitter.com/" == extracted.last._1)
- assert("Twitter" == extracted.last._2)
+ assert("http://www.google.com" == extracted.head._2)
+ assert("a search engine" == extracted.head._3)
+ assert("http://www.twitter.com/" == extracted.last._2)
+ assert("Twitter" == extracted.last._3)
}
test("relative") {
val fragment: String = "Here is a search engine.\n" + "Here is a relative URL.\n"
- val extracted: Seq[(String, String)] = ExtractLinks(fragment, "http://www.foobar.org/index.html")
+ val extracted: Seq[(String, String, String)] = ExtractLinks("", fragment, "http://www.foobar.org/index.html")
assert(extracted.size == 2)
- assert("http://www.google.com" == extracted.head._1)
- assert("a search engine" == extracted.head._2)
- assert("http://www.foobar.org/page.html" == extracted.last._1)
- assert("a relative URL" == extracted.last._2)
+ assert("http://www.google.com" == extracted.head._2)
+ assert("a search engine" == extracted.head._3)
+ assert("http://www.foobar.org/page.html" == extracted.last._2)
+ assert("a relative URL" == extracted.last._3)
}
}