Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F64775968
RecordRDD.scala
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, May 29, 09:11
Size
3 KB
Mime Type
text/x-c
Expires
Fri, May 31, 09:11 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
17956580
Attached To
R1473 warcbase
RecordRDD.scala
View Options
/*
* Warcbase: an open-source platform for managing web archives
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.warcbase.spark.rdd
import org.apache.spark.rdd.RDD
import org.warcbase.spark.archive.io.ArchiveRecord
import org.warcbase.spark.matchbox.{DetectLanguage, ExtractDate, ExtractDomain, RemoveHTML}
import org.warcbase.spark.matchbox.ExtractDate.DateComponent
import org.warcbase.spark.matchbox.ExtractDate.DateComponent.DateComponent
import scala.reflect.ClassTag
import scala.util.matching.Regex
/**
* RDD wrappers for working with Records
*/
object RecordRDD extends java.io.Serializable {
/**
* A Wrapper class around RDD to simplify counting
*/
implicit class CountableRDD[T: ClassTag](rdd: RDD[T]) extends java.io.Serializable {
def countItems(): RDD[(T, Int)] = {
rdd.map(r => (r, 1))
.reduceByKey((c1, c2) => c1 + c2)
.sortBy(f => f._2, ascending = false)
}
}
/**
* A Wrapper class around RDD to allow RDDs of type ARCRecord and WARCRecord to be queried via a fluent API.
*
* To load such an RDD, please see [[org.warcbase.spark.matchbox.RecordLoader]]
*/
implicit class WARecordRDD(rdd: RDD[ArchiveRecord]) extends java.io.Serializable {
def keepValidPages(): RDD[ArchiveRecord] = {
rdd.filter(r =>
r.getCrawlDate != null
&& (r.getMimeType == "text/html"
|| r.getUrl.endsWith("htm")
|| r.getUrl.endsWith("html"))
&& !r.getUrl.endsWith("robots.txt"))
}
def keepMimeTypes(mimeTypes: Set[String]) = {
rdd.filter(r => mimeTypes.contains(r.getMimeType))
}
def keepDate(date: String, component: DateComponent = DateComponent.YYYYMMDD) = {
rdd.filter(r => ExtractDate(r.getCrawlDate, component) == date)
}
def keepUrls(urls: Set[String]) = {
rdd.filter(r => urls.contains(r.getUrl))
}
def keepUrlPatterns(urlREs: Set[Regex]) = {
rdd.filter(r =>
urlREs.map(re =>
r.getUrl match {
case re() => true
case _ => false
}).exists(identity))
}
def keepDomains(urls: Set[String]) = {
rdd.filter(r => urls.contains(ExtractDomain(r.getUrl).replace("^\\s*www\\.", "")))
}
def keepLanguages(lang: Set[String]) = {
rdd.filter(r => lang.contains(DetectLanguage(RemoveHTML(r.getContentString))))
}
def keepContent(contentREs: Set[Regex]) = {
rdd.filter(r =>
contentREs.map(re =>
(re findFirstIn r.getContentString) match {
case Some(v) => true
case None => false
}).exists(identity))
}
def discardMimeTypes(mimeTypes: Set[String]) = {
rdd.filter(r => !mimeTypes.contains(r.getMimeType))
}
def discardDate(date: String) = {
rdd.filter(r => r.getCrawlDate != date)
}
def discardUrls(urls: Set[String]) = {
rdd.filter(r => !urls.contains(r.getUrl))
}
def discardUrlPatterns(urlREs: Set[Regex]) = {
rdd.filter(r =>
!urlREs.map(re =>
r.getUrl match {
case re() => true
case _ => false
}).exists(identity))
}
def discardDomains(urls: Set[String]) = {
rdd.filter(r => !urls.contains(r.getDomain))
}
def discardContent(contentREs: Set[Regex]) = {
rdd.filter(r =>
!contentREs.map(re =>
(re findFirstIn r.getContentString) match {
case Some(v) => true
case None => false
}).exists(identity))
}
}
}
Event Timeline
Log In to Comment