IngestFiles.java (R1473 warcbase)
/*
* Warcbase: an open-source platform for managing web archives
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.warcbase.ingest;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.log4j.Logger;
import org.archive.io.ArchiveRecord;
import org.archive.io.ArchiveRecordHeader;
import org.archive.io.arc.ARCReader;
import org.archive.io.arc.ARCReaderFactory;
import org.archive.io.arc.ARCRecord;
import org.archive.io.arc.ARCRecordMetaData;
import org.archive.io.warc.WARCConstants;
import org.archive.io.warc.WARCReader;
import org.archive.io.warc.WARCReaderFactory;
import org.archive.io.warc.WARCRecord;
import org.archive.util.ArchiveUtils;
import org.warcbase.data.HBaseTableManager;
import org.warcbase.data.UrlUtils;
import org.warcbase.data.WarcRecordUtils;
public class IngestFiles {
  private static final String CREATE_OPTION = "create";
  private static final String APPEND_OPTION = "append";
  private static final String NAME_OPTION = "name";
  private static final String DIR_OPTION = "dir";
  private static final String START_OPTION = "start";
  private static final String GZ_OPTION = "gz";
  private static final String MAX_CONTENT_SIZE_OPTION = "maxSize";

  private static final Logger LOG = Logger.getLogger(IngestFiles.class);
  private static final DateFormat iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX");

  public static int MAX_CONTENT_SIZE = 10 * 1024 * 1024;

  private int cnt = 0;
  private int errors = 0;
  private int toolarge = 0;
  private int invalidUrls = 0;

  private final HBaseTableManager hbaseManager;

  public IngestFiles(String name, boolean create, Compression.Algorithm compression)
      throws Exception {
    hbaseManager = new HBaseTableManager(name, create, compression);
  }
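  // Reusable 4 KB copy buffer shared by all copyStream() calls on this instance.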
  protected final byte[] scratchbuffer = new byte[4 * 1024];
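  /**
   * Copies up to {@code recordLength} bytes from {@code is} to {@code out},
   * reusing {@link #scratchbuffer}. When {@code enforceLength} is set and the
   * stream ends early, the shortfall is logged but not treated as fatal.
   *
   * @return the number of bytes read from the stream
   */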
  protected long copyStream(final InputStream is, final long recordLength,
      boolean enforceLength, final DataOutputStream out) throws IOException {
    int read = scratchbuffer.length;
    long tot = 0;
    while ((tot < recordLength) && (read = is.read(scratchbuffer)) != -1) {
      int write = read;
      // never write more than enforced length
      write = (int) Math.min(write, recordLength - tot);
      tot += read;
      out.write(scratchbuffer, 0, write);
    }
    if (enforceLength && tot != recordLength) {
      LOG.error("Read " + tot + " bytes but expected " + recordLength + " bytes. Continuing...");
    }
    return tot;
  }
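  /**
   * Ingests a single ARC file. The first record (the ARC file header) is
   * consumed and skipped; dns: records and records whose URL cannot be turned
   * into a row key are counted as invalid. Each stored value is the metaline
   * followed by the raw record content.
   */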
  private void ingestArcFile(File inputArcFile) {
    ARCReader reader = null;
    // Per file trapping of exceptions so a corrupt file doesn't blow up entire ingest.
    try {
      reader = ARCReaderFactory.get(inputArcFile);
      // The following snippet of code was adapted from the dump method in ARCReader.
      boolean firstRecord = true;
      for (Iterator<ArchiveRecord> ii = reader.iterator(); ii.hasNext();) {
        ARCRecord r = (ARCRecord) ii.next();
        ARCRecordMetaData meta = r.getMetaData();

        if (firstRecord) {
          firstRecord = false;
          while (r.available() > 0) {
            r.read();
          }
          continue;
        }

        if (meta.getUrl().startsWith("dns:")) {
          invalidUrls++;
          continue;
        }

        String metaline = meta.getUrl() + " " + meta.getIp() + " " + meta.getDate() + " "
            + meta.getMimetype() + " " + (int) meta.getLength();
        String date = meta.getDate();

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(baos);
        dout.write(metaline.getBytes());
        dout.write("\n".getBytes());
        copyStream(r, (int) meta.getLength(), true, dout);

        String key = UrlUtils.urlToKey(meta.getUrl());
        String type = meta.getMimetype();

        if (key == null) {
          LOG.error("Invalid URL: " + meta.getUrl());
          invalidUrls++;
          continue;
        }

        if (type == null) {
          type = "text/plain";
        }

        if ((int) meta.getLength() > MAX_CONTENT_SIZE) {
          toolarge++;
        } else {
          if (hbaseManager.insertRecord(key, date, baos.toByteArray(), type)) {
            cnt++;
          } else {
            errors++;
          }
        }

        if (cnt % 10000 == 0 && cnt > 0) {
          LOG.info("Ingested " + cnt + " records into HBase.");
        }
      }
    } catch (Exception e) {
      LOG.error("Error ingesting file: " + inputArcFile);
      e.printStackTrace();
    } catch (OutOfMemoryError e) {
      LOG.error("Encountered OutOfMemoryError ingesting file: " + inputArcFile);
      LOG.error("Attempting to continue...");
    } finally {
      if (reader != null)
        try {
          reader.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
    }
  }
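  /**
   * Ingests a single WARC file. Only 'response' records are stored; each
   * record's ISO 8601 timestamp is converted to ArchiveUtils' 14-digit date
   * string, and the full serialized record (headers plus content) is inserted.
   */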
  private void ingestWarcFile(File inputWarcFile) {
    WARCReader reader = null;
    // Per file trapping of exceptions so a corrupt file doesn't blow up entire ingest.
    try {
      reader = WARCReaderFactory.get(inputWarcFile);
      // The following snippet of code was adapted from the dump method in ARCReader.
      boolean firstRecord = true;
      for (Iterator<ArchiveRecord> ii = reader.iterator(); ii.hasNext();) {
        WARCRecord r = (WARCRecord) ii.next();
        ArchiveRecordHeader h = r.getHeader();

        byte[] recordBytes = WarcRecordUtils.toBytes(r);
        byte[] content = WarcRecordUtils.getContent(WarcRecordUtils.fromBytes(recordBytes));

        if (firstRecord) {
          firstRecord = false;
          while (r.available() > 0) {
            r.read();
          }
          continue;
        }

        // Only store WARC 'response' records
        // Would it be useful to store 'request' and 'metadata' records too?
        if (!h.getHeaderValue(WARCConstants.HEADER_KEY_TYPE).equals("response")) {
          continue;
        }

        if (h.getUrl().startsWith("dns:")) {
          invalidUrls++;
          continue;
        }

        Date d = iso8601.parse(h.getDate());
        String date = ArchiveUtils.get14DigitDate(d);
        String key = UrlUtils.urlToKey(h.getUrl());
        String type = WarcRecordUtils.getWarcResponseMimeType(content);

        if (key == null) {
          LOG.error("Invalid URL: " + h.getUrl());
          invalidUrls++;
          continue;
        }

        if (type == null) {
          type = "text/plain";
        }

        if ((int) h.getLength() > MAX_CONTENT_SIZE) {
          toolarge++;
        } else {
          if (hbaseManager.insertRecord(key, date, recordBytes, type)) {
            cnt++;
          } else {
            errors++;
          }
        }

        if (cnt % 10000 == 0 && cnt > 0) {
          LOG.info("Ingested " + cnt + " records into HBase.");
        }
      }
    } catch (Exception e) {
      LOG.error("Error ingesting file: " + inputWarcFile);
      e.printStackTrace();
    } catch (OutOfMemoryError e) {
      LOG.error("Encountered OutOfMemoryError ingesting file: " + inputWarcFile);
      LOG.error("Attempting to continue...");
    } finally {
      if (reader != null)
        try {
          reader.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
    }
  }
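  /**
   * Ingests every .arc(.gz) and .warc(.gz) file in {@code inputFolder},
   * starting from the i-th directory entry (which is what the -start option
   * feeds in, e.g. to resume an interrupted run), then logs summary counts
   * and the overall ingest rate.
   */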
  private void ingestFolder(File inputFolder, int i) throws Exception {
    long startTime = System.currentTimeMillis();

    // Cache the directory listing instead of re-reading it on every iteration.
    File[] inputFiles = inputFolder.listFiles();
    for (; i < inputFiles.length; i++) {
      File inputFile = inputFiles[i];
      if (!(inputFile.getName().endsWith(".warc.gz") || inputFile.getName().endsWith(".arc.gz")
          || inputFile.getName().endsWith(".warc") || inputFile.getName().endsWith(".arc"))) {
        continue;
      }

      LOG.info("processing file " + i + ": " + inputFile.getName());
      if (inputFile.getName().endsWith(".warc.gz") || inputFile.getName().endsWith(".warc")) {
        ingestWarcFile(inputFile);
      } else if (inputFile.getName().endsWith(".arc.gz") || inputFile.getName().endsWith(".arc")) {
        ingestArcFile(inputFile);
      }
    }

    long totalTime = System.currentTimeMillis() - startTime;
    LOG.info("Total " + cnt + " records inserted, " + toolarge + " records too large, "
        + invalidUrls + " invalid URLs, " + errors + " insertion errors.");
    LOG.info("Total time: " + totalTime + "ms");
    // Guard against division by zero when the whole run takes under a second.
    LOG.info("Ingest rate: " + cnt / Math.max(1, totalTime / 1000) + " records per second.");
  }
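  /**
   * Command-line entry point; see the class comment for a sample invocation.
   * Requires -dir and -name, plus at least one of -create or -append, with
   * optional -start, -maxSize, and -gz flags.
   */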
  @SuppressWarnings("static-access")
  public static void main(String[] args) throws Exception {
    Options options = new Options();
    options.addOption(OptionBuilder.withArgName("name").hasArg()
        .withDescription("name of the archive").create(NAME_OPTION));
    options.addOption(OptionBuilder.withArgName("path").hasArg()
        .withDescription("WARC files location").create(DIR_OPTION));
    options.addOption(OptionBuilder.withArgName("n").hasArg()
        .withDescription("start from the n-th WARC file").create(START_OPTION));
    options.addOption(OptionBuilder.withArgName("n").hasArg()
        .withDescription("max record size to insert (default = 10 MB)").create(MAX_CONTENT_SIZE_OPTION));
    options.addOption("create", false, "create new table");
    options.addOption("append", false, "append to existing table");
    options.addOption(GZ_OPTION, false, "use GZ compression (default = Snappy)");

    CommandLine cmdline = null;
    CommandLineParser parser = new GnuParser();
    try {
      cmdline = parser.parse(options, args);
    } catch (ParseException exp) {
      System.err.println("Error parsing command line: " + exp.getMessage());
      System.exit(-1);
    }

    if (!cmdline.hasOption(DIR_OPTION) || !cmdline.hasOption(NAME_OPTION)) {
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp(IngestFiles.class.getCanonicalName(), options);
      System.exit(-1);
    }

    if (!cmdline.hasOption(CREATE_OPTION) && !cmdline.hasOption(APPEND_OPTION)) {
      System.err.println(String.format("Must specify either -%s or -%s", CREATE_OPTION, APPEND_OPTION));
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp(IngestFiles.class.getCanonicalName(), options);
      System.exit(-1);
    }

    String path = cmdline.getOptionValue(DIR_OPTION);
    File inputFolder = new File(path);

    Compression.Algorithm compression = Compression.Algorithm.SNAPPY;
    if (cmdline.hasOption(GZ_OPTION)) {
      compression = Compression.Algorithm.GZ;
    }

    int i = 0;
    if (cmdline.hasOption(START_OPTION)) {
      i = Integer.parseInt(cmdline.getOptionValue(START_OPTION));
    }

    if (cmdline.hasOption(MAX_CONTENT_SIZE_OPTION)) {
      IngestFiles.MAX_CONTENT_SIZE = Integer.parseInt(cmdline.getOptionValue(MAX_CONTENT_SIZE_OPTION));
    }

    String name = cmdline.getOptionValue(NAME_OPTION);
    boolean create = cmdline.hasOption(CREATE_OPTION);

    LOG.info("Input: " + inputFolder);
    LOG.info("Table: " + name);
    LOG.info("Create new table: " + create);
    LOG.info("Compression: " + compression);
    LOG.info("Max content size: " + IngestFiles.MAX_CONTENT_SIZE);

    IngestFiles load = new IngestFiles(name, create, compression);
    load.ingestFolder(inputFolder, i);
  }
}