Monday, September 1, 2014

MapReduce job for indexing documents to Solr Cloud


In this article we will look at indexing documents to Solr Cloud in near real time, so that the indexed documents are available for search immediately.
Familiarity with Solr is assumed.

We will take the example of indexing log files into Solr using a MapReduce job.

The contents of the log file look like this:

08-01-2014 09:34:44,INFO,user1,tid0,DELETE,SUCCESS
08-01-2014 09:34:44,ERROR,user0,tid1,UPDATE,FAILURE
08-01-2014 09:34:45,INFO,user1,tid2,DELETE,SUCCESS

The comma-separated values are timestamp, log level, username, tid, action and status.
We will index these documents to Solr Cloud so the log contents can be analyzed via Solr search.
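
The field layout above can be sketched as a small parsing helper. This is only an illustration in plain Java: the class name LogLineParser and its parse method are hypothetical and are not taken from the job's code, which uses its own parseLog method.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: splits one log line into its six comma-separated fields. */
final class LogLineParser {

    // timestamp, log level, username, tid, action, status
    static final int FIELD_COUNT = 6;

    /**
     * Returns the trimmed fields of a log line, or null when the line does
     * not have exactly six fields, so the caller can skip malformed lines.
     */
    static List<String> parse(String line) {
        if (line == null) {
            return null;
        }
        // The -1 limit keeps trailing empty fields instead of dropping them.
        String[] parts = line.split(",", -1);
        if (parts.length != FIELD_COUNT) {
            return null;
        }
        for (int i = 0; i < parts.length; i++) {
            parts[i] = parts[i].trim();
        }
        return Arrays.asList(parts);
    }
}
```

Calling LogLineParser.parse on the second sample line gives a list whose element at index 2 is user0 and whose last element is FAILURE. Returning null for malformed lines mirrors the null check the Mapper performs before emitting a record.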

We will use a Mapper to read the log files. The Mapper will read each line of the log files and emit the line as a LogMessageWritable value, keyed by the username.

public class LogMessageWritable implements Writable {

    private Text timestamp, logLevel, user, tid, action;
    private String status;

    public LogMessageWritable() {

        this.timestamp = new Text();
        this.logLevel = new Text();
        this.user = new Text();
        this.tid = new Text();
        this.action = new Text();
        status = "";
    }
....... 
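
A Hadoop Writable has to serialize its fields in write(DataOutput) and read them back in the same order in readFields(DataInput). The elided part of the class is not reproduced here; the following is only a minimal sketch of that contract using plain java.io, with no Hadoop dependency. The class LogMessageSketch, its String fields and the roundTrip helper are assumptions made for illustration. The real class would implement org.apache.hadoop.io.Writable and would probably delegate to its Text fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Stand-in for LogMessageWritable that only needs the JDK (assumption). */
final class LogMessageSketch {

    String timestamp = "", logLevel = "", user = "", tid = "", action = "", status = "";

    // Same shape as Writable.write: emit every field in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeUTF(timestamp);
        out.writeUTF(logLevel);
        out.writeUTF(user);
        out.writeUTF(tid);
        out.writeUTF(action);
        out.writeUTF(status);
    }

    // Same shape as Writable.readFields: read the fields in the order written.
    void readFields(DataInput in) throws IOException {
        timestamp = in.readUTF();
        logLevel = in.readUTF();
        user = in.readUTF();
        tid = in.readUTF();
        action = in.readUTF();
        status = in.readUTF();
    }

    /** Serializes a message to bytes and reads it back into a new instance. */
    static LogMessageSketch roundTrip(LogMessageSketch msg) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            msg.write(new DataOutputStream(bytes));
            LogMessageSketch copy = new LogMessageSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The round trip is roughly what happens between the map and reduce phases: the framework serializes each emitted value and rebuilds it on the reducer side, which is also why the class needs a no-argument constructor.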

The Mapper class:


public static class LogLoadMapper extends Mapper<LongWritable, Text, Text, LogMessageWritable> {

    @Override
    public void map(LongWritable key, Text line, Context context)
            throws IOException, InterruptedException {

        // Parse one log line; lines that cannot be parsed are skipped
        LogMessageWritable logmessage = parseLog(line);

        if (logmessage != null) {
            // Key by username so each user's messages reach the same reduce call
            context.write(logmessage.getUser(), logmessage);
        }
    }




The Reducer class:


public static class LogLoadReducer extends Reducer<Text, LogMessageWritable, NullWritable, Text> {

    private CloudSolrServer solrServer = null;
    private SimpleDateFormat sdf = new SimpleDateFormat("MM-dd-yyyy HH:mm:ss");
    private int docBatchSize = 1000;

......
    @Override
    public void reduce(Text key, Iterable<LogMessageWritable> values, Context context)
            throws IOException, InterruptedException {

        List<SolrInputDocument> doclist = new ArrayList<SolrInputDocument>();

        for (LogMessageWritable value : values) {
            SolrInputDocument doc = createSolrDocument(value);
            doclist.add(doc);

            // Send a full batch to Solr and start a new one
            if (doclist.size() == docBatchSize) {
                addBatchToSolr(doclist);
                doclist.clear();
            }
        }

        // Send the remaining documents (fewer than a full batch), if any
        if (doclist.size() > 0) {
            addBatchToSolr(doclist);
        }
    }
......
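
The batching used by the reducer can be tried in isolation. The sketch below is an assumption-laden illustration, not code from the job: BatchSketch, sendInBatches and the Consumer-based sink are hypothetical names, and the sink stands in for a call such as addBatchToSolr. It collects items until a batch is full, hands the batch over, and sends any remainder once the loop has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, Solr-free illustration of batch-and-flush. */
final class BatchSketch {

    /**
     * Passes items to the sink in batches of at most batchSize elements
     * and returns the number of batches that were sent.
     */
    static <T> int sendInBatches(Iterable<T> items, int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> batch = new ArrayList<>();
        int batches = 0;
        for (T item : items) {
            batch.add(item);
            if (batch.size() == batchSize) {
                // Hand over a copy so the sink may keep the list after we clear ours.
                sink.accept(new ArrayList<>(batch));
                batch.clear();
                batches++;
            }
        }
        // Flush whatever is left once all items have been seen.
        if (!batch.isEmpty()) {
            sink.accept(new ArrayList<>(batch));
            batches++;
        }
        return batches;
    }
}
```

With 2500 items and a batch size of 1000, the sink is called three times, with 1000, 1000 and 500 items. The final flush belongs after the loop: inside the loop it would resend the partially filled batch on every iteration.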


This code commits to and updates the live Solr index, so changes are visible immediately.
Check this against your own requirements for commit frequency and performance.
Committing this often can become a bottleneck for bulk indexing.
Take a look at soft and hard commits and their effects on bulk indexing.

Refer to the code on GitHub for the full example: https://github.com/techusers/solrcloud-indexer-job

Other useful blogs  http://techusergit.blogspot.com/