diff --git a/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala b/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala index bf67b9e..dda3c16 100644 --- a/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala +++ b/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala @@ -1,115 +1,133 @@ /* * Warcbase: an open-source platform for managing web archives * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.warcbase.spark.rdd import org.apache.spark.rdd.RDD import org.warcbase.spark.archive.io.ArchiveRecord import org.warcbase.spark.matchbox.{DetectLanguage, ExtractDate, ExtractDomain, RemoveHTML} import org.warcbase.spark.matchbox.ExtractDate.DateComponent import org.warcbase.spark.matchbox.ExtractDate.DateComponent.DateComponent import scala.reflect.ClassTag import scala.util.matching.Regex /** * RDD wrappers for working with Records */ object RecordRDD extends java.io.Serializable { /** * A Wrapper class around RDD to simplify counting */ implicit class CountableRDD[T: ClassTag](rdd: RDD[T]) extends java.io.Serializable { def countItems(): RDD[(T, Int)] = { rdd.map(r => (r, 1)) .reduceByKey((c1, c2) => c1 + c2) .sortBy(f => f._2, ascending = false) } } /** * A Wrapper class around RDD to allow RDDs of type ARCRecord and WARCRecord to be queried via a fluent API. * * To load such an RDD, please see [[org.warcbase.spark.matchbox.RecordLoader]] */ implicit class WARecordRDD(rdd: RDD[ArchiveRecord]) extends java.io.Serializable { def keepValidPages(): RDD[ArchiveRecord] = { rdd.filter(r => r.getCrawlDate != null && (r.getMimeType == "text/html" || r.getUrl.endsWith("htm") || r.getUrl.endsWith("html")) && !r.getUrl.endsWith("robots.txt")) } def keepMimeTypes(mimeTypes: Set[String]) = { rdd.filter(r => mimeTypes.contains(r.getMimeType)) } def keepDate(date: String, component: DateComponent = DateComponent.YYYYMMDD) = { rdd.filter(r => ExtractDate(r.getCrawlDate, component) == date) } def keepUrls(urls: Set[String]) = { rdd.filter(r => urls.contains(r.getUrl)) } def keepUrlPatterns(urlREs: Set[Regex]) = { rdd.filter(r => urlREs.map(re => r.getUrl match { case re() => true case _ => false }).exists(identity)) } def keepDomains(urls: Set[String]) = { rdd.filter(r => urls.contains(ExtractDomain(r.getUrl).replace("^\\s*www\\.", ""))) } def keepLanguages(lang: Set[String]) = { rdd.filter(r => lang.contains(DetectLanguage(RemoveHTML(r.getContentString)))) } + def keepContent(contentREs: Set[Regex]) = { + rdd.filter(r => + contentREs.map(re => + (re findFirstIn r.getContentString) match { + case Some(v) => true + case None => false + }).exists(identity)) + } + def discardMimeTypes(mimeTypes: Set[String]) = { rdd.filter(r => !mimeTypes.contains(r.getMimeType)) } def discardDate(date: String) = { rdd.filter(r => r.getCrawlDate != date) } def discardUrls(urls: Set[String]) = { rdd.filter(r => !urls.contains(r.getUrl)) } def discardUrlPatterns(urlREs: Set[Regex]) = { rdd.filter(r => !urlREs.map(re => r.getUrl match { case re() => true case _ => false }).exists(identity)) } def discardDomains(urls: Set[String]) = { rdd.filter(r => !urls.contains(r.getDomain)) } + + def discardContent(contentREs: Set[Regex]) = { + rdd.filter(r => + !contentREs.map(re => + (re findFirstIn r.getContentString) match { + case Some(v) => true + case None => false + }).exists(identity)) + } } } \ No newline at end of file