diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala
index c4ea11b..7aff49b 100644
--- a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala
@@ -1,24 +1,32 @@
 package org.warcbase.spark.archive.io

 import org.apache.spark.SerializableWritable
 import org.warcbase.data.ArcRecordUtils
 import org.warcbase.io.ArcRecordWritable
 import org.warcbase.spark.matchbox.ExtractDate.DateComponent
-import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}
+import org.warcbase.spark.matchbox.{RemoveHttpHeader, ExtractDate, ExtractDomain}

 class ArcRecord(r: SerializableWritable[ArcRecordWritable]) extends ArchiveRecord {
   val getCrawlDate: String = ExtractDate(r.t.getRecord.getMetaData.getDate, DateComponent.YYYYMMDD)

   val getCrawlMonth: String = ExtractDate(r.t.getRecord.getMetaData.getDate, DateComponent.YYYYMM)

   val getMimeType: String = r.t.getRecord.getMetaData.getMimetype

   val getUrl: String = r.t.getRecord.getMetaData.getUrl

   val getDomain: String = ExtractDomain(r.t.getRecord.getMetaData.getUrl)

   val getContentBytes: Array[Byte] = ArcRecordUtils.getBodyContent(r.t.getRecord)

   val getContentString: String = new String(getContentBytes)

+  val getImageBytes: Array[Byte] = {
+    if (getContentString.startsWith("HTTP/"))
+      getContentBytes.slice(
+        getContentString.indexOf(RemoveHttpHeader.headerEnd)
+          + RemoveHttpHeader.headerEnd.length, getContentBytes.length)
+    else
+      getContentBytes
+  }
 }
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArchiveRecord.scala b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArchiveRecord.scala
index 610ebfc..1f4c992 100644
--- a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArchiveRecord.scala
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArchiveRecord.scala
@@ -1,17 +1,19 @@
 package org.warcbase.spark.archive.io

 trait ArchiveRecord extends Serializable {
   val getCrawlDate: String

   val getCrawlMonth: String

   val getUrl: String

   val getDomain: String

   val getMimeType: String

   val getContentString: String

   val getContentBytes: Array[Byte]
+
+  val getImageBytes: Array[Byte]
 }
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala
index 7e8b0ad..7a7e2ba 100644
--- a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala
@@ -1,71 +1,80 @@
 package org.warcbase.spark.archive.io

 import java.text.SimpleDateFormat

 import org.apache.spark.SerializableWritable
 import org.archive.io.arc.ARCRecord
 import org.archive.io.warc.WARCRecord
 import org.archive.util.ArchiveUtils
 import org.warcbase.data.{ArcRecordUtils, WarcRecordUtils}
 import org.warcbase.io.GenericArchiveRecordWritable
 import org.warcbase.io.GenericArchiveRecordWritable.ArchiveFormat
 import org.warcbase.spark.matchbox.ExtractDate.DateComponent
-import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}
+import org.warcbase.spark.matchbox.{RemoveHttpHeader, ExtractDate, ExtractDomain}

 class GenericArchiveRecord(r: SerializableWritable[GenericArchiveRecordWritable]) extends ArchiveRecord {
   var arcRecord: ARCRecord = null
   var warcRecord: WARCRecord = null

   if (r.t.getFormat == ArchiveFormat.ARC)
     arcRecord = r.t.getRecord.asInstanceOf[ARCRecord]
   else if (r.t.getFormat == ArchiveFormat.WARC)
     warcRecord = r.t.getRecord.asInstanceOf[WARCRecord]

   val ISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX")

   val getCrawlDate: String = {
     if (r.t.getFormat == ArchiveFormat.ARC) {
       ExtractDate(arcRecord.getMetaData.getDate, DateComponent.YYYYMMDD)
     } else {
       ExtractDate(ArchiveUtils.get14DigitDate(ISO8601.parse(warcRecord.getHeader.getDate)), DateComponent.YYYYMMDD)
     }
   }

   val getCrawlMonth: String = {
     if (r.t.getFormat == ArchiveFormat.ARC) {
       ExtractDate(arcRecord.getMetaData.getDate, DateComponent.YYYYMM)
     } else {
       ExtractDate(ArchiveUtils.get14DigitDate(ISO8601.parse(warcRecord.getHeader.getDate)), DateComponent.YYYYMM)
     }
   }

   val getContentBytes: Array[Byte] = {
     if (r.t.getFormat == ArchiveFormat.ARC) {
       ArcRecordUtils.getBodyContent(arcRecord)
     } else {
       WarcRecordUtils.getContent(warcRecord)
     }
   }

   val getContentString: String = new String(getContentBytes)

   val getMimeType = {
     if (r.t.getFormat == ArchiveFormat.ARC) {
       arcRecord.getMetaData.getMimetype
     } else {
       WarcRecordUtils.getWarcResponseMimeType(getContentBytes)
     }
   }

   val getUrl = {
     if (r.t.getFormat == ArchiveFormat.ARC) {
       arcRecord.getMetaData.getUrl
     } else {
       warcRecord.getHeader.getUrl
     }
   }

   val getDomain: String = ExtractDomain(getUrl)
-}
+
+  val getImageBytes: Array[Byte] = {
+    if (getContentString.startsWith("HTTP/"))
+      getContentBytes.slice(
+        getContentString.indexOf(RemoveHttpHeader.headerEnd)
+          + RemoveHttpHeader.headerEnd.length, getContentBytes.length)
+    else
+      getContentBytes
+  }
+}
\ No newline at end of file
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala
index 6b18bbc..bfae6c5 100644
--- a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala
@@ -1,28 +1,37 @@
 package org.warcbase.spark.archive.io

 import java.text.SimpleDateFormat

 import org.apache.spark.SerializableWritable
 import org.archive.util.ArchiveUtils
 import org.warcbase.data.WarcRecordUtils
 import org.warcbase.io.WarcRecordWritable
 import org.warcbase.spark.matchbox.ExtractDate.DateComponent
-import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}
+import org.warcbase.spark.matchbox.{RemoveHttpHeader, ExtractDate, ExtractDomain}

 class WarcRecord(r: SerializableWritable[WarcRecordWritable]) extends ArchiveRecord {
   val ISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX")

   val getCrawlDate: String = ExtractDate(ArchiveUtils.get14DigitDate(ISO8601.parse(r.t.getRecord.getHeader.getDate)), DateComponent.YYYYMMDD)

   val getCrawlMonth: String = ExtractDate(ArchiveUtils.get14DigitDate(ISO8601.parse(r.t.getRecord.getHeader.getDate)), DateComponent.YYYYMM)

   val getContentBytes: Array[Byte] = WarcRecordUtils.getContent(r.t.getRecord)

   val getContentString: String = new String(getContentBytes)

   val getMimeType = WarcRecordUtils.getWarcResponseMimeType(getContentBytes)

   val getUrl = r.t.getRecord.getHeader.getUrl

   val getDomain = ExtractDomain(getUrl)
+
+  val getImageBytes: Array[Byte] = {
+    if (getContentString.startsWith("HTTP/"))
+      getContentBytes.slice(
+        getContentString.indexOf(RemoveHttpHeader.headerEnd)
+          + RemoveHttpHeader.headerEnd.length, getContentBytes.length)
+    else
+      getContentBytes
+  }
 }
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeImageSize.scala b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeImageSize.scala
new file mode 100644
index 0000000..91eb0c6
--- /dev/null
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeImageSize.scala
@@ -0,0 +1,25 @@
+package org.warcbase.spark.matchbox
+
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+/**
+ * Created by youngbinkim on 7/7/16.
+ */
+object ComputeImageSize {
+  def apply(bytes: Array[Byte]): (Int, Int) = {
+    val in = new ByteArrayInputStream(bytes)
+
+    try {
+      val image = ImageIO.read(in)
+      if (image == null)
+        return (0, 0)
+      (image.getWidth(), image.getHeight())
+    } catch {
+      case e: Throwable => {
+        e.printStackTrace()
+        return (0, 0)
+      }
+    }
+  }
+}
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeMD5.scala b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeMD5.scala
new file mode 100644
index 0000000..d2000fe
--- /dev/null
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeMD5.scala
@@ -0,0 +1,19 @@
+package org.warcbase.spark.matchbox
+
+import java.security.MessageDigest
+
+
+/**
+ * Computes MD5 checksums of byte arrays.
+ *
+ */
+object ComputeMD5 {
+  /**
+   * Hashes the given content.
+   * @param bytes the content to hash
+   * @return the MD5 digest of the content, as a string
+   */
+  def apply(bytes: Array[Byte]): String = {
+    new String(MessageDigest.getInstance("MD5").digest(bytes))
+  }
+}
\ No newline at end of file
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ExtractPopularImages.scala b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ExtractPopularImages.scala
new file mode 100644
index 0000000..7aaeaba
--- /dev/null
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ExtractPopularImages.scala
@@ -0,0 +1,26 @@
+package org.warcbase.spark.matchbox
+
+import org.warcbase.spark.rdd.RecordRDD._
+import org.apache.spark.rdd.RDD
+import org.warcbase.spark.archive.io.ArchiveRecord
+
+
+/**
+ * Extracts the most popular images in a collection.
+ *
+ * limit: number of most popular images in the output
+ * minWidth, minHeight: minimum dimensions an image must have to be counted
+ */
+object ExtractPopularImages {
+  def apply(records: RDD[ArchiveRecord], limit: Int, minWidth: Int = 30, minHeight: Int = 30) = {
+    val res = records
+      .keepImages()
+      .map(r => ((r.getUrl, r.getImageBytes), 1))
+      .map(img => (ComputeMD5(img._1._2), (ComputeImageSize(img._1._2), img._1._1, img._2)))
+      .filter(img => img._2._1._1 >= minWidth && img._2._1._2 >= minHeight)
+      .reduceByKey((image1, image2) => (image1._1, image1._2, image1._3 + image2._3))
+      .takeOrdered(limit)(Ordering[Int].on(x => -x._2._3))
+    res.foreach(x => println(x._2._2 + "\t" + x._2._3))
+    res
+  }
+}
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/RemoveHttpHeader.scala b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/RemoveHttpHeader.scala
new file mode 100644
index 0000000..090ff77
--- /dev/null
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/RemoveHttpHeader.scala
@@ -0,0 +1,21 @@
+package org.warcbase.spark.matchbox
+
+/**
+ * Created by youngbinkim on 7/9/16.
+ */
+object RemoveHttpHeader {
+  val headerEnd = "\r\n\r\n"
+  def apply(content: String): String = {
+    try {
+      if (content.startsWith("HTTP/"))
+        content.substring(content.indexOf(headerEnd) + headerEnd.length)
+      else
+        content
+    } catch {
+      case e: Exception => {
+        println(e)
+        null
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala b/warcbase-core/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
index dda3c16..2a52a80 100644
--- a/warcbase-core/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
@@ -1,133 +1,144 @@
 /*
  * Warcbase: an open-source platform for managing web archives
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.warcbase.spark.rdd

 import org.apache.spark.rdd.RDD
 import org.warcbase.spark.archive.io.ArchiveRecord
 import org.warcbase.spark.matchbox.{DetectLanguage, ExtractDate, ExtractDomain, RemoveHTML}
 import org.warcbase.spark.matchbox.ExtractDate.DateComponent
 import org.warcbase.spark.matchbox.ExtractDate.DateComponent.DateComponent

 import scala.reflect.ClassTag
 import scala.util.matching.Regex

 /**
   * RDD wrappers for working with Records
   */
 object RecordRDD extends java.io.Serializable {

   /**
     * A Wrapper class around RDD to simplify counting
     */
   implicit class CountableRDD[T: ClassTag](rdd: RDD[T]) extends java.io.Serializable {
     def countItems(): RDD[(T, Int)] = {
       rdd.map(r => (r, 1))
         .reduceByKey((c1, c2) => c1 + c2)
         .sortBy(f => f._2, ascending = false)
     }
   }

   /**
     * A Wrapper class around RDD to allow RDDs of type ARCRecord and WARCRecord to be queried via a fluent API.
     *
     * To load such an RDD, please see [[org.warcbase.spark.matchbox.RecordLoader]]
     */
   implicit class WARecordRDD(rdd: RDD[ArchiveRecord]) extends java.io.Serializable {

     def keepValidPages(): RDD[ArchiveRecord] = {
       rdd.filter(r =>
         r.getCrawlDate != null
           && (r.getMimeType == "text/html"
           || r.getUrl.endsWith("htm")
           || r.getUrl.endsWith("html"))
           && !r.getUrl.endsWith("robots.txt"))
     }

+    def keepImages() = {
+      rdd.filter(r =>
+        r.getCrawlDate != null
+          && (
+          (r.getMimeType != null && r.getMimeType.contains("image/"))
+            || r.getUrl.endsWith("jpg")
+            || r.getUrl.endsWith("jpeg")
+            || r.getUrl.endsWith("png"))
+          && !r.getUrl.endsWith("robots.txt"))
+    }
+
     def keepMimeTypes(mimeTypes: Set[String]) = {
       rdd.filter(r => mimeTypes.contains(r.getMimeType))
     }

     def keepDate(date: String, component: DateComponent = DateComponent.YYYYMMDD) = {
       rdd.filter(r => ExtractDate(r.getCrawlDate, component) == date)
     }

     def keepUrls(urls: Set[String]) = {
       rdd.filter(r => urls.contains(r.getUrl))
     }

     def keepUrlPatterns(urlREs: Set[Regex]) = {
       rdd.filter(r =>
         urlREs.map(re =>
           r.getUrl match {
             case re() => true
             case _ => false
           }).exists(identity))
     }

     def keepDomains(urls: Set[String]) = {
       rdd.filter(r => urls.contains(ExtractDomain(r.getUrl).replace("^\\s*www\\.", "")))
     }

     def keepLanguages(lang: Set[String]) = {
       rdd.filter(r => lang.contains(DetectLanguage(RemoveHTML(r.getContentString))))
     }

     def keepContent(contentREs: Set[Regex]) = {
       rdd.filter(r =>
         contentREs.map(re =>
           (re findFirstIn r.getContentString) match {
             case Some(v) => true
             case None => false
           }).exists(identity))
     }

     def discardMimeTypes(mimeTypes: Set[String]) = {
       rdd.filter(r => !mimeTypes.contains(r.getMimeType))
     }

     def discardDate(date: String) = {
       rdd.filter(r => r.getCrawlDate != date)
     }

     def discardUrls(urls: Set[String]) = {
       rdd.filter(r => !urls.contains(r.getUrl))
     }

     def discardUrlPatterns(urlREs: Set[Regex]) = {
       rdd.filter(r =>
         !urlREs.map(re =>
           r.getUrl match {
             case re() => true
             case _ => false
           }).exists(identity))
     }

     def discardDomains(urls: Set[String]) = {
       rdd.filter(r => !urls.contains(r.getDomain))
     }

     def discardContent(contentREs: Set[Regex]) = {
       rdd.filter(r =>
         !contentREs.map(re =>
           (re findFirstIn r.getContentString) match {
             case Some(v) => true
             case None => false
           }).exists(identity))
     }
   }
 }
\ No newline at end of file
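
Taken together, these changes compose as follows: keepImages() narrows an RDD[ArchiveRecord] to image records, getImageBytes strips any leading HTTP response header from the raw body, and ExtractPopularImages keys the bytes by MD5 to rank the most frequently archived images. The snippet below is a rough spark-shell sketch of how the new API might be driven, not part of the patch; the collection path and limit are made up, and it assumes records are loaded into an RDD[ArchiveRecord] through RecordLoader (referenced in the RecordRDD scaladoc) via a loadArchives(path, sc)-style helper.

import org.warcbase.spark.matchbox.{ExtractPopularImages, RecordLoader}
import org.warcbase.spark.rdd.RecordRDD._

// Hypothetical collection path; `sc` is the SparkContext provided by spark-shell.
// loadArchives(path, sc) is assumed here; use whichever RecordLoader method matches the input format.
val records = RecordLoader.loadArchives("/data/sample-collection/*.gz", sc)

// The 20 most frequent images that are at least 30x30 pixels, keyed by MD5 of the image bytes;
// each element is (md5, ((width, height), url, count)), and the URLs and counts are also printed.
val popular = ExtractPopularImages(records, limit = 20)

// keepImages() can also be used on its own, e.g. to count image records per domain.
val imagesPerDomain = records.keepImages().map(_.getDomain).countItems()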