Tuesday, 11 September 2012

Hadoop and Java Questions for Interviews


Q1. What are the default configuration files that are used in Hadoop 

As of the 0.20 release, Hadoop supports the following read-only default configuration files:
- src/core/core-default.xml
- src/hdfs/hdfs-default.xml
- src/mapred/mapred-default.xml

Q2. How will you make changes to the default configuration files 
Hadoop does not recommend changing the default configuration files; instead, it recommends making all site-specific changes in the following files:
- conf/core-site.xml
- conf/hdfs-site.xml
- conf/mapred-site.xml

Unless explicitly turned off, Hadoop by default specifies two resources, loaded in-order from the classpath:
- core-default.xml : Read-only defaults for hadoop.
- core-site.xml: Site-specific configuration for a given hadoop installation.

Hence, if the same property is defined in both core-default.xml and core-site.xml, the value from core-site.xml takes precedence because it is loaded later (the same is true for the other two file pairs).

Q3. Consider a scenario where you have set the property mapred.output.compress to true to ensure that all output files are compressed for efficient space usage on the cluster. If a cluster user does not want to compress data for a specific job, what will you recommend he do?
Ask him to create his own configuration file, set mapred.output.compress to false in it, and load this file as a resource in his job (or override the property directly in the job configuration).
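A minimal sketch of such a per-job override with the JobConf API (WordCount and the file name job-overrides.xml are illustrative, not from the original post):

JobConf conf = new JobConf(WordCount.class);       // stands in for the user's driver class
conf.addResource("job-overrides.xml");             // user-supplied resource that sets mapred.output.compress=false
conf.setBoolean("mapred.output.compress", false);  // or simply override the property directly in code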

Q4. In the above scenario, how can you ensure that a user cannot override mapred.output.compress to false in any of his jobs?
This can be done by marking the property as final, i.e. adding <final>true</final> to the property definition in the site configuration file (typically mapred-site.xml). Final properties cannot be overridden by user job configurations.

Q5. Which of the following is the only variable that must be set in the file conf/hadoop-env.sh for Hadoop to work?

- HADOOP_LOG_DIR
- JAVA_HOME
- HADOOP_CLASSPATH
The only required variable is JAVA_HOME, which must point to the Java installation directory.


Q6. List all the daemons required to run the Hadoop cluster 
- NameNode
- DataNode
- JobTracker
- TaskTracker


Q7. What is the default port that the JobTracker web UI listens on?
50030


Q8. What is the default port that the HDFS NameNode web UI listens on?
50070

Q21. Explain the difference between a class variable and an instance variable, and how each is declared in Java
A class variable is a variable declared with the static modifier.
An instance variable is a variable in a class without the static modifier.
The main difference is that memory for class variables is allocated only once, when the class is first loaded into memory. Class variables therefore do not depend on the objects of that class: no matter how many objects exist, only one copy is created, at class-loading time.
Q22. Since an abstract class in Java cannot be instantiated, how can you use its non-static methods?
By extending it: a concrete subclass can be instantiated, and the inherited non-static methods can then be called on that instance.
Q23. How would you make a copy of an entire Java object with its state?
Have the class implement the Cloneable interface and call its clone() method.
Q24. Explain Encapsulation, Inheritance and Polymorphism
Encapsulation is the process of binding or wrapping the data and the code that operates on the data into a single entity. This keeps the data safe from outside interference and misuse. One way to think about encapsulation is as a protective wrapper that prevents code and data from being arbitrarily accessed by other code defined outside the wrapper.
Inheritance is the process by which one object acquires the properties of another object.
Polymorphism means roughly "one name, many forms". Polymorphism enables one entity to be used as a general category for different types of actions; the specific action is determined by the exact nature of the situation. The concept can be summed up as "one interface, multiple methods".
Q25. Explain garbage collection
Garbage collection is one of the most important features of Java.
Garbage collection is also called automatic memory management, as the JVM automatically removes unused objects from memory. A user program cannot directly free an object from memory; instead, it is the job of the garbage collector to automatically free objects that are no longer referenced by the program. Every class inherits the finalize() method from java.lang.Object; the finalize() method is called by the garbage collector when it determines that no more references to the object exist. In Java, it is a good idea to explicitly assign null to a variable when it is no longer in use.
Q26. What are the similarities/differences between an abstract class and an interface?
Differences:
- Interfaces provide a form of multiple inheritance; a class can extend only one other class.
- Interfaces are limited to public methods and constants with no implementation. Abstract classes can have a partial implementation, protected parts, static methods, etc.
- A class may implement several interfaces, but it may extend only one abstract class.
- Interfaces are slower, as they require an extra indirection to find the corresponding method in the actual class. Abstract classes are faster.
Similarities:
- Neither abstract classes nor interfaces can be instantiated.
Q27. What are different ways to make your class multithreaded in Java 
There are two ways to create new kinds of threads:
- Define a new class that extends the Thread class
- Define a new class that implements the Runnable interface, and pass an object of that class to a Thread's constructor.
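A minimal sketch of both approaches (the class names Worker, Task and ThreadDemo are illustrative):

// Approach 1: extend Thread and override run()
class Worker extends Thread {
    public void run() {
        System.out.println("worker running in " + getName());
    }
}

// Approach 2: implement Runnable and pass an instance to a Thread
class Task implements Runnable {
    public void run() {
        System.out.println("task running in " + Thread.currentThread().getName());
    }
}

public class ThreadDemo {
    public static void main(String[] args) {
        new Worker().start();
        new Thread(new Task()).start();
    }
}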
Q28. What do you understand by synchronization? How do you synchronize a method call in Java? How do you synchronize a block of code in Java?
Synchronization is the process of controlling the access of shared resources by multiple threads in such a manner that only one thread can access a resource at a time. In a non-synchronized multithreaded application, it is possible for one thread to modify a shared object while another thread is in the process of using or updating the object's value. Synchronization prevents this type of data corruption.
- Synchronizing a method: put the keyword synchronized in the method declaration.
- Synchronizing a block of code inside a method: put the block of code inside synchronized (this) { ... }
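A small illustrative sketch of both forms (the Counter class is made up for the example):

class Counter {
    private int count = 0;

    // Synchronizing a method: only one thread at a time can execute it on a given instance
    public synchronized void increment() {
        count++;
    }

    // Synchronizing a block: lock only around the critical section
    public void add(int n) {
        synchronized (this) {
            count += n;
        }
    }

    public synchronized int get() {
        return count;
    }
}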
Q29. What is a transient variable?
A transient variable is not serialized. For example, if a variable is declared as transient in a Serializable class and the class is written to an ObjectStream, the value of the variable is not written to the stream; when the object is read back from the ObjectStream, the variable is restored with its default value (null for object references).
Q30. What is the Properties class in Java? Which class does it extend?
The Properties class represents a persistent set of properties. Properties can be saved to a stream or loaded from a stream. Each key and its corresponding value in the property list is a string. Properties extends Hashtable.
Q31. Explain the concept of shallow copy vs deep copy in Java
In a shallow copy, the cloned object refers to the same objects as the original, because only the object references get copied, not the referenced objects themselves.
In a deep copy, a copy of the object and of all the objects it refers to is made.
Q32. How can you make a shallow copy of an object in Java?
Use the clone() method inherited from the Object class; the default Object.clone() performs a shallow copy.
Q33. How would you make a copy of an entire Java object (deep copy) with its state?
Have the class implement the Cloneable interface and override clone() so that it also clones the objects it references (alternatively, serialize and then deserialize the object).
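A minimal sketch of a deep copy via clone(); the Person and Address classes are hypothetical:

class Address implements Cloneable {
    String city;

    Address(String city) {
        this.city = city;
    }

    protected Object clone() throws CloneNotSupportedException {
        return super.clone();
    }
}

class Person implements Cloneable {
    String name;
    Address address;

    Person(String name, Address address) {
        this.name = name;
        this.address = address;
    }

    // Deep copy: clone the referenced Address as well, not just the reference to it
    protected Object clone() throws CloneNotSupportedException {
        Person copy = (Person) super.clone();           // field-by-field (shallow) copy
        copy.address = (Address) this.address.clone();  // replace the shared reference with its own clone
        return copy;
    }
}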
Q11. Which of the following object-oriented principles is met by method overloading in Java?
- Inheritance
- Polymorphism
- Encapsulation

Polymorphism
Q12. Which of the following object-oriented principles is met by method overriding in Java?
- Inheritance
- Polymorphism
- Encapsulation

Polymorphism
Q13. What is the name of the collection interface used to maintain unique elements?
Set
Q14. What access level do you need to specify in the class declaration to ensure that only classes from the same package can access it? What keyword is used for this specifier?
You do not need to specify any access level; when no modifier is given, Java uses the default (package-private) access level, so there is no keyword for it.
Q15. What's the difference between a queue and a stack?
A stack works by the last-in-first-out (LIFO) rule, while a queue uses the first-in-first-out (FIFO) rule.
Q16. How can you write user-defined exceptions in Java?
Make your class extend the Exception class (or RuntimeException for an unchecked exception).
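A minimal sketch of a user-defined checked exception (the class names are illustrative):

class InvalidRecordException extends Exception {
    public InvalidRecordException(String message) {
        super(message);
    }
}

class RecordValidator {
    // A checked exception must be declared with a throws clause
    void validate(String record) throws InvalidRecordException {
        if (record == null || record.isEmpty()) {
            throw new InvalidRecordException("empty record");
        }
    }
}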
Q17. What is the difference between checked and unchecked exceptions in Java? Give an example of each type
All predefined exceptions in Java are either checked or unchecked. A checked exception must be caught using a try..catch block or declared using the throws clause; if you don't, compilation of the program will fail. Unchecked exceptions do not have to be declared or caught.
- Example checked exception: ParseException (or IOException)
- Example unchecked exception: ArrayIndexOutOfBoundsException
Q18. We know that FileNotFoundException is inherited from IOException; does it matter in what order the catch statements for FileNotFoundException and IOException are written?
Yes, it does. FileNotFoundException is a subclass of IOException, and an exception's subclasses have to be caught first.
Q19. How do we find out whether two strings are the same in Java? If the answer is equals(), why do we have to use equals(); why can't we compare strings like integers?
We use the equals() method to compare the values of Strings. We can't use == the way we do for primitive types like int, because == only checks whether two variables point at the same String object instance.
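A short illustration of the difference:

String a = new String("hadoop");
String b = new String("hadoop");

System.out.println(a == b);        // false: a and b are two different String objects
System.out.println(a.equals(b));   // true: the character content is the same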
Q20. What is the "package" keyword?
It is a way to organize files when a project consists of multiple modules. It also helps resolve naming conflicts when different packages have classes with the same names. Package-level access also allows you to protect data from being used by non-authorized classes.

Since Hadoop and its entire ecosystem are built in Java, when hiring a Hadoop developer it makes sense to test the candidate's core Java skills as well. Following are some questions that I have compiled to test a candidate's basic understanding of Java. I would expect any decent candidate to answer 90% of these questions.
Q1. What is a mutable object and an immutable object?
If an object's value can be changed, we call it a mutable object (e.g., StringBuffer). If you are not allowed to change the value of an object, it is an immutable object (e.g., String, Integer, Float).
Q2. What are wrapper classes in Java? Why do they exist? Give examples
Wrapper classes allow primitive types to be treated as objects, e.g., Integer, Float, etc. They exist because many APIs, such as the collection classes, work only with objects, not primitives.
Q3. Even though garbage collection cleans memory, why can't it guarantee that a program will not run out of memory? Give an example of a case where garbage collection will not prevent running out of memory
Because it is possible for programs to use up memory resources faster than they are garbage collected. It is also possible for programs to create objects that are not subject to garbage collection. One example is trying to load a very big file into an array.
Q4. What is the difference between Process and Thread? 
A process can contain multiple threads. In most multithreading operating systems, a process gets its own memory address space; a thread doesn't. Threads typically share the heap belonging to their parent process. For instance, a JVM runs in a single process in the host O/S. Threads in the JVM share the heap belonging to that process; that's why several threads may access the same object. Typically, even though they share a common heap, threads have their own stack space. This is how one thread's invocation of a method is kept separate from another's
Q5. How can you write an infinite loop in Java?
while(true) {
}
OR
for ( ; ; ){
}
Q6. How can you create a singleton class in Java?
Make the constructor of the class private and provide a static method that returns the single instance of the class.
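A minimal sketch of that pattern using eager initialization (the class name is illustrative):

class JobRegistry {
    // the single, eagerly created instance
    private static final JobRegistry INSTANCE = new JobRegistry();

    // private constructor prevents direct instantiation from outside
    private JobRegistry() {
    }

    // the only way to obtain the instance
    public static JobRegistry getInstance() {
        return INSTANCE;
    }
}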
Q7. What do the keywords "this" and "super" do in Java?
"this" is used to refer to the current object. "super" is used to refer to the parent class, i.e. the class extended by the current class.
Q8. What are access specifiers in Java? List all of them.
Access specifiers are used to define the scope of variables in Java. There are four levels of access specifiers in Java:
- public
- private
- protected
- default
Q9. Which of the following 3 object oriented principals does access specifiers implement in java
- Encapsulation
- Polymorphism
- Inheritance

Encapsulation
Q10. What is method overriding and what is method overloading?
With overriding, you change the method's behavior in a subclass. Overloading means having methods with the same name within a class but with different signatures.

Q1. What is HDFS?
HDFS, the Hadoop Distributed File System, is a distributed file system designed to hold very large amounts of data (terabytes or even petabytes) and provide high-throughput access to this information. Files are stored in a redundant fashion across multiple machines to ensure their durability in the face of failure and their high availability to highly parallel applications.
Q2. What does the statement "HDFS is a block-structured file system" mean?
It means that in HDFS individual files are broken into blocks of a fixed size. These blocks are stored across a cluster of one or more machines with data storage capacity.
Q3. What does the term "replication factor" mean?
The replication factor is the number of times each block of a file is replicated in HDFS.
Q4. What is the default replication factor in HDFS?
3
Q5. What is the typical block size of an HDFS block?
64 MB or 128 MB
Q6. What is the benefit of having such a big block size (compared to the block size of a Linux file system like ext)?
It allows HDFS to decrease the amount of metadata storage required per file (the list of blocks per file will be smaller as the size of individual blocks increases). Furthermore, it allows for fast streaming reads of data, by keeping large amounts of data sequentially laid out on the disk.
Q7. Why is it recommended to have a few very large files instead of a lot of small files in HDFS?
Because the NameNode holds the metadata of each and every file in HDFS, more files mean more metadata. Since the NameNode loads all the metadata into memory for speed, a very large number of files may make the metadata big enough to exceed the size of the memory on the NameNode.
Q8. True/false question. At what granularity can you apply the replication factor in HDFS?
- You can choose replication factor per directory
- You can choose replication factor per file in a directory
- You can choose replication factor per block of a file

- True
- True
- False
Q9. What is a DataNode in HDFS?
Individual machines in the HDFS cluster that hold blocks of data are called DataNodes.
Q10. What is a NameNode in HDFS?
The NameNode stores all the metadata for the file system.
Q11. What alternate way does HDFS provide to recover data in case a NameNode, without backup, fails and cannot be recovered?
There is no way. If the NameNode dies and there is no backup, then there is no way to recover the data.
Q12. Describe how an HDFS client reads a file in HDFS: does it talk to the DataNode or the NameNode, how does the data flow, etc.
To open a file, a client contacts the NameNode and retrieves a list of locations for the blocks that comprise the file. These locations identify the DataNodes which hold each block. Clients then read file data directly from the DataNode servers, possibly in parallel. The NameNode is not directly involved in this bulk data transfer, keeping its overhead to a minimum.
Q13. Using the Linux command line, how will you
- List the files in an HDFS directory
- Create a directory in HDFS
- Copy a file from your local directory to HDFS

hadoop fs -ls 
hadoop fs -mkdir 
hadoop fs -put localfile hdfsfile 

Q31. How will you write a custom partitioner for a Hadoop job  
To have Hadoop use a custom partitioner, you will have to do at least the following three things (see the sketch after this list):
- Create a new class that extends the Partitioner class
- Override the getPartition method
- In the wrapper (driver) that runs the MapReduce job, either
  - add the custom partitioner to the job programmatically using the setPartitionerClass method, or
  - add the custom partitioner to the job via a config file (if your wrapper reads from a config file or Oozie)
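A minimal sketch, assuming the newer org.apache.hadoop.mapreduce API and Text/IntWritable map output types; the class name and partitioning logic are illustrative:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // route keys to reducers based on the first character of the key
        if (key.getLength() == 0) {
            return 0;
        }
        return (key.toString().charAt(0) & Integer.MAX_VALUE) % numPartitions;
    }
}

// In the driver: job.setPartitionerClass(FirstLetterPartitioner.class);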

Q32. How did you debug your Hadoop code  
There can be several ways of doing this, but the most common ones are:
- By using counters
- The web interface provided by Hadoop framework

Q33. Have you ever built a production process in Hadoop? If yes, then what was the process when your Hadoop job failed for any reason?
It's an open-ended question, but most candidates, if they have written a production job, should talk about some type of alert mechanism, e.g. an email is sent or their monitoring system raises an alert. Since Hadoop works on unstructured data, it is very important to have a good alerting system for errors, because unexpected data can very easily break the job.

Q34. Have you ever run into a lopsided (skewed) job that resulted in an out-of-memory error? If yes, how did you handle it?
This is an open-ended question, but a candidate who claims to be an intermediate developer and has worked on a large data set (10-20 GB minimum) should have run into this problem. There can be many ways to handle it, but the most common are to alter your algorithm, break the job into more MapReduce phases, or use a combiner if possible.

Hadoop Interview Questions Part 3

Q22. What is the Distributed Cache in Hadoop?
Distributed Cache is a facility provided by the Map/Reduce framework to cache files (text, archives, jars and so on) needed by applications during execution of the job. The framework will copy the necessary files to the slave node before any tasks for the job are executed on that node.
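A minimal sketch of the classic usage, assuming the old org.apache.hadoop.filecache.DistributedCache API; the file path and class names are illustrative:

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class CacheSetup {
    // Driver side: register an HDFS file with the Distributed Cache before submitting the job
    public static void cacheLookupFile(JobConf conf) {
        DistributedCache.addCacheFile(new Path("/user/joe/lookup.txt").toUri(), conf);
    }
}

// Task side, e.g. inside the mapper's configure(JobConf job) method:
// Path[] cached = DistributedCache.getLocalCacheFiles(job);   // local copies on the slave node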

Q23. What is the benefit of the Distributed Cache? Why can't we just keep the file in HDFS and have the application read it?
Because the Distributed Cache is much faster. It copies the file to all task trackers at the start of the job. Now if a task tracker runs 10 or 100 mappers or reducers, they will all use the same local copy from the distributed cache. On the other hand, if the MapReduce code reads the file from HDFS, then every mapper will access it from HDFS, so a task tracker running 100 map tasks will read the file 100 times from HDFS. Also, HDFS is not very efficient when used in this way.

Q24. What mechanism does the Hadoop framework provide to synchronize changes made to the Distributed Cache during the runtime of the application?
This is a trick question. There is no such mechanism; the Distributed Cache is by design read-only during job execution.

Q25. Have you ever used Counters in Hadoop? Give us an example scenario.
Anybody who claims to have worked on a Hadoop project is expected to have used counters.
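For example, a mapper can count records it skips instead of failing the job. A minimal sketch, assuming the newer mapreduce API inside a Mapper subclass; the group and counter names are made up:

public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    if (value.toString().trim().isEmpty()) {
        // bump a custom counter that will show up in the job's counter report
        context.getCounter("DataQuality", "EMPTY_RECORDS").increment(1);
        return;
    }
    // ... normal map logic ...
}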

Q26. Is it possible to provide multiple inputs to a Hadoop job? If yes, then how can you give multiple directories as input to the Hadoop job?
Yes. The input format class provides methods to add multiple directories as input to a Hadoop job.
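A minimal sketch using FileInputFormat from the newer mapreduce API; the paths are illustrative:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MultiInputDriver {
    public static Job createJob(Configuration conf) throws IOException {
        Job job = new Job(conf, "multi-input job");
        // addInputPath can be called any number of times; every directory becomes job input
        FileInputFormat.addInputPath(job, new Path("/data/2012/09/10"));
        FileInputFormat.addInputPath(job, new Path("/data/2012/09/11"));
        return job;
    }
}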

Q27. Is it possible to have a Hadoop job write output to multiple directories? If yes, then how?
Yes, by using the MultipleOutputs class.
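A minimal sketch, assuming a Hadoop version that ships org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; the named output "errors" and the key/value types are illustrative:

// Driver: declare a named output in addition to the job's default output
MultipleOutputs.addNamedOutput(job, "errors", TextOutputFormat.class,
                               Text.class, IntWritable.class);

// Inside a Reducer<Text, IntWritable, Text, IntWritable> subclass:
private MultipleOutputs<Text, IntWritable> mos;

public void setup(Context context) {
    mos = new MultipleOutputs<Text, IntWritable>(context);
}

public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    if (key.toString().startsWith("ERR")) {
        mos.write("errors", key, new IntWritable(1));   // goes to the "errors" output
    } else {
        context.write(key, new IntWritable(1));         // goes to the default output
    }
}

public void cleanup(Context context) throws IOException, InterruptedException {
    mos.close();   // flushes and closes the extra outputs
}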

Q28. What will a hadoop job do if you try to run it with an output directory that is already present? Will it
- overwrite it
- warn you and continue
- throw an exception and exit
The hadoop job will throw an exception and exit.

Q29. How can you set an arbitrary number of mappers to be created for a job in Hadoop?
This is a trick question. You cannot set it directly; the number of mappers is determined by the number of input splits.

Q30. How can you set an arbitrary number of reducers to be created for a job in Hadoop?
You can either do it programmatically by using the setNumReduceTasks method of the JobConf class, or set it up as a configuration setting.
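For example, a JobConf-based driver snippet (MyJob is a placeholder class):

JobConf conf = new JobConf(MyJob.class);   // MyJob stands in for your driver class
conf.setNumReduceTasks(10);                // request exactly 10 reduce tasks for this job
// equivalent configuration property: mapred.reduce.tasks=10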


Hadoop Questions


  1. What is a JobTracker in Hadoop? How many instances of JobTracker run on a Hadoop Cluster?

JobTracker is the daemon service for submitting and tracking MapReduce jobs in Hadoop. Only one JobTracker process runs on any Hadoop cluster. The JobTracker runs in its own JVM process, and in a typical production cluster it runs on a separate machine. Each slave node is configured with the JobTracker node location. The JobTracker is a single point of failure for the Hadoop MapReduce service: if it goes down, all running jobs are halted. The JobTracker performs the following actions (from the Hadoop wiki):
  • Client applications submit jobs to the Job tracker.
  • The JobTracker talks to the NameNode to determine the location of the data
  • The JobTracker locates TaskTracker nodes with available slots at or near the data
  • The JobTracker submits the work to the chosen TaskTracker nodes.
  • The TaskTracker nodes are monitored. If they do not submit heartbeat signals often enough, they are deemed to have failed and the work is scheduled on a different TaskTracker.
  • A TaskTracker will notify the JobTracker when a task fails. The JobTracker decides what to do then: it may resubmit the job elsewhere, it may mark that specific record as something to avoid, and it may even blacklist the TaskTracker as unreliable.
  • When the work is completed, the JobTracker updates its status.

Client applications can poll the JobTracker for information. 

  2. How does the JobTracker schedule a task?
The TaskTrackers send heartbeat messages to the JobTracker periodically to reassure the JobTracker that they are still alive. These messages also inform the JobTracker of the number of available slots, so the JobTracker can stay up to date with where in the cluster work can be delegated. When the JobTracker tries to find somewhere to schedule a task within the MapReduce operations, it first looks for an empty slot on the same server that hosts the DataNode containing the data, and if not, it looks for an empty slot on a machine in the same rack.
A TaskTracker is a slave node daemon in the cluster that accepts tasks (Map, Reduce and Shuffle operations) from a JobTracker. Only one TaskTracker process runs on any Hadoop slave node, and it runs in its own JVM process. Every TaskTracker is configured with a set of slots, which indicate the number of tasks that it can accept. The TaskTracker starts a separate JVM process to do the actual work (called a Task Instance); this is to ensure that a process failure does not take down the TaskTracker. The TaskTracker monitors these task instances, capturing the output and exit codes. When the task instances finish, successfully or not, the TaskTracker notifies the JobTracker. The TaskTrackers also send heartbeat messages to the JobTracker periodically to reassure the JobTracker that they are still alive. These messages also inform the JobTracker of the number of available slots, so the JobTracker can stay up to date with where in the cluster work can be delegated.
Task instances are the actual MapReduce tasks which are run on each slave node. The TaskTracker starts a separate JVM process for each task instance; this is to ensure that a process failure does not take down the TaskTracker. Each task instance runs in its own JVM process. There can be multiple task instance processes running on a slave node, based on the number of slots configured on the TaskTracker. By default, a new task instance JVM process is spawned for each task.
Hadoop is comprised of five separate daemons, and each of these daemons runs in its own JVM. The following three daemons run on master nodes: NameNode - stores and maintains the metadata for HDFS; Secondary NameNode - performs housekeeping functions for the NameNode; JobTracker - manages MapReduce jobs and distributes individual tasks to the machines running the TaskTracker. The following two daemons run on each slave node: DataNode - stores actual HDFS data blocks; TaskTracker - responsible for instantiating and monitoring individual map and reduce tasks.
  • Single instance of a Task Tracker is run on each Slave node. Task tracker is run as a separate JVM process.
  • Single instance of a DataNode daemon is run on each Slave node. DataNode daemon is run as a separate JVM process.
  • One or Multiple instances of Task Instance is run on each slave node. Each task instance is run as a separate JVM process. The number of Task instances can be controlled by configuration. Typically a high end machine is configured to run more task instances.
The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems. However, the differences from other distributed file systems are significant. Following are the differences between HDFS and NAS:
  • In HDFS, data blocks are distributed across the local drives of all machines in a cluster, whereas in NAS data is stored on dedicated hardware.
  • HDFS is designed to work with the MapReduce system, since computation is moved to the data. NAS is not suitable for MapReduce since data is stored separately from the computation.
  • HDFS runs on a cluster of machines and provides redundancy using a replication protocol, whereas NAS is provided by a single machine and therefore does not provide data redundancy.
The NameNode periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of all blocks on a DataNode. When the NameNode notices that it has not received a heartbeat message from a DataNode after a certain amount of time, the DataNode is marked as dead. Since its blocks will then be under-replicated, the system begins replicating the blocks that were stored on the dead DataNode. The NameNode orchestrates the replication of data blocks from one DataNode to another. The replication data transfer happens directly between DataNodes, and the data never passes through the NameNode.
No, the MapReduce programming model does not allow reducers to communicate with each other. Reducers run in isolation.
Yes, setting the number of reducers to zero is a valid configuration in Hadoop. When you set the reducers to zero, no reducers are executed and the output of each mapper is stored in a separate file on HDFS. (This is different from the case where the number of reducers is set to a number greater than zero, in which the mappers' output (intermediate data) is written to the local file system, not HDFS, of each mapper slave node.)
The mapper output (intermediate data) is stored on the local file system (not HDFS) of each individual mapper node. This is typically a temporary directory location which can be set up in config by the Hadoop administrator. The intermediate data is cleaned up after the Hadoop job completes.
Combiners are used to increase the efficiency of a MapReduce program. They aggregate intermediate map output locally on individual mapper nodes. Combiners can help you reduce the amount of data that needs to be transferred across to the reducers. You can use your reducer code as a combiner if the operation performed is commutative and associative. The execution of a combiner is not guaranteed: Hadoop may or may not execute it, and, if required, it may execute it more than once. Therefore your MapReduce jobs should not depend on the combiner's execution.
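Wiring in a combiner is a one-line job configuration; a sketch that reuses a reducer class (WordCountReducer is a placeholder name):

// Old-API driver snippet: reuse the reducer as a combiner when the reduce
// operation is commutative and associative (e.g. summing counts)
conf.setCombinerClass(WordCountReducer.class);
conf.setReducerClass(WordCountReducer.class);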
  • org.apache.hadoop.io.Writable is a Java interface. Any key or value type in the Hadoop Map-Reduce framework implements this interface. Implementations typically implement a static read(DataInput) method which constructs a new instance, calls readFields(DataInput) and returns the instance.
  • org.apache.hadoop.io.WritableComparable is a Java interface. Any type which is to be used as a key in the Hadoop Map-Reduce framework should implement this interface. WritableComparable objects can be compared to each other using Comparators.
  • The Key must implement the org.apache.hadoop.io.WritableComparable interface.
  • The value must implement the org.apache.hadoop.io.Writable interface.
  • org.apache.hadoop.mapred.lib.IdentityMapper implements the identity function, mapping inputs directly to outputs. If the MapReduce programmer does not set the Mapper class using JobConf.setMapperClass, then IdentityMapper.class is used as the default value.
  • org.apache.hadoop.mapred.lib.IdentityReducer performs no reduction, writing all input values directly to the output. If the MapReduce programmer does not set the Reducer class using JobConf.setReducerClass, then IdentityReducer.class is used as the default value.


Speculative execution is a way of coping with individual machine performance. In large clusters, where hundreds or thousands of machines are involved, there may be machines that are not performing as fast as others. This may delay a full job because of just one machine not performing well. To avoid this, speculative execution in Hadoop can run multiple copies of the same map or reduce task on different slave nodes. The results from the first node to finish are used.
In a MapReduce job, reducers do not start executing the reduce method until all map tasks have completed. Reducers start copying intermediate key-value pairs from the mappers as soon as they are available, but the programmer-defined reduce method is called only after all the mappers have finished.
Reducers start copying intermediate key-value pairs from the mappers as soon as they are available. The progress calculation also takes into account the data transfer performed by the reduce process, so the reduce progress starts showing up as soon as any intermediate key-value pair from a mapper is available to be transferred to a reducer. Although the reducer progress is updated, the programmer-defined reduce method is still called only after all the mappers have finished.
HDFS, the Hadoop Distributed File System, is responsible for storing huge data on the cluster. This is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems. However, the differences from other distributed file systems are significant.
  • HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware.
  • HDFS provides high throughput access to application data and is suitable for applications that have large data sets.
  • HDFS is designed to support very large files. Applications that are compatible with HDFS are those that deal with large data sets. These applications write their data only once but they read it one or more times and require these reads to be satisfied at streaming speeds. HDFS supports write-once-read-many semantics on files.
In HDFS, data is split into blocks and distributed across multiple nodes in the cluster. Each block is typically 64 MB or 128 MB in size. Each block is replicated multiple times; the default is to replicate each block three times. Replicas are stored on different nodes. HDFS utilizes the local file system to store each HDFS block as a separate file. The HDFS block size cannot be compared with the traditional file system block size.
The NameNode is the centerpiece of an HDFS file system. It keeps the directory tree of all files in the file system, and tracks where across the cluster the file data is kept. It does not store the data of these files itself. Only one NameNode process runs on any Hadoop cluster. The NameNode runs in its own JVM process, and in a typical production cluster it runs on a separate machine. The NameNode is a single point of failure for the HDFS cluster: when the NameNode goes down, the file system goes offline. Client applications talk to the NameNode whenever they wish to locate a file, or when they want to add/copy/move/delete a file. The NameNode responds to successful requests by returning a list of relevant DataNode servers where the data lives.
A DataNode stores data in the Hadoop file system HDFS. Only one DataNode process runs on any Hadoop slave node, and it runs in its own JVM process. On startup, a DataNode connects to the NameNode. DataNode instances can talk to each other, mostly while replicating data.
Client communication with HDFS happens through the Hadoop HDFS API. Client applications talk to the NameNode whenever they wish to locate a file, or when they want to add/copy/move/delete a file on HDFS. The NameNode responds to successful requests by returning a list of relevant DataNode servers where the data lives. Client applications can talk directly to a DataNode once the NameNode has provided the location of the data.
HDFS is designed to reliably store very large files across machines in a large cluster. It stores each file as a sequence of blocks; all blocks in a file except the last block are the same size. The blocks of a file are replicated for fault tolerance. The block size and replication factor are configurable per file. An application can specify the number of replicas of a file. The replication factor can be specified at file creation time and can be changed later. Files in HDFS are write-once and have strictly one writer at any time. The NameNode makes all decisions regarding the replication of blocks. HDFS uses a rack-aware replica placement policy. In the default configuration there are a total of 3 copies of a data block in HDFS: 2 copies are stored on DataNodes on the same rack, and the 3rd copy is on a different rack.






Understanding of Big Data


    Understanding of Big Data
  • What is Big Data?
  1. The term Big Data applies to information that can’t be processed or analyzed using traditional processes or tools. Increasingly, organizations today are facing more and more Big Data challenges. They have access to a wealth of information, but they don’t know how to get value out of it because it is sitting in its most raw form or in a semi structured or unstructured format; and as a result, they don’t even know whether it’s worth keeping (or even able to keep it for that matter).
  2. Big data is a collection of digital information whose size is beyond the ability of most software tools and people to capture, manage, and process the data.

  1. Big Data solutions are ideal for analyzing not only raw structured data, but also semi-structured and unstructured data from a wide variety of sources.
  2. Big Data solutions are ideal when all or most of the data needs to be analyzed, versus just a sample of it; a sampling of data is not nearly as effective as a larger set of data from which to derive analysis.
  3. Big data solutions are ideal for iterative and exploratory analysis when business measures on data are not predetermined.
  4. Big data technologies describe a new generation of technologies and architectures, designed to economically extract value from very large volumes of a wide variety of data, by enabling high velocity capture, discovery, and/or analysis.
  5. “Big data” is a big buzz phrase in the IT and business world right now – and there is a dizzying array of opinions on just what these two simple words really mean. Technology vendors in the legacy database or data warehouse spaces say “big data” simply refers to a traditional data warehousing scenario involving data volumes in either the single or multi-terabyte range. Others disagree: They say “big data” isn’t limited to traditional data warehouse situations, but includes real-time or operational data stores used as the primary data foundation for online applications that power key external or internal business systems. It used to be that these transactional/real-time databases were typically “pruned” so they could be manageable from a data volume standpoint. Their most recent or “hot” data stayed in the database, and older information was archived to a data warehouse via extract-transform-load (ETL) routines.
  6. But big data has changed dramatically. The evolution of the web has redefined:
  - The speed at which information flows into these primary online systems.
  - The number of customers a company must deal with.
  - The acceptable interval between the time that data first enters a system, and its transformation into information that can be analyzed to make key business decisions.
    1. Big Data is a term used to describe large collections of data (also known as data sets) that may be unstructured, and grow so large and quickly that it is difficult to manage with regular database or statistics tools.
  • Characteristics of Big Data
Four characteristics define Big Data: Volume, Variety, Value and Velocity

1. Volume – TB’s to PB’s of data
2. Velocity – how fast the data is coming in
3. Variety – all types are now being captured. (structured, semi-structured, unstructured)
4. Value – mining the valuable pieces of data from among data that does not matter.


  • The Volume of Data
    The sheer volume of data being stored today is exploding. In the year 2000, 800,000 petabytes (PB) of data were stored in the world. Of course, a lot of the data that’s being created today isn’t analyzed at all and that’s another problem we’re trying to address with BigInsights. We expect this number to reach 35 zettabytes (ZB) by 2020. Twitter alone generates more than 7 terabytes (TB) of data every day, Facebook 10 TB, and some enterprises generate terabytes of data every hour of every day of the year.

The volume of data available to organizations today is on the rise, while the percent of data they can analyze is on the decline.

  • The Variety of Data
The volume associated with the Big Data phenomenon brings along new challenges for data centers trying to deal with it: its variety. With the explosion of sensors and smart devices, as well as social collaboration technologies, data in an enterprise has become complex, because it includes not only traditional relational data, but also raw, semi-structured, and unstructured data from web pages, web log files (including click-stream data), search indexes, social media forums, e-mail, documents, sensor data from active and passive systems, and so on. What’s more, traditional systems can struggle to store and perform the required analytics to gain understanding from the contents of these logs, because much of the information being generated doesn’t lend itself to traditional database technologies. In our experience, although some companies are moving down the path, by and large, most are just beginning to understand the opportunities of Big Data (and what’s at stake if it’s not considered).

  • The Velocity of Data
Just as the sheer volume and variety of data we collect and store has changed, so, too, has the velocity at which it is generated and needs to be handled. A conventional understanding of velocity typically considers how quickly the data is arriving and stored, and its associated rates of retrieval. While managing all of that quickly is good—and the volumes of data that we are looking at are a consequence of how quick the data arrives—we believe the idea of velocity is actually something far more compelling than these conventional definitions.
To accommodate velocity, a new way of thinking about a problem must start at the inception point of the data. Rather than confining the idea of velocity to the growth rates associated with your data repositories, we suggest you apply this definition to data in motion: the speed at which the data is flowing. After all, we’re in agreement that today’s enterprises are dealing with petabytes of data instead of terabytes, and the increase in RFID sensors and other information streams has led to a constant flow of data at a pace that has made it impossible for traditional systems to handle.

  • The Value of Data
The economic value of different data varies significantly. Typically there is good information hidden amongst a larger body of non-traditional data; the challenge is identifying what is valuable and then transforming and extracting that data for analysis.
  • Big Data Use Cases

  • Sentiment Analysis
Let's start with the most widely discussed use case, sentiment analysis. Whether looking for broad economic indicators, specific market indicators, or sentiments concerning a specific company or its stocks, there is obviously a trove of data to be harvested here, available from traditional as well as new media (including social media) sources. While news keyword analysis and entity extraction have been in play for a while, and are readily offered by many vendors, the availability of social media intelligence is relatively new and has certainly captured the attention of those looking to gauge public opinion. (In a previous post, I discussed the applicability of Semantic technology and Entity Extraction for this purpose, but as promised, I'm sticking to the usage topic this time).
Sentiment analysis is considered straightforward, as the data resides outside the institution and is therefore not confined by organizational boundaries. In fact, sentiment analysis is becoming so popular that some hedge funds are basing their entire strategies on trading signals generated by Twitter analytics. While this is an extreme example, most financial institutions at this point are using some sort of sentiment analysis to gauge public opinion about their company, market, or the economy as a whole.
  • Predictive Analytics
Another fairly common use case is predictive analytics. Including correlations, back-testing strategies, and probability calculations using Monte Carlo simulations, these analytics are the bread and butter of all capital market firms, and are relevant both for strategy development and risk management. The large amounts of historical market data, and the speed at which new data sometimes needs to be evaluated (e.g. complex derivatives valuations) certainly make this a big data problem. And while traditionally these types of analytics have been processed by large compute grids, today, more and more institutions are looking at technologies that would bring compute workloads closer to the data, in order to speed things up. In the past, these types of analytics have been primarily executed using proprietary tools, while today they are starting to move towards open source frameworks such as R and Hadoop (detailed in previous posts).
  • Risk Management
As we move closer to continuous risk management, broader calculations such as the aggregation of counter party exposure or VAR also fall within the realm of Big Data, if only due to the mounting pressure to rapidly analyze risk scenarios well beyond the capacity of current systems, while dealing with ever-growing volumes of data. New computing paradigms that parallelize data access as well as computation are gaining a lot of traction in this space. A somewhat related topic is the integration of risk and finance, as risk-adjusted returns and P&L require that growing amounts of data be integrated from multiple, standalone departments across the firm, and accessed and analyzed on the fly.
  • Rogue Trading
Speaking of finance and accounting, a less common use case - but one that is frequently discussed as we're faced with increasing implications - is rogue trading. Deep analytics that correlate accounting data with position tracking and order management systems can provide valuable insights that are not available using traditional data management tools. Here too, a lot of data needs to be crunched from multiple, inconsistent sources in a very dynamic way, requiring some of the technologies and patterns discussed in earlier posts.
  • Fraud
Turning our attention to the detection of more sinister fraud, a similar point can be made. Correlating data from multiple, unrelated sources has the potential to catch fraudulent activities earlier than current methods. Consider for instance the potential of correlating Point of Sale data (available to a credit card issuer) with web behavior analysis (either on the bank's site or externally), and cross-examining it with other financial institutions or service providers such as First Data or SWIFT. This would not only improve fraud detection but could also decrease the number of false positives (which are part and parcel of many travelers' experience today).
  • Retail Banking
Most banks are paying much closer attention to their customers these days than in the past, as many look at ways to offer new, targeted services in order to reduce customer turnover and increase customer loyalty, (and, in turn, the banks' revenue). In some ways this is no different than retailers’ targeted offering and discounting strategies. The attention that mobile wallets have been getting recently alone, attests to the importance that all parties involved – from retailers to telcos to financial institutions – are putting on these types of analytics, rendered even more powerful when geo-location information is added to the mix.
Banks, however, have additional concerns, as their products all revolve around risk, and the ability to accurately assess the risk profile of an individual or a loan is paramount to offering (or denying) services to a customer. Though the need to protect consumer privacy will always prevail, banks now have more access to web data about their customers – undoubtedly putting more informational options at their fingertips – to provide them with the valuable information needed to target service offerings with a greater level of sophistication and certainty. Additionally, web data can help to signal customer life events such as a marriage, childbirth, or a home purchase, which can help banks introduce opportunities for more targeted services. And again, with location information (available from almost every cell phone) banks can achieve extremely granular customer targeting.

Advanced MapReduce


Advanced MapReduce Features

Introduction

In the previous module you learned the basics of programming with Hadoop MapReduce. That module explains how data moves through a general MapReduce architecture, and which particular methods and classes facilitate the use of Hadoop for processing. In this module we will look more closely at how to override Hadoop's functionality in various ways. These techniques allow you to customize Hadoop for application-specific purposes.

Goals for this Module:

  • Understand advanced Hadoop features
  • Be able to use Hadoop on Amazon EC2 and S3

Outline

  1. Introduction
  2. Goals for this Module
  3. Outline
  4. Custom Data Types
    1. Writable Types
    2. Custom Key Types
    3. Using Custom Types
    4. Faster Comparison Operations
    5. Final Writable Notes
  5. Input Formats
    1. Custom File Formats
    2. Alternate Data Sources
  6. Output Formats
  7. Partitioning Data
  8. Reporting Custom Metrics
  9. Distributing Auxiliary Job Data
  10. Distributing Debug Scripts
  11. Using Amazon Web Services

Custom Data Types

Hadoop MapReduce uses typed data at all times when it interacts with user-provided Mappers and Reducers: data read from files into Mappers, emitted by mappers to reducers, and emitted by reducers into output files is all stored in Java objects.

Writable Types

Objects which can be marshaled to or from files and across the network must obey a particular interface, called Writable, which allows Hadoop to read and write the data in a serialized form for transmission. Hadoop provides several stock classes which implement Writable: Text (which stores String data), IntWritable, LongWritable, FloatWritable, BooleanWritable, and several others. The entire list is in the org.apache.hadoop.io package of the Hadoop source.
In addition to these types, you are free to define your own classes which implement Writable. You can organize a structure of virtually any layout to fit your data and be transmitted by Hadoop. As a motivating example, consider a mapper which emits key-value pairs where the key is the name of an object, and the value is its coordinates in some 3-dimensional space. The key is some string-based data, and the value is a structure of the form:
struct point3d {
  float x;
  float y;
  float z;
}
The key can be represented as a Text object, but what about the value? How do we build a Point3D class which Hadoop can transmit? The answer is to implement the Writable interface, which requires two methods:
public interface Writable {
  void readFields(DataInput in) throws IOException;
  void write(DataOutput out) throws IOException;
}
The first of these methods initializes all of the fields of the object based on data contained in the binary stream in. The latter writes all the information needed to reconstruct the object to the binary stream out. The DataInput and DataOutput classes (part of java.io) contain methods to serialize most basic types of data; the important contract between your readFields() and write() methods is that they read and write the data from and to the binary stream in the same order. The following code implements a Point3D class usable by Hadoop:
public class Point3D implements Writable {
  public float x;
  public float y;
  public float z;

  public Point3D(float x, float y, float z) {
    this.x = x;
    this.y = y;
    this.z = z;
  }

  public Point3D() {
    this(0.0f, 0.0f, 0.0f);
  }

  public void write(DataOutput out) throws IOException {
    out.writeFloat(x);
    out.writeFloat(y);
    out.writeFloat(z);
  }

  public void readFields(DataInput in) throws IOException {
    x = in.readFloat();
    y = in.readFloat();
    z = in.readFloat();
  }

  public String toString() {
    return Float.toString(x) + ", "
        + Float.toString(y) + ", "
        + Float.toString(z);
  }
}
A Point3D class which implements Writable

Custom Key Types

As written, the Point3D type will work as a value type like we require for the mapper problem described above. But what if we want to emit Point3D objects as keys too? In Hadoop MapReduce, if (key, value) pairs sent to a single reduce task include multiple keys, the reducer will process the keys in sorted order. So key types must implement a stricter interface, WritableComparable. In addition to being Writable so they can be transmitted over the network, they also obey Java's Comparable interface. The following code listing extends Point3D to meet this interface:
public class Point3D implements WritableComparable<Point3D> {
  public float x;
  public float y;
  public float z;

  public Point3D(float x, float y, float z) {
    this.x = x;
    this.y = y;
    this.z = z;
  }

  public Point3D() {
    this(0.0f, 0.0f, 0.0f);
  }

  public void write(DataOutput out) throws IOException {
    out.writeFloat(x);
    out.writeFloat(y);
    out.writeFloat(z);
  }

  public void readFields(DataInput in) throws IOException {
    x = in.readFloat();
    y = in.readFloat();
    z = in.readFloat();
  }

  public String toString() {
    return Float.toString(x) + ", "
        + Float.toString(y) + ", "
        + Float.toString(z);
  }

  /** return the Euclidean distance from (0, 0, 0) */
  public float distanceFromOrigin() {
    return (float)Math.sqrt(x*x + y*y + z*z);
  }

  public int compareTo(Point3D other) {
    float myDistance = distanceFromOrigin();
    float otherDistance = other.distanceFromOrigin();

    return Float.compare(myDistance, otherDistance);
  }

  public boolean equals(Object o) {
    if (!(o instanceof Point3D)) {
      return false;
    }

    Point3D other = (Point3D)o;
    return this.x == other.x && this.y == other.y  && this.z == other.z;
  }

  public int hashCode() {
    return Float.floatToIntBits(x)
         ^ Float.floatToIntBits(y)
         ^ Float.floatToIntBits(z);
  }
}
A WritableComparable version of Point3D
It is important for key types to implement hashCode() as well; the section on Partitioners later in this module explains why. The methods hashCode() and equals() have been provided in this version of the class as well.

Using Custom Types

Now that you have created a custom data type, Hadoop must be told to use it. You can control the output key or value data type for a job by using the setOutputKeyClass() and setOutputValueClass() methods of the JobConf object that defines your job. By default, this will set the types expected as output from both the map and reduce phases. If your Mapper emits different types than the Reducer, you can set the types emitted by the mapper with the JobConf's setMapOutputKeyClass() and setMapOutputValueClass() methods. These implicitly set the input types expected by the Reducer. The types delivered as input to the Mapper are governed by the InputFormat used; see the next section of this module for more details.
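For instance, a minimal sketch of a driver-side helper that registers the Point3D value type with a job might look like the following; the class and method names here are assumptions for illustration, not part of Hadoop:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

public class Point3DJobSetup {
  // Hypothetical helper that registers the custom types with a job's JobConf.
  public static void configureTypes(JobConf conf) {
    // Types emitted by the reduce phase (and, by default, by the map phase too):
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Point3D.class);

    // Only needed if the mapper emits different types than the reducer:
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(Point3D.class);
  }
}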

Faster Comparison Operations

The default sorting process for keys will read instances of the key type in from a stream, parsing the byte stream with the readFields() method of the key class, and then call the compareTo() method of the key class on the two objects. For faster performance, it may be possible to decide on an ordering between two keys just by looking at the byte streams and without parsing all of the data contained therein. For example, consider comparing strings of text. If characters are read in sequentially, then a decision can be made on their ordering as soon as a character position is found where the two strings differ. Even if all of the bytes for the object must be read in, the object itself does not necessarily need to be instantiated around those bytes. To support this higher-speed sorting mechanism, you can extend the WritableComparator class with a comparator specific to your own data type. In particular, the method which should be overridden is
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
The default implementation is in the class org.apache.hadoop.io.WritableComparator. The relevant method has been reproduced here:
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
      buffer.reset(b1, s1, l1);                   // parse key1
      key1.readFields(buffer);

      buffer.reset(b2, s2, l2);                   // parse key2
      key2.readFields(buffer);

    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    return compare(key1, key2);                   // compare them
  }
Its operation is exactly as described above; it performs the straightforward comparison of the two objects after they have been individually deserialized from their separate byte streams (the b variables), which each have their own start offset (s) and length (l) attributes. Both objects must be fully constructed and deserialized before comparison can occur. The Text class, on the other hand, allows incremental comparison via its own implementation of this method. The code from org.apache.hadoop.io.Text is shown here:
   /** A WritableComparator optimized for Text keys. */
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(Text.class);
    }

    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
    }
  }
The Text object is serialized by first writing its length field to the byte stream, followed by the UTF-encoded string. The method decodeVIntSize determines the length of the integer describing the length of the byte stream. The comparator then skips these bytes, directly comparing the UTF-encoded bytes of the actual string-portion of the stream in the compareBytes() method. As soon as it finds a character in which the two streams differ, it returns a result without examining the rest of the strings.
Note that you do not need to manually specify this comparator's use in your Hadoop programs. Hadoop automatically uses this special comparator implementation for Text data due to the following code being added to Text's static initialization:
  static {
    // register this comparator
    WritableComparator.define(Text.class, new Comparator());
  }
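As an illustration only (this class is not part of Hadoop), a similar raw comparator could be sketched for the Point3D key type developed earlier. Because write() serializes a Point3D as three consecutive floats, the distance-based ordering can be computed directly from the byte arrays using the static WritableComparator.readFloat() helper, without instantiating any Point3D objects:
import org.apache.hadoop.io.WritableComparator;

/** A sketch of a WritableComparator optimized for Point3D keys; it assumes
 *  the x, y, z serialization order used by Point3D.write() above. */
public class Point3DComparator extends WritableComparator {

  public Point3DComparator() {
    super(Point3D.class);
  }

  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {
    // Each float occupies four bytes; read them straight out of the buffers.
    float d1 = distanceFromOrigin(b1, s1);
    float d2 = distanceFromOrigin(b2, s2);
    return Float.compare(d1, d2);
  }

  private static float distanceFromOrigin(byte[] b, int s) {
    float x = readFloat(b, s);
    float y = readFloat(b, s + 4);
    float z = readFloat(b, s + 8);
    return (float) Math.sqrt(x * x + y * y + z * z);
  }
}
Mirroring the Text example, the registration call WritableComparator.define(Point3D.class, new Point3DComparator()) would then be placed in a static initialization block of the Point3D class itself.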

Final Writable Notes

Defining custom writable types allows you to intelligently use Hadoop to manipulate higher-level data structures, without needing to use toString() to convert all your data types to text for sending over the network. If you will be using a type in a lot of MapReduce jobs, or you must process a very large volume of them (as is usually the case in Hadoop), defining your own data type classes will provide a significant performance benefit.
Exercise: Assume that we have a mapper which emits line segments as keys and values. A line segment is defined by its endpoints. For our purposes, line segments can be ordered by their lengths. Implement a LineSegment class which implements WritableComparable. Hint: make use of Point3D objects.

Input Formats

The InputFormat defines how to read data from a file into the Mapper instances. Hadoop comes with several implementations of InputFormat; some work with text files and describe different ways in which the text files can be interpreted. Others, like SequenceFileInputFormat, are purpose-built for reading particular binary file formats. These types are described in more detail in previous documents.
More powerfully, you can define your own InputFormat implementations to format the input to your programs however you want. For example, the default TextInputFormat reads lines of text files. The key it emits for each record is the byte offset of the line read (as a LongWritable), and the value is the contents of the line up to the terminating '\n' character (as a Text object). If you have multi-line records each separated by a $ character, you could write your own InputFormat that parses files into records split on this character instead.
Another important job of the InputFormat is to divide the input data sources (e.g., input files) into fragments that make up the inputs to individual map tasks. These fragments are called "splits" and are encapsulated in instances of the InputSplit interface. Most files, for example, are split up on the boundaries of the underlying blocks in HDFS, and are represented by instances of the FileSplit class. Other files may be unsplittable, depending on their application-specific data format. Dividing up other data sources (e.g., tables from a database) into splits would be performed in a different, application-specific fashion. When dividing the data into input splits, it is important that this process be quick and cheap. The data itself should not need to be accessed to perform this process (as it is all done by a single machine at the start of the MapReduce job).
The TextInputFormat divides files into splits strictly by byte offsets. It then reads individual lines of the files from the split in as record inputs to the Mapper. The RecordReader associated with TextInputFormat must be robust enough to handle the fact that the splits do not necessarily correspond neatly to line-ending boundaries. In fact, the RecordReader will read past the theoretical end of a split to the end of a line in one record. The reader associated with the next split in the file will scan for the first full line in the split to begin processing that fragment. All RecordReader implementations must use some similar logic to ensure that they do not miss records that span InputSplit boundaries.

Custom File Formats

In this section we will describe how to develop a custom InputFormat that reads files of a particular format.
Rather than implement InputFormat directly, it is usually best to subclass the FileInputFormat. This abstract class provides much of the basic handling necessary to manipulate files. If we want to parse the file in a particular way, then we must override the getRecordReader() method, which returns an instance of RecordReader: an object that can read from the input source. To motivate this discussion with concrete code, we will develop an InputFormat and RecordReader implementation which can read lists of objects and positions from files. We assume that we are reading text files where each line contains the name of an object and then its coordinates as a set of three comma-separated floating-point values. For instance, some sample data may look like the following:
ball, 3.5, 12.7, 9.0
car, 15, 23.76, 42.23
device, 0.0, 12.4, -67.1
We must read individual lines of the file, separate the key (Text) from the three floats, and then read those into a Point3D object as we developed earlier.
The ObjectPositionInputFormat class itself is very straightforward. Since it will be reading from files, all we need to do is define a factory method for RecordReader implementations:
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class ObjectPositionInputFormat extends
    FileInputFormat<Text, Point3D> {

  public RecordReader<Text, Point3D> getRecordReader(
      InputSplit input, JobConf job, Reporter reporter)
      throws IOException {

    reporter.setStatus(input.toString());
    return new ObjPosRecordReader(job, (FileSplit)input);
  }
}
An InputFormat for object-position files
Note that we define the types of the keys and values emitted by the InputFormat in its definition; these must match the types read in as input by the Mapper in its class definition.
The RecordReader implementation is where the actual file information is read and parsed. We will implement this by making use of the LineRecordReader class; this is the RecordReader implementation used by TextInputFormat to read individual lines from files and return them unparsed. We will wrap the LineRecordReader with our own implementation which converts the values to the expected types. By using LineRecordReader, we do not need to worry about what happens if a record spans an InputSplit boundary, since this underlying record reader already has logic to take care of this fact.



import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

class ObjPosRecordReader implements RecordReader<Text, Point3D> {

  private LineRecordReader lineReader;
  private LongWritable lineKey;
  private Text lineValue;

  public ObjPosRecordReader(JobConf job, FileSplit split) throws IOException {
    lineReader = new LineRecordReader(job, split);

    lineKey = lineReader.createKey();
    lineValue = lineReader.createValue();
  }

  public boolean next(Text key, Point3D value) throws IOException {
    // get the next line
    if (!lineReader.next(lineKey, lineValue)) {
      return false;
    }

    // parse the lineValue which is in the format:
    // objName, x, y, z
    String [] pieces = lineValue.toString().split(",");
    if (pieces.length != 4) {
      throw new IOException("Invalid record received");
    }

    // try to parse floating point components of value
    float fx, fy, fz;
    try {
      fx = Float.parseFloat(pieces[1].trim());
      fy = Float.parseFloat(pieces[2].trim());
      fz = Float.parseFloat(pieces[3].trim());
    } catch (NumberFormatException nfe) {
      throw new IOException("Error parsing floating point value in record");
    }

    // now that we know we'll succeed, overwrite the output objects

    key.set(pieces[0].trim()); // objName is the output key.

    value.x = fx;
    value.y = fy;
    value.z = fz;

    return true;
  }

  public Text createKey() {
    return new Text("");
  }

  public Point3D createValue() {
    return new Point3D();
  }

  public long getPos() throws IOException {
    return lineReader.getPos();
  }

  public void close() throws IOException {
    lineReader.close();
  }

  public float getProgress() throws IOException {
    return lineReader.getProgress();
  }
}
RecordReader for object-position files
You can control the InputFormat used by your MapReduce job with the JobConf.setInputFormat() method.
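For example, a one-line sketch, assuming conf is the JobConf being prepared for this job:
  conf.setInputFormat(ObjectPositionInputFormat.class);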
Exercise: Write an InputFormat and RecordReader that read strings of text separated by '$' characters instead of newlines.

Alternate Data Sources

An InputFormat describes both how to present the data to the Mapper and where the data originates from. Most implementations descend from FileInputFormat, which reads from files on the local machine or HDFS. If your data does not come from a source like this, you can write an InputFormat implementation that reads from an alternate source. For example, HBase (a distributed database system) provides a TableInputFormat that reads records from a database table. You could imagine a system where data is streamed to each machine over the network on a particular port; the InputFormat reads data from the port and parses it into individual records for mapping.

Output Formats

The InputFormat and RecordReader interfaces define how data is read into a MapReduce program. By analogy, the OutputFormat and RecordWriter interfaces dictate how to write the results of a job back to the underlying permanent storage. Several useful OutputFormat implementations were described in earlier documents. The default format (TextOutputFormat) will write (key, value) pairs as strings to individual lines of an output file (using the toString() methods of the keys and values). The SequenceFileOutputFormat will keep the data in binary, so it can later be read quickly by the SequenceFileInputFormat. These classes make use of the write() and readFields() methods of the specific Writable classes used by your MapReduce pass.
You can define your own OutputFormat implementation that will write data to an underlying medium in the format that you control. If you want to write to output files on the local system or in HDFS, you should extend the FileOutputFormat abstract class. When you want to use a different output format, you can control this with the JobConf.setOutputFormat() method.
Why might we want to define our own OutputFormat? A custom OutputFormat allows you to exactly control what data is put into a file, and how it is laid out. Suppose another process you use has a custom input file format. Your MapReduce job is supposed to generate inputs compatible with this program. You may develop an OutputFormat implementation which will produce the correct type of file to work with this subsequent process in your tool chain. As an example of how to write an OutputFormat, we will walk through the implementation of a simple XML-based format developed for this tutorial, XmlOutputFormat. Given a set of (key, value) pairs from the Reducer, (e.g., (k1, v1), (k2, v2), etc...) this will generate a file laid out like so:
<results>
  <k1>v1</k1>
  <k2>v2</k2>

  ...
</results>
The code to generate these files is presented below:
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

public class XmlOutputFormat<K, V> extends FileOutputFormat<K, V> {

  protected static class XmlRecordWriter<K, V> implements RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";

    private DataOutputStream out;

    public XmlRecordWriter(DataOutputStream out) throws IOException {
      this.out = out;
      out.writeBytes("<results>\n");
    }

    /**
     * Write the object to the byte stream, handling Text as a special case.
     *
     * @param o
     *          the object to print
     * @throws IOException
     *           if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(utf8));
      }
    }

    private void writeKey(Object o, boolean closing) throws IOException {
      out.writeBytes("<");
      if (closing) {
        out.writeBytes("/");
      }
      writeObject(o);
      out.writeBytes(">");
      if (closing) {
        out.writeBytes("\n");
      }
    }

    public synchronized void write(K key, V value) throws IOException {

      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;

      if (nullKey && nullValue) {
        return;
      }

      Object keyObj = key;

      if (nullKey) {
        keyObj = "value";
      }

      writeKey(keyObj, false);

      if (!nullValue) {
        writeObject(value);
      }

      writeKey(keyObj, true);
    }

    public synchronized void close(Reporter reporter) throws IOException {
      try {
        out.writeBytes("</results>\n");
      } finally {
        // even if writeBytes() fails, make sure we close the stream
        out.close();
      }
    }
  }

  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
      String name, Progressable progress) throws IOException {
    Path file = FileOutputFormat.getTaskOutputPath(job, name);
    FileSystem fs = file.getFileSystem(job);
    FSDataOutputStream fileOut = fs.create(file, progress);
    return new XmlRecordWriter<K, V>(fileOut);
  }
}
The FileOutputFormat which XmlOutputFormat subclasses will handle most of the heavy lifting. The only method directly implemented in XmlOutputFormat is getRecordWriter(), which is a factory method for the RecordWriter object which will actually write the file. The inner class XmlRecordWriter is the implementation which generates files in the format shown above. The RecordWriter is initialized with an output stream connected to a file in the output file system. At the same time, the XML prologue is written into the output file. The particular output file system and filename associated with this output stream are determined based on the current job configuration. The XmlRecordWriter's write() method is then called each time a (key, value) pair is provided to the OutputCollector by the Reducer. When the Reducer finishes, the close() method of the XmlRecordWriter will write the XML epilogue and close the underlying stream.
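Wiring this OutputFormat into a job is then a short sketch (assuming conf is the job's JobConf, the output types are Text, and the output path shown is hypothetical):
  conf.setOutputFormat(XmlOutputFormat.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);
  FileOutputFormat.setOutputPath(conf, new Path("/results/xml-output"));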

Partitioning Data

"Partitioning" is the process of determining which reducer instance will receive which intermediate keys and values. Each mapper must determine for all of its output (key, value) pairs which reducer will receive them. It is necessary that for any key, regardless of which mapper instance generated it, the destination partition is the same. If the key "cat" is generated in two separate (key, value) pairs, they must both be reduced together. It is also important for performance reasons that the mappers be able to partition data independently -- they should never need to exchange information with one another to determine the partition for a particular key.
Hadoop uses an interface called Partitioner to determine which partition a (key, value) pair will go to. A single partition refers to all (key, value) pairs which will be sent to a single reduce task. Hadoop MapReduce determines when the job starts how many partitions it will divide the data into. If twenty reduce tasks are to be run (controlled by the JobConf.setNumReduceTasks() method), then twenty partitions must be filled.
The Partitioner defines one method which must be filled:
public interface Partitioner<K, V> extends JobConfigurable {
  int getPartition(K key, V value, int numPartitions);
}
The getPartition() method receives a key and a value and the number of partitions to split the data across; a number in the range [0, numPartitions) must be returned by this method, indicating which partition to send the key and value to. For any two keys k1 and k2, k1.equals(k2) implies getPartition(k1, *, n) == getPartition(k2, *, n).
The default Partitioner implementation is called HashPartitioner. It uses the hashCode() method of the key objects modulo the number of partitions total to determine which partition to send a given (key, value) pair to.
For most randomly-distributed data, this should result in all partitions being of roughly equal size. If the data in your data set is skewed in some way, however, this might not produce good results. For example, if you know that the key 0 will appear much more frequently than any other key, then you may want to send all the 0-keyed data to one partition, and distribute the other keys over all other partitions by their hashCode(). Also, if the hashCode() method for your data type does not provide uniformly-distributed values over its range, then data may not be sent to reducers evenly. Poor partitioning of data means that some reducers will have more data input than others, which usually means they'll have more work to do than the other reducers. Thus the entire job will wait for one reducer to finish its extra-large share of the load, when it might have been possible to spread that across many different reducers.
If you are dissatisfied with the performance of HashPartitioner, you are free to write your own Partitioner implementation. It can be general-purpose, or tailored to the specific data types or values that you expect to use in your application. After implementing the Partitioner interface, use the JobConf.setPartitionerClass() method to tell Hadoop to use it for your job.
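As an illustration only (this class is not part of Hadoop), the skewed-data scenario above could be handled with a sketch like the following, which reserves one partition for the frequently-occurring key 0 and hashes all other keys over the remaining partitions:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

/** A sketch of a Partitioner that isolates the frequently-occurring key 0. */
public class SkewAwarePartitioner<V> implements Partitioner<IntWritable, V> {

  public void configure(JobConf job) {
    // no configuration needed for this sketch
  }

  public int getPartition(IntWritable key, V value, int numPartitions) {
    if (numPartitions == 1 || key.get() == 0) {
      return 0;                 // all 0-keyed data goes to a single partition
    }
    // hash the remaining keys over partitions 1 .. numPartitions-1
    return 1 + (key.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
  }
}
The job would then select it with conf.setPartitionerClass(SkewAwarePartitioner.class).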

Reporting Custom Metrics

The Hadoop system records a set of metric counters for each job that it runs. For example, the number of input records mapped, the number of bytes it reads from or writes to HDFS, etc. To profile your applications, you may wish to record other values as well. For example, if the records sent into your mappers fall into two categories (call them "A" and "B"), you may wish to count the total number of A-records seen vs. the total number of B-records.
The Reporter object passed in to your Mapper and Reducer classes can be used to update counters. The same set of counter variables can be contributed to by all Mapper and Reducer instances across your cluster. The values are aggregated by the master node of the cluster, so they are "thread-safe" in this manner.
Counters are incremented through the Reporter.incrCounter() method. The names of the counters are defined as Java enums. The following example demonstrates how to count the number of "A" vs. "B" records seen by the mapper:
public class MyMapper extends MapReduceBase implements
    Mapper<Text, Text, Text, Text> {

  static enum RecordCounters { TYPE_A, TYPE_B, TYPE_UNKNOWN };

  // actual definitions elided
  public boolean isTypeARecord(Text input) { ... }
  public boolean isTypeBRecord(Text input) { ... }

  public void map(Text key, Text val, OutputCollector<Text, Text> output,
      Reporter reporter) throws IOException {

    if (isTypeARecord(key)) {
      reporter.incrCounter(RecordCounters.TYPE_A, 1);
    } else if (isTypeBRecord(key)) {
      reporter.incrCounter(RecordCounters.TYPE_B, 1);
    } else {
      reporter.incrCounter(RecordCounters.TYPE_UNKNOWN, 1);
    }

    // actually process the record here, call
    // output.collect( .. ), etc.
  }
}
If you launch your job with JobClient.runJob(), the diagnostic information printed to stdout when the job completes will contain the values of all the counters. Both runJob() and submitJob() will return a RunningJob object that refers to the job in question. The RunningJob.getCounters() method will return a Counters object that contains the values of all the counters so that you can query them programmatically. The Counters.getCounter(Enum key) method returns the value of a particular counter.
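A short sketch of querying those counters after the job completes, assuming the MyMapper class above and a fully prepared JobConf named conf:
  RunningJob job = JobClient.runJob(conf);
  Counters counters = job.getCounters();
  long typeA = counters.getCounter(MyMapper.RecordCounters.TYPE_A);
  long typeB = counters.getCounter(MyMapper.RecordCounters.TYPE_B);
  System.out.println("Type A records: " + typeA + ", type B records: " + typeB);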

Distributing Auxiliary Job Data

The bulk of the data that you process in a MapReduce job will probably be stored in large files spread across HDFS. You can reliably store petabytes of information in HDFS, and individual jobs can process several terabytes at a time. In this access model, each large file is broken into chunks and each mapper reads only its own chunk; no single mapper sees the entire data set.
Sometimes it is necessary for every Mapper to read a single file; for example, a distributed spell-check application would require every Mapper to read in a copy of the dictionary before processing documents. The dictionary will be small (only a few megabytes), but needs to be widely available so that all nodes can reach it.
Hadoop provides a mechanism specifically for this purpose, called the distributed cache. The distributed cache can contain small data files needed for initialization or libraries of code that may need to be accessed on all nodes in the cluster.
To use the distributed cache to disseminate files, call the static DistributedCache.addCacheFile() method when setting up your job, passing the names of files which should be sent to all nodes on the system. The file names are specified as URI objects; unless qualified otherwise, they assume that the file is present on HDFS at the path indicated. You can copy local files to HDFS with the FileSystem.copyFromLocalFile() method.
When you want to retrieve files from the distributed cache (e.g., when the mapper is in its configure() step and wants to load config data like the dictionary mentioned above), use the DistributedCache.getLocalCacheFiles() method to retrieve the list of paths local to the current node for the cached files. These are copies of all cached files, placed in the local file system of each worker machine. (They will be in a subdirectory of mapred.local.dir.) Each of the paths returned by getLocalCacheFiles() can be accessed via regular Java file I/O mechanisms, such as java.io.FileInputStream.
As a cautionary note: if you use the local JobRunner in Hadoop (i.e., if you call JobClient.runJob() in a program with no hadoop-conf.xml accessible, or an empty one), then no local data directory is created; the getLocalCacheFiles() call will return an empty set of results. Unit test code should take this into account.
Suppose that we were writing an inverted index builder. We do not want to include very common words such as "the," "a," "and," etc. These so-called stop words might all be listed in a file. All the mappers should read the stop word list when they are initialized, and then filter the index they generate against this list. We can disseminate a list of stop words to all the Mappers with the following code. The first listing will put the stop-words file into the distributed cache:
  public static final String LOCAL_STOPWORD_LIST =
      "/home/aaron/stop_words.txt";

  public static final String HDFS_STOPWORD_LIST = "/data/stop_words.txt";

  void cacheStopWordList(JobConf conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path hdfsPath = new Path(HDFS_STOPWORD_LIST);

    // upload the file to hdfs. Overwrite any existing copy.
    fs.copyFromLocalFile(false, true, new Path(LOCAL_STOPWORD_LIST),
        hdfsPath);

    DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
  }
This code copies the local stop_words.txt file into HDFS, and then tells the distributed cache to send the HDFS copy to all nodes in the system. The next listing actually uses the file in the mapper:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;

class IndexMapperExample implements Mapper {

  private Set<String> stopWords;

  public void configure(JobConf conf) {
    try {
      String stopwordCacheName = new Path(HDFS_STOPWORD_LIST).getName();
      Path [] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
      if (null != cacheFiles && cacheFiles.length > 0) {
        for (Path cachePath : cacheFiles) {
          if (cachePath.getName().equals(stopwordCacheName)) {
            loadStopWords(cachePath);
            break;
          }
        }
      }
    } catch (IOException ioe) {
      System.err.println("IOException reading from distributed cache");
      System.err.println(ioe.toString());
    }
  }
  void loadStopWords(Path cachePath) throws IOException {
    // note use of regular java.io methods here - this is a local file now
    BufferedReader wordReader = new BufferedReader(
        new FileReader(cachePath.toString()));
    try {
      String line;
      this.stopWords = new HashSet<String>();
      while ((line = wordReader.readLine()) != null) {
        this.stopWords.add(line);
      }
    } finally {
      wordReader.close();
    }
  }

  /* actual map() method, etc go here */
}
The code above belongs in the Mapper instance associated with the index generation process. We retrieve the list of files cached in the distributed cache. We then compare the basename of each file (using Path.getName()) with the one we expect for our stop word list. Once we find this file, we read the words, one per line, into a Set instance that we will consult during the mapping process.
The distributed cache has additional uses too. For instance, you can use the DistributedCache.addArchiveToClassPath() method to send a .jar file to all the nodes. It will be inserted into the classpath as well, so that classes in the archive can be accessed by all the nodes.
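For example, a one-line sketch (the jar path is hypothetical), again assuming conf is the job's JobConf:
  DistributedCache.addArchiveToClassPath(new Path("/libs/extra-classes.jar"), conf);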

Distributing Debug Scripts

Hadoop will generate a large number of log files for a job, distributed across all the nodes that participated in the job's execution. Often only a subset of these logs will be of interest when debugging failing tasks. MapReduce can help with this by running a user-provided script when either a map or reduce task fails. These scripts are given the names of files containing the stdout and stderr from the task, as well as the task's Hadoop log and job.xml file (i.e., its complete JobConf in serialized form).
These scripts will be run on whichever node encounters failing tasks. You can use these scripts to automate the inspection of failing tasks: e.g., email the stdout/stderr to an administrator address, upload the failed task's log files to a common NFS-mounted "debug dump" directory, preserve local state modifications made by map tasks, etc.
Separate scripts can be provided for map and reduce task failure. They each receive as arguments, in order, the names of files containing the task's stdout, stderr, syslog, and jobconf. Because they are run on all the task nodes, and not on the client machine where the job was submitted, these scripts must be sent to the nodes through the distributed cache listed above.
The following method will enable failed-task scripts on a MapReduce job being prepared. It assumes that you have written two scripts (e.g., bash scripts) which perform your debug actions with the log filenames provided (e.g., copy them to a shared NFS mount). In this example these are /home/aaron/src/map-fail and /home/aaron/src/reduce-fail.
  private static final String FAILED_MAP_SCRIPT_NAME = "map-fail";
  private static final String FAILED_REDUCE_SCRIPT_NAME = "reduce-fail";

  private static final String HDFS_SCRIPT_DIR = "/debug";

  private static final String HDFS_FAILED_MAP_SCRIPT =
    HDFS_SCRIPT_DIR + "/" + FAILED_MAP_SCRIPT_NAME;
  private static final String HDFS_FAILED_REDUCE_SCRIPT =
    HDFS_SCRIPT_DIR + "/" + FAILED_REDUCE_SCRIPT_NAME;
  private static final String LOCAL_FAILED_MAP_SCRIPT  =
    "/home/aaron/src/" + FAILED_MAP_SCRIPT_NAME;
  private static final String LOCAL_FAILED_REDUCE_SCRIPT =
    "/home/aaron/src/" + FAILED_REDUCE_SCRIPT_NAME;

  public static void setupFailedTaskScript(JobConf conf) throws IOException {

    // create a directory on HDFS where we'll upload the fail scripts
    FileSystem fs = FileSystem.get(conf);
    Path debugDir = new Path(HDFS_SCRIPT_DIR);

    // who knows what's already in this directory; let's just clear it.
    if (fs.exists(debugDir)) {
      fs.delete(debugDir, true);
    }

    // ...and then make sure it exists again
    fs.mkdirs(debugDir);

    // upload the local scripts into HDFS
    fs.copyFromLocalFile(new Path(LOCAL_FAILED_MAP_SCRIPT),
        new Path(HDFS_FAILED_MAP_SCRIPT));
    fs.copyFromLocalFile(new Path(LOCAL_FAILED_REDUCE_SCRIPT),
        new Path(HDFS_FAILED_REDUCE_SCRIPT));

    conf.setMapDebugScript("./" + FAILED_MAP_SCRIPT_NAME);
    conf.setReduceDebugScript("./" + FAILED_REDUCE_SCRIPT_NAME);
    DistributedCache.createSymlink(conf);

    URI fsUri = fs.getUri();

    String mapUriStr = fsUri.toString() + HDFS_FAILED_MAP_SCRIPT
        + "#" + FAILED_MAP_SCRIPT_NAME;
    URI mapUri = null;
    try {
      mapUri = new URI(mapUriStr);
    } catch (URISyntaxException use) {
      throw new IOException(use);
    }

    DistributedCache.addCacheFile(mapUri, conf);

    String reduceUriStr = fsUri.toString() + HDFS_FAILED_REDUCE_SCRIPT
        + "#" + FAILED_REDUCE_SCRIPT_NAME;
    URI reduceUri = null;
    try {
      reduceUri = new URI(reduceUriStr);
    } catch (URISyntaxException use) {
      throw new IOException(use);
    }

    DistributedCache.addCacheFile(reduceUri, conf);
  }
How does this all work? The scripts are, presumably, initially hosted on the client machine that is submitting the job. The client is responsible for injecting them into HDFS and enabling them in the distributed cache. It first creates the HDFS_SCRIPT_DIR and then uploads the local script files into this directory.
It must then set the commands for the TaskTracker to execute to run the scripts. This is accomplished by the lines:
    conf.setMapDebugScript("./" + FAILED_MAP_SCRIPT_NAME);
    conf.setReduceDebugScript("./" + FAILED_REDUCE_SCRIPT_NAME);
    DistributedCache.createSymlink(conf);
The distributed cache copies the files to the mapred.local.dir on each task node. The TaskTracker will then execute the scripts if necessary. But the TaskTracker does not run with its working directory set to mapred.local.dir. Fortunately, the distributed cache can be told to create symlinks in the working directory for files in the distributed cache. The third line of the snippet above enables this functionality. Now ./FAILED_MAP_SCRIPT_NAME will point to the copy of FAILED_MAP_SCRIPT_NAME in the local cache directory, and the script can be run.
But before that can happen, we must add the files themselves to the distributed cache. (As of now they are only in HDFS.) Ordinarily, we could just call DistributedCache.addCacheFile(new Path("hdfs_path_to_some_file").toUri(), conf) and that would be sufficient. But since we need to create symlinks, we must also tell the distributed cache what filename the symlink should take in the working directory. This is provided as the URI "anchor" part following the "#" in the URI. A subtlety of Hadoop's Path class is that if you put a '#' in the path string, it will URL-encode it and treat it as part of the filename. Therefore, we use some extra code to construct our URIs manually to ensure that the '#' remains unescaped.

Using Amazon Web Services

Hadoop's power comes from its ability to perform work on a large number of machines simultaneously. What if you want to experiment with Hadoop, but do not have many machines? While operations on a two or four-node cluster are functionally equivalent to those on a 40 or 100-node cluster, processing larger volumes of data will require a larger number of nodes.
Amazon provides machines for rent on demand through its Elastic Compute Cloud (a.k.a. EC2) service. EC2 is part of a broader set of services collectively called Amazon Web Services, or AWS. EC2 allows you to request a set of nodes ("instances" in their parlance) for as long as you need them. You pay by the instance-hour, plus costs for bandwidth. You can use EC2 instances to run a Hadoop cluster. Hadoop comes with a set of scripts which will provision EC2 instances.
The first step in this process is to visit the EC2 web site and click "Sign Up For This Web Service". You will need to create an account and provide billing information. Then follow the instructions in the Getting Started guide to set up your account and configure your system to run the AWS tools.
Once you have done so, follow the instructions on the Hadoop wiki page specific to running Hadoop on Amazon EC2. While more details are available there, the shortest steps to provisioning a cluster are:
  • Edit src/contrib/ec2/bin/hadoop-ec2-env.sh to contain your Amazon account information and parameters about the desired cluster size.
  • Execute src/contrib/ec2/bin/hadoop-ec2 launch-cluster.
After the cluster has been started, you can log in to the head node over ssh with the bin/hadoop-ec2 login script, and perform your MapReduce computation. When you are done, log out and type bin/hadoop-ec2 terminate-cluster to release the EC2 instances. The contents of the virtual hard drives on the instances will disappear, so be sure to copy off any important data with scp or another tool first!
A very thorough introduction to configuring Hadoop on EC2 and running a test job is provided in the Amazon Web Services Developer Wiki site.