Sunday, August 24, 2014

Custom MapReduce Writable

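The Hadoop MapReduce framework passes data between the map and reduce phases as key-value pairs, and both keys and values are always typed. The usual way to make a custom type usable as a value is to implement Hadoop's Writable interface.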

The Apache documentation describes Writable as: "A serializable object which implements a simple, efficient, serialization protocol, based on DataInput and DataOutput."
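To make that contract concrete, here is a minimal sketch that does not need Hadoop on the classpath. `SimpleWritable` is a hypothetical stand-in that mirrors the two methods of `org.apache.hadoop.io.Writable` (`write(DataOutput)` and `readFields(DataInput)`), and `IntPairWritable` is an illustrative type, not part of Hadoop. The round trip through a byte array is roughly what happens when the framework ships a value from one phase to the next.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in mirroring the two methods of org.apache.hadoop.io.Writable,
// so this sketch compiles without Hadoop on the classpath.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Illustrative value type: two ints written and read back in the same order.
class IntPairWritable implements SimpleWritable {
    int first;
    int second;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();   // must mirror the order used in write()
        second = in.readInt();
    }
}

class WritableRoundTrip {
    // Serializes a value to bytes, then deserializes it into a fresh instance.
    static IntPairWritable roundTrip(IntPairWritable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            value.write(out);
            out.flush();
            IntPairWritable copy = new IntPairWritable();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```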

In this article we will look at an example of a custom Writable type that stores information from log files and can be passed around in a MapReduce job as a value.

As an example, we will use MapReduce to process log files in the following format, treating the information in each line as a single value.


01-01-2014 09:46:24,099, WARN , user1, tid1, ADD, SUCCESS
01-01-2014 09:47:20,099, WARN , user2, tid2, ADD, FAILED
01-01-2014 09:47:21,099, WARN , user3, tid3, UPDATE, SUCCESS

Each line holds the date and time, log level, username, transaction ID, action, and result.
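Because the timestamp itself contains a comma (`09:46:24,099`), splitting a line on every comma would cut it in two. One way to pull out the six fields, assuming separators are always a comma followed by at least one space while the millisecond comma never is, is a sketch like the following. `LogLineParser` is a hypothetical helper, not part of the original project.

```java
// Hypothetical helper: splits one log line into its six fields.
// Assumes field separators are a comma plus whitespace, while the comma inside
// the timestamp's milliseconds ("09:46:24,099") has no space after it.
class LogLineParser {
    static final int FIELD_COUNT = 6; // timestamp, level, user, tid, action, result

    static String[] parse(String line) {
        // "\\s*,\\s+" also absorbs the padding in "WARN , user1"
        String[] fields = line.trim().split("\\s*,\\s+");
        if (fields.length != FIELD_COUNT) {
            throw new IllegalArgumentException(
                    "Expected " + FIELD_COUNT + " fields but got " + fields.length + ": " + line);
        }
        return fields;
    }
}
```

In a mapper, each returned string could then be copied into the corresponding field of the custom Writable.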

A custom Writable class for this record looks like this:



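package my.loganalyzer.dto;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/**
 * MapReduce value holding one log line: timestamp, log level, user,
 * transaction id, action and result (status).
 */
public class LogMessageWritable implements Writable {

    // Each Text is created once and refilled in place by the setters and readFields().
    private final Text timestamp = new Text();
    private final Text logLevel = new Text();
    private final Text user = new Text();
    private final Text tid = new Text();
    private final Text action = new Text();
    // Result of the action (SUCCESS, FAILED, ...), serialized with writeUTF.
    private String status = "";

    // Hadoop instantiates Writables reflectively, so a no-arg constructor is required.
    public LogMessageWritable() {
    }

    public Text getTimestamp() {
        return timestamp;
    }

    // The setters copy the contents rather than keep the caller's reference,
    // because MapReduce code often reuses one Text object for every record.
    public void setTimestamp(Text timestamp) {
        this.timestamp.set(timestamp);
    }

    public Text getLogLevel() {
        return logLevel;
    }

    public void setLogLevel(Text logLevel) {
        this.logLevel.set(logLevel);
    }

    public Text getUser() {
        return user;
    }

    public void setUser(Text user) {
        this.user.set(user);
    }

    public Text getTid() {
        return tid;
    }

    public void setTid(Text tid) {
        this.tid.set(tid);
    }

    public Text getAction() {
        return action;
    }

    public void setAction(Text action) {
        this.action.set(action);
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        // writeUTF cannot serialize null, so fall back to an empty string.
        this.status = (status == null) ? "" : status;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read in exactly the order used by write(), overwriting every field,
        // since the framework may reuse one instance for many records.
        timestamp.readFields(in);
        logLevel.readFields(in);
        user.readFields(in);
        tid.readFields(in);
        action.readFields(in);
        status = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        timestamp.write(out);
        logLevel.write(out);
        user.write(out);
        tid.write(out);
        action.write(out);
        out.writeUTF(status);
    }

    // TextOutputFormat writes non-Text values using toString().
    @Override
    public String toString() {
        return timestamp + ", " + logLevel + ", " + user + ", "
                + tid + ", " + action + ", " + status;
    }
}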
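A quick way to check that `write` and `readFields` agree is a round trip through a byte array. The sketch below assumes Hadoop is not on the classpath, so the hypothetical `LogRecord` class replaces each `Text` field of `LogMessageWritable` with a `String` written by `writeUTF`. The byte layout therefore differs from the real class, but the rule being checked is the same: fields must be read back in exactly the order they were written. It also reads two records into one instance, because Hadoop reuses value objects between calls (for example while iterating over a reducer's values), so `readFields` has to overwrite every field.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical Hadoop-free stand-in for LogMessageWritable: Strings + writeUTF replace Text.
class LogRecord {
    String timestamp = "", logLevel = "", user = "", tid = "", action = "", status = "";

    void write(DataOutput out) throws IOException {
        out.writeUTF(timestamp);
        out.writeUTF(logLevel);
        out.writeUTF(user);
        out.writeUTF(tid);
        out.writeUTF(action);
        out.writeUTF(status);
    }

    // Overwrites every field, so one instance can be reused across records.
    void readFields(DataInput in) throws IOException {
        timestamp = in.readUTF();
        logLevel = in.readUTF();
        user = in.readUTF();
        tid = in.readUTF();
        action = in.readUTF();
        status = in.readUTF();
    }

    // Builds a record from six fields in log-line order.
    static LogRecord of(String... f) {
        LogRecord r = new LogRecord();
        r.timestamp = f[0]; r.logLevel = f[1]; r.user = f[2];
        r.tid = f[3]; r.action = f[4]; r.status = f[5];
        return r;
    }
}

class LogRecordRoundTrip {
    // Writes the records back to back, then reads them all into ONE reused instance,
    // returning "tid:status" as seen after each read.
    static String[] readBackReusing(LogRecord... records) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (LogRecord r : records) {
                r.write(out);
            }
            out.flush();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            LogRecord reused = new LogRecord();
            String[] seen = new String[records.length];
            for (int i = 0; i < records.length; i++) {
                reused.readFields(in);
                seen[i] = reused.tid + ":" + reused.status;
            }
            return seen;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the order of two reads were swapped in `readFields`, the values would silently land in the wrong fields rather than fail, which is why a round-trip check like this is worth keeping next to any custom Writable.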



The full code is available on GitHub at https://github.com/techusers/solrcloud-indexer-job/tree/master/src/main/java/my/loganalyzer/dto
Check out other useful posts at http://techusergit.blogspot.com/