Monday, September 1, 2014

MapReduce job for indexing documents to Solr Cloud


In this article we will look at indexing documents into Solr Cloud in near real time, so that the indexed documents are available for search immediately.
Familiarity with Solr is assumed.

We will take the example of indexing log files into Solr using a MapReduce job.

The contents of the log file look like:

08-01-2014 09:34:44,INFO,user1,tid0,DELETE,SUCCESS
08-01-2014 09:34:44,ERROR,user0,tid1,UPDATE,FAILURE
08-01-2014 09:34:45,INFO,user1,tid2,DELETE,SUCCESS

The comma-separated values are the timestamp, log level, username, transaction id, action and status.
We will index documents to Solr Cloud so the log contents can be analyzed via Solr search.
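
Splitting such a line into its six fields is a small step that can be sketched on its own. The example below uses only the JDK; the class name LogLineParser and the choice to return null for malformed lines are assumptions made for illustration, not code taken from the job.

```java
import java.util.Arrays;

class LogLineParser {

    // Number of comma-separated fields expected in each log line.
    private static final int FIELD_COUNT = 6;

    /**
     * Splits one log line into its fields:
     * timestamp, log level, username, transaction id, action, status.
     * Returns null for blank or malformed lines so the caller can skip them.
     */
    static String[] parse(String line) {
        if (line == null || line.trim().isEmpty()) {
            return null;
        }
        // The -1 limit keeps trailing empty fields instead of dropping them.
        String[] fields = line.split(",", -1);
        if (fields.length != FIELD_COUNT) {
            return null;
        }
        for (int i = 0; i < fields.length; i++) {
            fields[i] = fields[i].trim();
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] fields = parse("08-01-2014 09:34:44,ERROR,user0,tid1,UPDATE,FAILURE");
        System.out.println(Arrays.toString(fields));
    }
}
```

Returning null for a bad line lets the caller skip it instead of failing the whole task on one corrupt record.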

We will use a Mapper to read the log files. The Mapper will read each line of the log files and emit the line as a LogMessageWritable value.

public class LogMessageWritable implements Writable {

private Text timestamp, logLevel, user, tid, action;
private String status;

public LogMessageWritable() {

this.timestamp = new Text();
this.logLevel = new Text();
this.user = new Text();
this.tid = new Text();
this.action = new Text();
status = "";
}
....... 

The Mapper class:


public static class LogLoadMapper extends Mapper<LongWritable, Text, Text,  LogMessageWritable> {

@Override
public void map(LongWritable key, Text line, Context context)
throws IOException, InterruptedException {

LogMessageWritable logmessage = parseLog(line);

if(logmessage != null) {
context.write(logmessage.getUser(), logmessage);
}
}




The Reducer class:


public static class LogLoadReducer extends Reducer<Text, LogMessageWritable, NullWritable, Text> {

private CloudSolrServer solrServer = null;
private SimpleDateFormat sdf = new SimpleDateFormat("MM-dd-yyyy HH:mm:ss");
private int docBatchSize = 1000;

......
@Override
public void reduce(Text key, Iterable<LogMessageWritable> values, Context context) throws IOException, InterruptedException {

List<SolrInputDocument> doclist = new ArrayList<SolrInputDocument>();

for(LogMessageWritable value:values) {
SolrInputDocument doc = createSolrDocument(value);
doclist.add(doc);

// send a full batch to Solr and start a new one
if(doclist.size() == docBatchSize) {
addBatchToSolr(doclist);
doclist.clear();
}
}

// after the loop: send the last documents, fewer than the batch size, if any
if(doclist.size() > 0) {
addBatchToSolr(doclist);
doclist.clear();
}

}
......
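
The batching pattern the reducer relies on, sending a batch whenever the buffer is full and sending the remainder once after the loop, can be tried in isolation. This is a minimal sketch using only the JDK: RecordingSink is a hypothetical stand-in for the Solr client, and plain strings stand in for Solr documents.

```java
import java.util.ArrayList;
import java.util.List;

class BatchingSketch {

    /** Hypothetical stand-in for the Solr client: records the size of each batch it receives. */
    static class RecordingSink {
        final List<Integer> batchSizes = new ArrayList<Integer>();

        void addBatch(List<String> batch) {
            batchSizes.add(batch.size());
        }
    }

    /** Sends docs to the sink in groups of at most batchSize. */
    static void sendInBatches(Iterable<String> docs, int batchSize, RecordingSink sink) {
        List<String> buffer = new ArrayList<String>(batchSize);
        for (String doc : docs) {
            buffer.add(doc);
            // A full buffer is sent right away and then emptied.
            if (buffer.size() == batchSize) {
                sink.addBatch(buffer);
                buffer.clear();
            }
        }
        // Whatever is left (fewer than batchSize documents) is sent once, after the loop.
        if (!buffer.isEmpty()) {
            sink.addBatch(buffer);
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<String> docs = new ArrayList<String>();
        for (int i = 0; i < 2500; i++) {
            docs.add("doc" + i);
        }
        RecordingSink sink = new RecordingSink();
        sendInBatches(docs, 1000, sink);
        System.out.println(sink.batchSizes); // prints [1000, 1000, 500]
    }
}
```

Keeping the final flush outside the loop matters: inside the loop it would resend the partially filled buffer on every iteration.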


This code updates the live Solr index and commits as it goes, so changes are visible immediately.
Check your own requirements for commit frequency and performance before reusing it:
committing this often can become a bottleneck for bulk indexing.
Take a look at soft and hard commits and their effects on bulk indexing.

Refer to the code on GitHub for the full example: https://github.com/techusers/solrcloud-indexer-job

Other useful blogs: http://techusergit.blogspot.com/

Sunday, August 24, 2014

Custom MapReduce Writable

The Hadoop MapReduce framework passes data around as key-value pairs, and both keys and values are always typed.

From the Apache wiki page for Writable: a serializable object which implements a simple, efficient serialization protocol, based on DataInput and DataOutput.

In this article we will look at an example custom Writable type that stores information from log files and is passed around in MapReduce as a value.

As an example, we will use MapReduce to process log files containing information in the following format. We want each line to be processed as one value in MapReduce.


01-01-2014 09:46:24,099, WARN , user1, tid1, ADD, SUCCESS
01-01-2014 09:47:20,099, WARN , user2, tid2, ADD, FAILED
01-01-2014 09:47:21,099, WARN , user3, tid3, UPDATE, SUCCESS

Each line holds the date, log level, username, transaction id, action and result.
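
One detail of this format is easy to miss: the timestamp itself contains a comma, so splitting on commas yields seven pieces rather than six. The sketch below is one way to handle that with the JDK alone. It assumes the ",099" part is a milliseconds suffix belonging to the timestamp, and the class name MillisLogLineParser is hypothetical.

```java
import java.util.Arrays;

class MillisLogLineParser {

    /**
     * Splits a line such as
     * "01-01-2014 09:46:24,099, WARN , user1, tid1, ADD, SUCCESS"
     * into six trimmed fields: timestamp (with milliseconds), log level,
     * username, transaction id, action, result.
     * Returns null when the line does not have the expected shape.
     */
    static String[] parse(String line) {
        if (line == null) {
            return null;
        }
        String[] parts = line.split(",");
        // Seven pieces: the timestamp contributes two of them.
        if (parts.length != 7) {
            return null;
        }
        String[] fields = new String[6];
        // Rejoin the date-time and the assumed milliseconds suffix.
        fields[0] = parts[0].trim() + "," + parts[1].trim();
        for (int i = 2; i < parts.length; i++) {
            fields[i - 1] = parts[i].trim();
        }
        return fields;
    }

    public static void main(String[] args) {
        String line = "01-01-2014 09:46:24,099, WARN , user1, tid1, ADD, SUCCESS";
        System.out.println(Arrays.toString(parse(line)));
    }
}
```

Trimming each piece also removes the spaces that surround the values in this format.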

A custom writable class for this is:



package my.loganalyzer.dto;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class LogMessageWritable implements Writable {
   
    private Text timestamp, logLevel, user, tid, action;
    private String status;
   
    public LogMessageWritable() {
       
        this.timestamp = new Text();
        this.logLevel = new Text();
        this.user = new Text();
        this.tid = new Text();
        this.action = new Text();
        status = "";
    }

    public Text getTimestamp() {

        return timestamp;
    }

    public void setTimestamp(Text timestamp) {

        this.timestamp = timestamp;
    }

    public Text getLogLevel() {

        return logLevel;
    }

    public void setLogLevel(Text logLevel) {

        this.logLevel = logLevel;
    }

    public Text getUser() {

        return user;
    }

    public void setUser(Text user) {

        this.user = user;
    }

    public Text getTid() {

        return tid;
    }

    public void setTid(Text tid) {

        this.tid = tid;
    }

    public Text getAction() {

        return action;
    }

    public void setAction(Text action) {

        this.action = action;
    }

    public String getStatus() {

        return status;
    }

    public void setStatus(String status) {

        this.status = status;
    }

    // Deserialize the fields; they must be read in exactly the order write() emits them.
    @Override
    public void readFields(DataInput in) throws IOException {

        timestamp.readFields(in);
        logLevel.readFields(in);
        user.readFields(in);
        tid.readFields(in);
        action.readFields(in);
        status = in.readUTF();

    }

    // Serialize the fields in a fixed order.
    @Override
    public void write(DataOutput out) throws IOException {

        timestamp.write(out);
        logLevel.write(out);
        user.write(out);
        tid.write(out);
        action.write(out);
        out.writeUTF(status);
    }
   
}
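
The write/readFields contract can be checked without a Hadoop cluster, because DataInput and DataOutput are plain JDK interfaces. The following is a minimal sketch under stated assumptions: LogRecordSketch is a hypothetical, dependency-free stand-in that uses String fields instead of Text and does not implement the Hadoop Writable interface, so it only illustrates the round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

class LogRecordSketch {

    String user = "";
    String action = "";
    String status = "";

    // Fields are written in a fixed order...
    void write(DataOutput out) throws IOException {
        out.writeUTF(user);
        out.writeUTF(action);
        out.writeUTF(status);
    }

    // ...and must be read back in exactly the same order.
    void readFields(DataInput in) throws IOException {
        user = in.readUTF();
        action = in.readUTF();
        status = in.readUTF();
    }

    /** Serializes a record to bytes and reads it back into a fresh instance. */
    static LogRecordSketch roundTrip(LogRecordSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            LogRecordSketch copy = new LogRecordSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            // In-memory streams are not expected to fail; surface it if they do.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LogRecordSketch record = new LogRecordSketch();
        record.user = "user1";
        record.action = "ADD";
        record.status = "SUCCESS";

        LogRecordSketch copy = roundTrip(record);
        System.out.println(copy.user + " " + copy.action + " " + copy.status);
    }
}
```

A round trip like this is a cheap unit test for any custom Writable: if write and readFields disagree on field order, the copy comes back with swapped or corrupted values.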



Full code available on github at https://github.com/techusers/solrcloud-indexer-job/tree/master/src/main/java/my/loganalyzer/dto
Check out other useful blogs http://techusergit.blogspot.com/