IndexerReducer.java (R1473 warcbase)

/*
* Warcbase: an open-source platform for managing web archives
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.warcbase.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.HdfsDirectoryFactory;

import uk.bl.wa.apache.solr.hadoop.Solate;
import uk.bl.wa.hadoop.indexer.WritableSolrRecord;
import uk.bl.wa.solr.SolrRecord;
public class IndexerReducer extends MapReduceBase implements
    Reducer<IntWritable, WritableSolrRecord, Text, Text> {
  public static final String HDFS_OUTPUT_PATH = "IndexerReducer.HDFSOutputPath";

  private static final Log LOG = LogFactory.getLog(IndexerReducer.class);

  private static final int SUBMISSION_PAUSE_MINS = 5;
  private static final String SHARD_PREFIX = "shard";

  private SolrServer solrServer;
  private int batchSize = 1000;
  private List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
  private int numberOfSequentialFails = 0;

  private FileSystem fs;
  private Path solrHome;
  private Path outputDir;

  static enum MyCounters {
    NUM_RECORDS, NUM_ERRORS, NUM_DROPPED_RECORDS
  }
  @Override
  public void configure(JobConf job) {
    LOG.info("Configuring reducer...");

    // Initialize the embedded server.
    try {
      job.setBoolean("fs.hdfs.impl.disable.cache", true);
      fs = FileSystem.get(job);
      solrHome = Solate.findSolrConfig(job, IndexerRunner.solrHomeZipName);
      LOG.info("Found solrHomeDir " + solrHome);
    } catch (IOException e) {
      e.printStackTrace();
      LOG.error("FAILED in reducer configuration: " + e);
    }

    outputDir = new Path(job.get(HDFS_OUTPUT_PATH));
    LOG.info("HDFS index output path: " + outputDir);
    LOG.info("Initialization complete.");
  }
  private void initEmbeddedServer(int slice) throws IOException {
    if (solrHome == null) {
      throw new IOException("Unable to find solr home setting");
    }

    Path outputShardDir = new Path(fs.getHomeDirectory() + "/" + outputDir,
        SHARD_PREFIX + slice);
    LOG.info("Creating embedded Solr server with solrHomeDir: " + solrHome
        + ", fs: " + fs + ", outputShardDir: " + outputShardDir);

    Path solrDataDir = new Path(outputShardDir, "data");
    if (!fs.exists(solrDataDir) && !fs.mkdirs(solrDataDir)) {
      throw new IOException("Unable to create " + solrDataDir);
    }

    String dataDirStr = solrDataDir.toUri().toString();
    LOG.info("Attempting to set data dir to: " + dataDirStr);

    System.setProperty("solr.data.dir", dataDirStr);
    System.setProperty("solr.home", solrHome.toString());
    System.setProperty("solr.solr.home", solrHome.toString());
    System.setProperty("solr.hdfs.home", outputDir.toString());
    System.setProperty("solr.directoryFactory", HdfsDirectoryFactory.class.getName());
    System.setProperty("solr.lock.type", "hdfs");
    System.setProperty("solr.hdfs.nrtcachingdirectory", "false");
    System.setProperty("solr.hdfs.blockcache.enabled", "true");
    System.setProperty("solr.hdfs.blockcache.write.enabled", "false");
    System.setProperty("solr.autoCommit.maxTime", "600000");
    System.setProperty("solr.autoSoftCommit.maxTime", "-1");

    LOG.info("Loading the container...");
    CoreContainer container = new CoreContainer();
    container.load();

    for (String s : container.getAllCoreNames()) {
      LOG.warn("Got core name: " + s);
    }

    String coreName = "";
    if (container.getCoreNames().size() > 0) {
      coreName = container.getCoreNames().iterator().next();
    }

    LOG.error("Now firing up the server...");
    solrServer = new EmbeddedSolrServer(container, coreName);
    LOG.error("Server started successfully!");
  }
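
  // How the system properties above take effect: the no-arg CoreContainer
  // constructor resolves the Solr home from the "solr.solr.home" property
  // (Solr 4.x behaviour), and the remaining "solr.*" properties are applied
  // through ${...} substitution in the solrconfig.xml shipped in the solr
  // home zip (e.g. <dataDir>${solr.data.dir:}</dataDir>,
  // <lockType>${solr.lock.type:native}</lockType>), assuming that config
  // references them as the stock Solr 4.x configs do. This is what redirects
  // the embedded index onto HDFS rather than local disk.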
  @Override
  public void reduce(IntWritable key, Iterator<WritableSolrRecord> values,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    SolrRecord solr;

    // Get the shard number, but counting from 1 instead of 0:
    int shard = key.get() + 1;

    // For indexing into HDFS, set up a new server per key:
    initEmbeddedServer(shard);

    // Go through the documents for this shard:
    long cnt = 0;
    while (values.hasNext()) {
      solr = values.next().getSolrRecord();
      cnt++;

      docs.add(solr.getSolrDocument());
      // Have we exceeded the batchSize?
      checkSubmission(docs, batchSize, reporter);

      // Occasionally update application-level status:
      if ((cnt % 1000) == 0) {
        reporter.setStatus(SHARD_PREFIX + shard + ": processed " + cnt
            + ", dropped "
            + reporter.getCounter(MyCounters.NUM_DROPPED_RECORDS).getValue());
      }
    }

    try {
      // If we have at least one document unsubmitted, make sure we submit it.
      checkSubmission(docs, 1, reporter);

      // If we are indexing to HDFS, shut the shard down:
      // Commit, and block until the changes have been flushed.
      solrServer.commit(true, false);
      solrServer.shutdown();
    } catch (Exception e) {
      LOG.error("ERROR on commit: " + e);
      e.printStackTrace();
    }
  }
  @Override
  public void close() {}
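
  // Note that close() is a no-op: the embedded server's lifecycle is managed
  // inside reduce(), which commits and shuts the server down after draining
  // its single shard key, so there is nothing left to release here.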
  private void checkSubmission(List<SolrInputDocument> docs, int limit,
      Reporter reporter) {
    if (docs.size() > 0 && docs.size() >= limit) {
      try {
        // Inform that there is progress (still-alive):
        reporter.progress();
        UpdateResponse response = solrServer.add(docs);
        LOG.info("Submitted " + docs.size() + " docs [" + response.getStatus() + "]");
        reporter.incrCounter(MyCounters.NUM_RECORDS, docs.size());
        docs.clear();
        numberOfSequentialFails = 0;
      } catch (Exception e) {
        // Count up repeated fails:
        numberOfSequentialFails++;

        // If there have been a lot of fails, drop the records (we have seen some
        // "Invalid UTF-8 character 0xfffe at char" so this avoids bad data
        // blocking job completion):
        if (this.numberOfSequentialFails >= 3) {
          LOG.error("Submission has repeatedly failed - assuming bad data and dropping these "
              + docs.size() + " records.");
          reporter.incrCounter(MyCounters.NUM_DROPPED_RECORDS, docs.size());
          docs.clear();
        }

        // SOLR-5719 possibly hitting us here; CloudSolrServer.RouteException
        LOG.error("Sleeping for " + SUBMISSION_PAUSE_MINS + " minute(s): "
            + e.getMessage(), e);
        // Also add a report for this condition.
        reporter.incrCounter(MyCounters.NUM_ERRORS, 1);
        try {
          Thread.sleep(1000 * 60 * SUBMISSION_PAUSE_MINS);
        } catch (InterruptedException ex) {
          LOG.warn("Sleep between Solr submissions was interrupted!");
        }
      }
    }
  }
}
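
For context, the following is a minimal sketch of a driver that wires this reducer into an old-API (org.apache.hadoop.mapred) job. The repository's real driver is IndexerRunner, referenced above through IndexerRunner.solrHomeZipName; the class name IndexerDriverSketch, the shard count, and the argument handling below are illustrative assumptions, not warcbase's actual configuration.

// Hypothetical driver sketch; not the repository's actual IndexerRunner.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import org.warcbase.index.IndexerReducer;

import uk.bl.wa.hadoop.indexer.WritableSolrRecord;

public class IndexerDriverSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(IndexerDriverSketch.class);
    conf.setJobName("warcbase-indexer-sketch");

    // One reduce task per shard: each task sees a single IntWritable key and
    // writes one embedded Solr index under <output>/shardN.
    conf.setNumReduceTasks(4); // illustrative shard count

    // The reducer reads its HDFS output directory from this job property.
    conf.set(IndexerReducer.HDFS_OUTPUT_PATH, args[1]);

    // Map output types must match the reducer's input types; the mapper
    // (which assigns a shard number to each record) is omitted from this sketch.
    conf.setMapOutputKeyClass(IntWritable.class);
    conf.setMapOutputValueClass(WritableSolrRecord.class);
    conf.setReducerClass(IndexerReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    JobClient.runJob(conf);
  }
}

Because reduce() builds one embedded server per incoming key and shuts it down before returning, this wiring assumes each reduce task receives exactly one distinct shard key, i.e. one reducer per shard.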