Creating Oozie workflow for mapreduce job that uses distributed cache

In the Using third part jars and files in your MapReduce application(Distributed cache) entry I blogged about how to create a MapReduce job that uses the distributed cache for both the jar files it requires and the data files it reads. I wanted to figure out how to automate this MapReduce job using Apache Oozie, so I followed these steps
  1. First I created an apachelog directory, and in it I created a job.properties file
  2. Then I created a workflow.xml file. One thing to notice in it is <file>GeoLite.mmdb#GeoLite2-City.mmdb</file>: I have the file GeoLite.mmdb on disk, but I want to refer to it as GeoLite2-City.mmdb in my program, so the file element takes care of creating the symlink
  3. Then I copied all the required jar files into the lib folder inside the apachelog directory
  4. I used the following command to copy the apachelog directory, which has everything my Oozie job needs, to HDFS
    
    hdfs dfs -put apachelog apachelog
    
  5. The last step is to invoke the Oozie job by executing the following command
    
    oozie job -oozie http://localhost:11000/oozie -config job.properties -run
    

Using Apache Oozie for automating streaming map-reduce job

In the WordCount MapReduce program using Hadoop streaming and python entry I talked about how to create a streaming MapReduce job using Python. I wanted to figure out how to automate that program using an Oozie workflow, so I followed these steps
  1. The first step was to create a folder called streaming on my local machine and copy mapper.py and reducer.py into it. I also created placeholders for job.properties and workflow.xml
  2. Next I created a job.properties file. This job.properties is quite similar to the one for the Java MapReduce job; the only difference is that you must set oozie.use.system.libpath=true. By default the streaming-related jars are not included in the classpath, so unless you set that value to true you will get the following error
    
    2014-07-23 06:15:13,170 WARN org.apache.hadoop.mapred.Child: Error running child
    java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1649)
     at org.apache.hadoop.mapred.JobConf.getMapRunnerClass(JobConf.java:1010)
     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:413)
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
     at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
     at java.security.AccessController.doPrivileged(Native Method)
     at javax.security.auth.Subject.doAs(Subject.java:396)
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
     at org.apache.hadoop.mapred.Child.main(Child.java:262)
    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1617)
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1641)
     ... 8 more
    Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found
     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1523)
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1615)
     ... 9 more
    2014-07-23 06:15:13,175 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task
    
  3. The next step in the process is to create the workflow.xml file. Make sure to add the <file>mapper.py#mapper.py</file> element, and a matching one for reducer.py, in the workflow.xml. These elements take care of shipping mapper.py and reducer.py to the tasks through the distributed cache and creating symbolic links to the two files.
  4. Upload the streaming folder with all your changes to HDFS by executing the following command
    
    hdfs dfs -put streaming streaming
    
  5. You can trigger the Oozie workflow by executing the following command
    
    oozie job -oozie http://localhost:11000/oozie -config streaming/job.properties -run
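
The mapper.py and reducer.py scripts themselves are covered in the WordCount streaming entry. As a reminder of the contract they follow (read lines, emit tab-separated key/value records, with the reducer receiving its input sorted by key), here is a minimal sketch. The function names map_lines and reduce_lines are my own, and this is not the exact code from that entry.

```python
from itertools import groupby


def map_lines(lines):
    """Mapper side: emit one 'word<TAB>1' record per word."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reduce_lines(sorted_records):
    """Reducer side: sum the counts of consecutive records sharing a key.

    Hadoop streaming sorts mapper output by key before the reducer sees it,
    so grouping adjacent records is enough.
    """
    pairs = (record.rstrip("\n").split("\t", 1) for record in sorted_records)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        total = sum(int(count) for _, count in group)
        yield f"{word}\t{total}"
```

Calling sorted(map_lines(...)) and feeding the result to reduce_lines() gives a quick local dry run of the logic before the scripts are uploaded and wired into the workflow.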
    

Using Apache Oozie to execute MapReduce jobs

I wanted to learn how to automate a MapReduce job using Oozie, so I decided to create an Oozie workflow that invokes the WordCount (HelloWorld) MapReduce program. I had to follow these steps
  1. The first thing that I did was to download the WordCount program source code by executing
    
    git clone https://github.com/sdpatil/HadoopWordCount3
    
    This program has a Maven script for building an executable jar, so I used the mvn clean package command to build the Hadoop jar.
  2. After that I tried executing the program manually by using the following command
    
    hadoop jar target/HadoopWordCount.jar sorttest.txt output/wordcount
    
  3. Now in order to use an Oozie workflow you will have to create a particular folder structure on your machine
    
    wordcount
       -- job.properties
       -- workflow.xml
       -- lib
             -- HadoopWordCount.jar  
    
  4. In the wordcount folder create a job.properties file. This file lets you pass parameters to your Oozie workflow. The values of nameNode and jobTracker represent the name node and job tracker locations. In my case I am using the Cloudera VM with a single node, so both these properties point to localhost. The value of oozie.wf.application.path is the HDFS path where you uploaded the wordcount folder created in step 3
  5. Next define your Apache Oozie workflow.xml file. In my case the workflow has a single step, which is to execute the MapReduce job. I am setting these properties
    • mapred.mapper.new-api & mapred.reducer.new-api: Set these properties to true if you're using the new MapReduce API based on the org.apache.hadoop.mapreduce.* classes
    • mapreduce.map.class: The fully qualified name of your mapper class
    • mapreduce.reduce.class: The fully qualified name of your reducer class
    • mapred.output.key.class: Fully qualified name of the output key class. This is same as parameter to job.setOutputKeyClass() in your driver class
    • mapred.output.value.class: Fully qualified name of the output value class. This is same as parameter to job.setOutputValueClass() in your driver class
    • mapred.input.dir: Location of your input file. In my case I have sorttext.txt in the hdfs://localhost/user/cloudera directory
    • mapred.output.dir: Location of the output that will get generated. In my case I want the output to go to the hdfs://localhost/user/cloudera/output/wordcount directory
  6. Once your Oozie workflow is ready, upload the wordcount folder to HDFS by executing the following command
    
    hdfs dfs -put wordcount wordcount
    
  7. Now run your Oozie workflow by executing the following command from your wordcount directory
    
    oozie job -oozie http://localhost:11000/oozie -config job.properties -run
    
    If it runs successfully you should see the output generated in the hdfs://localhost/user/cloudera/output/wordcount directory

Enabling Oozie console on Cloudera VM 4.4.0 and executing examples

I am trying to learn about Apache Oozie, so I wanted to figure out how to use it in the Cloudera 4.4.0 VM. When you go to the Oozie web console it shows a message saying that the console is disabled. In order to enable the console I had to follow these steps
  1. In Cloudera Manager I went to the Oozie configuration screen and checked the Enable Oozie Server Web Console option. Its description says to install ExtJS 2.2 in /usr/lib/oozie/libext
  2. Next I went to the /usr/lib/oozie/libext directory and executed the following command to download ext-2.2.zip.
    
    wget 'http://extjs.com/deploy/ext-2.2.zip'
    
    Since I am using CDH 4.4 I had to execute unzip ext-2.2.zip to extract the archive
  3. The last step was to restart the Oozie service, and now I could see the Oozie web console
Executing Oozie examples
After the Oozie console was enabled I wanted to execute an Oozie example to test out my installation, so I followed these steps
  1. The first thing for me was to find the oozie-examples.tar.gz file on my VM
    
    find / -name oozie-examples.tar.gz
    
    I found it under the /usr/share/doc/oozie-3.3.2+92/ directory, so I untarred it using tar xvf oozie-examples.tar.gz
  2. Then I had to change the values of nameNode and jobTracker in job.properties from localhost to localhost.localdomain to get rid of the Error: E0901 : E0901: Namenode [localhost:8020] not allowed, not in Oozies whitelist error.
    
    nameNode=hdfs://localhost.localdomain:8020
    jobTracker=localhost.localdomain:8021
    queueName=default
    examplesRoot=examples
    
    oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce
    outputDir=map-reduce
    
  3. After making the changes in job.properties I uploaded the examples folder to HDFS using the following command
    
    hdfs dfs -put examples examples
    
  4. The last step in the process was to actually run the MapReduce job in Oozie by executing the following command
    
    oozie job -oozie http://localhost:11000/oozie -config examples/apps/map-reduce/job.properties -run
    
  5. Once the job was started I could follow its progress in the Oozie web console
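
The job.properties from step 2 builds oozie.wf.application.path out of ${...} references to other properties. To see what path that expands to, here is a rough Python sketch of the substitution. It only imitates the expansion for illustration and is not Oozie's implementation; the user.name value cloudera is an assumption (it normally comes from the user submitting the job rather than from the file).

```python
import re

# The properties from step 2, plus an assumed user.name.
EXAMPLE = {
    "nameNode": "hdfs://localhost.localdomain:8020",
    "jobTracker": "localhost.localdomain:8021",
    "examplesRoot": "examples",
    "user.name": "cloudera",  # assumption for this sketch
    "oozie.wf.application.path":
        "${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce",
}

_REFERENCE = re.compile(r"\$\{([^}]+)\}")


def expand(properties):
    """Resolve ${name} references between properties, leaving unknown names untouched."""

    def resolve(value, seen):
        def replace(match):
            name = match.group(1)
            if name in seen or name not in properties:
                return match.group(0)  # unknown or cyclic reference: keep as-is
            return resolve(properties[name], seen + (name,))

        return _REFERENCE.sub(replace, value)

    return {key: resolve(value, (key,)) for key, value in properties.items()}
```

With these values, expand(EXAMPLE)["oozie.wf.application.path"] comes out as hdfs://localhost.localdomain:8020/user/cloudera/examples/apps/map-reduce, which is the HDFS location the examples folder has to be uploaded to.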

Where are MapReduce logs when you're using the YARN framework

For the last couple of months I have been using the YARN framework for running my MapReduce jobs. Normally using YARN is transparent, so I did not have to do anything different other than change my mapred-site.xml file to set the value of mapreduce.framework.name to yarn. But YARN affects how the logs and job history get stored. For example, if you're using the traditional MapReduce framework you can go to http://localhost:50030 to look at the job and task history and also access the logs generated by the MapReduce framework. With YARN you have to go to http://localhost:8088/cluster, which takes you to the Resource Manager home page. There you should see a list of applications; click on the name of an application to get more details
When you try to look at the logs for an application, it takes you to the NodeManager home page
Since I am working on a single node cluster, I like to go to the Hadoop log directory, where under the userlogs directory I can see a log folder for each application. The application folder is subdivided into container folders, one for the mapper task, one for the reducer task and one for the driver task, and each container folder has one file each for stdout, stderr and syslog. If you have any System.out.println() in your mapper or reducer class, you should find the appropriate container folder, and the stdout file in that container should have the output that you generated using System.out.println()
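
Clicking through container folders by hand gets tedious, so here is a small Python sketch that walks a userlogs directory and prints every non-empty stdout file. It assumes the application/container/stdout layout described above; the function names are mine, and the location of userlogs depends on your installation.

```python
from pathlib import Path


def find_stdout_logs(userlogs_dir):
    """Yield (application, container, stdout_path) for every container stdout file.

    Assumes the layout:
    <userlogs_dir>/<application_id>/<container_id>/{stdout,stderr,syslog}
    """
    root = Path(userlogs_dir)
    for stdout_path in sorted(root.glob("*/*/stdout")):
        container_dir = stdout_path.parent
        yield container_dir.parent.name, container_dir.name, stdout_path


def print_stdout_logs(userlogs_dir):
    """Print the non-empty stdout files, labelled by application and container."""
    for app, container, path in find_stdout_logs(userlogs_dir):
        text = path.read_text(errors="replace")
        if text.strip():
            print(f"== {app} / {container} ==")
            print(text)
```

Point print_stdout_logs() at the userlogs directory under your Hadoop log directory to see the System.out.println() output of all containers in one go.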

Using counters in MapReduce program

While developing MapReduce jobs you might want to keep counters for some conditions that you find. For example, in the MapReduce job that uses GeoIP to count the number of requests from a particular city, I wanted to check how many requests came from the US and India vs. other countries. Also, there are cases when you try to find the location of an IP address and, if the IP is not in the GeoIP database, the lookup throws an error. I wanted to see how many IPs are not found in the database. In order to do that I had to make the following changes in ApacheLogMapper.java
  1. Declare Enum: On line 24 I had to create an Enum for GEO. I am declaring 4 different counters in it: one for ERROR, one for IPs in the US, one for IPs in India and one for everything else
  2. Count countries: On line 67 I am checking if the location of the IP is in the USA; if yes, I am increasing the counter for the US by 1 using context.getCounter(GEO.US).increment(1)
  3. Counting errors: Whenever the GeoIP API is not able to find the IP in the GeoIP database it throws GeoIp2Exception. I am catching that exception and using it as an opportunity to increment the ERROR count by 1 using context.getCounter(GEO.ERROR).increment(1);
After executing the MapReduce program I could see the different counters calculated by Hadoop in the console output
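
The counting logic is easy to try outside Hadoop. Below is a plain Python stand-in for the three changes; it is not the Java mapper. The INDIA and OTHER names, the tiny lookup table and its country codes are assumptions made for illustration, and a KeyError plays the role of GeoIp2Exception.

```python
from collections import Counter
from enum import Enum


class GEO(Enum):
    ERROR = "ERROR"
    US = "US"
    INDIA = "INDIA"  # name assumed for illustration
    OTHER = "OTHER"  # name assumed for illustration


# Hypothetical stand-in for the GeoIP database: ip -> country code.
FAKE_GEO_DB = {"192.0.2.1": "US", "198.51.100.2": "IN", "203.0.113.3": "DE"}


def count_requests(ips, geo_db=FAKE_GEO_DB):
    """Mirror the mapper's counter logic: exactly one increment per input IP."""
    counters = Counter()
    for ip in ips:
        try:
            country = geo_db[ip]  # KeyError stands in for GeoIp2Exception
        except KeyError:
            counters[GEO.ERROR] += 1
            continue
        if country == "US":
            counters[GEO.US] += 1
        elif country == "IN":
            counters[GEO.INDIA] += 1
        else:
            counters[GEO.OTHER] += 1
    return counters
```

Because every record lands in exactly one counter, the four values should add up to the number of input records, which is a handy sanity check on the real job's console output as well.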

Using DistributedCache with MapReduce job

In the Using third part jars and files in your MapReduce application(Distributed cache) entry I blogged about how to use the distributed cache in Hadoop through command line options. But you also have the option of using the DistributedCache API. To use the distributed cache programmatically, you change your MapReduce Driver class to call job.addCacheFile(), following these steps
  1. In order to use a file with the DistributedCache API, it has to be available at either an hdfs:// or http:// URL that is accessible to all the cluster members. So the first step is to upload the file that you are interested in into HDFS; in my case I used the following command to copy the GeoLite2-City.mmdb file to HDFS.
    
    hdfs dfs -copyFromLocal GeoLite2-City.mmdb /GeoLite2-City.mmdb
    
  2. The next step is to change the Driver class and add the job.addCacheFile(new URI("hdfs://localhost:9000/GeoLite2-City.mmdb#GeoLite2-City.mmdb")); call. This call takes the HDFS URL of the file that you just uploaded and registers it with the distributed cache. The #GeoLite2-City.mmdb fragment tells Hadoop to create a symbolic link with that name to the file
  3. Now in your Mapper class you can read GeoLite2-City.mmdb using the normal File API
When you use the distributed cache, Hadoop first copies the file specified through the DistributedCache API onto the machine executing the task. You can see the copy by looking in the MapReduce temp directory.
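
The location#name convention used in the addCacheFile() URI is just a URI fragment. This Python sketch splits such a value into the part that says where the file lives and the symlink name the task sees. The helper name is mine, and falling back to the file's base name when there is no fragment is an assumption of the sketch, not a statement about Hadoop's behaviour.

```python
import posixpath
from urllib.parse import urlsplit


def split_cache_uri(uri):
    """Return (source location without fragment, symlink name) for a cache-file URI.

    If there is no '#fragment', fall back to the file's own base name
    (an assumption made for this sketch).
    """
    parts = urlsplit(uri)
    source = parts._replace(fragment="").geturl()
    link_name = parts.fragment or posixpath.basename(parts.path)
    return source, link_name
```

For the URI from step 2 this returns the hdfs://localhost:9000/GeoLite2-City.mmdb location together with the link name GeoLite2-City.mmdb, which is the name the Mapper opens through the File API.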

Killing a badly behaving MapReduce job

I was working on building this MapReduce program, and after submitting it I realized that I had made a mistake and the job was taking a really long time to complete, so I decided to kill it. These are the steps that I followed. First I executed the mapred job -list command to get the list of jobs that were in progress. The output of the list command gives you the job id. Then you can use the mapred job -kill job_1405432500430_0001 command to kill the job that you're interested in
You can confirm that the MapReduce job was actually killed by using the web console

Configure LogStash to read Apache HTTP Server logs and add GeoIP information in it.

LogStash is a tool that you can use for managing your logs. The basic idea is that you configure LogStash to read a log file, it enriches the log records, and then it writes those records to ElasticSearch. Then you can use Kibana to view your logs. I wanted to figure out where my web traffic is coming from, so I configured the LogStash server to read the HTTP server log, then used its geoip capability to find the location of each request based on the request's IP and store it in ElasticSearch. Before starting, I downloaded the GeoCity database from MaxMind and configured LogStash to use it. Next I started an ElasticSearch server on my local machine to collect the logs and used the following command to start the LogStash server

java -jar logstash-1.3.2-flatjar.jar agent -f httpaccess.conf
Once the LogStash server was started I could see how it was parsing logs and posting them to ElasticSearch. For example, for the following log statement

129.143.71.36 - - [31/Aug/2011:08:35:17 -0700] "GET /favicon.ico HTTP/1.1" 200 3935 "-" "Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.224 Safari/534.10"
I could see LogStash converting it into JSON before posting it to ElasticSearch
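
To get a feel for the field extraction that has to happen before a geoip lookup is possible, here is a minimal Python sketch that splits the sample log line into named fields with a regular expression. It is not LogStash's implementation or its output; the pattern and the field names are my own choices for illustration.

```python
import re

SAMPLE = (
    '129.143.71.36 - - [31/Aug/2011:08:35:17 -0700] '
    '"GET /favicon.ico HTTP/1.1" 200 3935 "-" '
    '"Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 '
    '(KHTML, like Gecko) Chrome/8.0.552.224 Safari/534.10"'
)

# Apache combined log format, one named group per field.
COMBINED_LOG = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\d+|-) '
    r'"(?P<referrer>[^"]*)" "(?P<agent>[^"]*)"'
)


def parse_line(line):
    """Return a dict of fields for a combined-format log line, or None if it does not match."""
    match = COMBINED_LOG.match(line)
    if match is None:
        return None
    fields = match.groupdict()
    fields["response"] = int(fields["response"])
    fields["bytes"] = 0 if fields["bytes"] == "-" else int(fields["bytes"])
    return fields
```

The clientip field of the result is the value a geoip step would then look up in the MaxMind database.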

Using third part jars and files in your MapReduce application(Distributed cache)

If you want to use a third party jar in your MapReduce program you have two options: one is to create a single jar with all the dependencies, and the other is to use the Hadoop distributed cache. I wanted to play around with both options, so I built a simple application that reads a standard Apache HTTP Server log and parses it to get the IP address each request is coming from. Then I use the IP address to invoke the GeoIP lookup and find out which city and country the request came from. Once I have a city, I use it to count the number of requests that came from that particular city. You can download the source code for this project from here. In the setup() method of my mapper class I create an object of com.maxmind.geoip2.DatabaseReader and pass GeoLite2-City.mmdb, the GeoIP database file, to the reader. For this to work, my MapReduce program needs access to com.maxmind.geoip2.geoip2 along with its dependencies, and it also needs access to GeoLite2-City.mmdb.

Creating single jar with all dependencies

In order for this method to work, add the maven-assembly-plugin to your Maven build file; in this case the jar gets created with com.spnotes.hadoop.logs.ApacheLogMRDriver as its main class. Now you can build a single jar file by executing the mvn clean compile assembly:single command. Copy GeoLite2-City.mmdb to the Hadoop cluster (you could package it inside the jar, but that might not be a good idea). Now you can execute the following command to run this job on the cluster

hadoop jar ApacheLogsMR-jar-with-dependencies.jar -files GeoLite2-City.mmdb  /apache.log /output/logs/apachelogs

Using Hadoop distributed cache

The second option is to create a jar containing only your MapReduce program and pass the dependencies to Hadoop using the distributed cache. With this option, you should make sure that your Driver class extends Configured and implements Tool, so that the generic -libjars and -files options get parsed when it is launched through ToolRunner. Copy the dependency jars as well as the GeoLite2-City.mmdb file onto the same machine that hosts the Hadoop cluster. Then execute the following command to pass both the jars and the file to Hadoop.

hadoop jar ApacheLogsMR.jar -libjars geoip2-0.7.1.jar,maxmind-db-0.3.2.jar,jackson-databind-2.2.3.jar,jackson-core-2.2.3.jar,jackson-annotations-2.2.3.jar  -files GeoLite2-City.mmdb  /apache.log /output/logs/apachelogs

How reading and writing of files in HDFS works

Read Path
  1. The client program starts with the Hadoop library jar and a copy of the cluster configuration data, which specifies the location of the name node.
  2. The client begins by contacting the name node, indicating the file it wants to read.
  3. The name node validates the client's identity, either by simply trusting the client or by using an authentication protocol such as Kerberos.
  4. The client's identity is verified against the owner and permissions of the file.
  5. The name node responds to the client with the first block ID and the list of data nodes on which a copy of the block can be found, sorted by their distance to the client. Distance to the client is measured according to Hadoop's rack topology.
  6. With the block IDs and datanode hostnames, the client can now contact the most appropriate datanode directly and read the block data it needs. This process repeats until all the blocks in the file have been read or the client closes the file stream.
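
The "sorted by distance" part of step 5 can be illustrated with a toy two-level topology in Python, where a location is a (rack, host) pair and the distance is 0 for the same host, 2 for the same rack and 4 for different racks. The rack and host names below are hypothetical, and real clusters can have deeper topologies than this sketch models.

```python
def network_distance(a, b):
    """Distance between two (rack, host) locations in a two-level topology.

    0 = same host, 2 = same rack, 4 = different racks.
    """
    rack_a, host_a = a
    rack_b, host_b = b
    if rack_a != rack_b:
        return 4
    return 0 if host_a == host_b else 2


def order_replicas(client, replicas):
    """Sort replica locations nearest-first relative to the client."""
    return sorted(replicas, key=lambda location: network_distance(client, location))
```

For a client on ("/rack1", "node1"), a replica on the same host sorts ahead of one on ("/rack1", "node2"), which in turn sorts ahead of one on another rack.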
Write Path
  1. The client makes a request to open a file for writing using the Hadoop FileSystem APIs.
  2. A request is sent to the name node to create the file metadata, provided the user has the necessary permission to do so. Initially, however, the file has no associated blocks.
  3. The name node responds to the client, indicating that the request was successful and that it should start writing data.
  4. The client library sends a request to the name node asking for the set of datanodes to which the data should be written, and gets a list back from the name node.
  5. The client makes a connection to the first data node, which in turn makes a connection to the second, and the second data node makes a connection to the third.
  6. The client starts writing data to the first data node. The first data node writes the data to disk as well as to the connection pointing to the second data node. The second data node writes the data to disk and to the connection pointing to the third data node, and so on.
  7. Once the client has finished writing, it closes the stream, which flushes the remaining data so that it is written to disk.
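
Steps 5 and 6 describe a chain in which every data node both stores what it receives and forwards it to the next node. The toy Python model below shows just that forwarding pattern; the class and function names are mine, the list append stands in for the disk write, and nothing here reflects the actual HDFS wire protocol.

```python
class DataNode:
    """Toy datanode that stores packets and forwards them downstream."""

    def __init__(self, name, downstream=None):
        self.name = name
        self.downstream = downstream
        self.stored = []

    def receive(self, packet):
        self.stored.append(packet)  # stands in for "write to disk"
        if self.downstream is not None:
            self.downstream.receive(packet)  # forward along the pipeline


def build_pipeline(names):
    """Chain datanodes so each forwards to the next; return them in pipeline order."""
    nodes = [DataNode(name) for name in names]
    for node, following in zip(nodes, nodes[1:]):
        node.downstream = following
    return nodes
```

The client only ever talks to the first node: after nodes[0].receive(packet), every node in the list built by build_pipeline(["dn1", "dn2", "dn3"]) holds a copy of the packet.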

HDFS Java Client

Hadoop provides a Java API that you can use to perform some of the commonly used file operations, such as reading a file, creating a new file, appending to the end of an existing file, or searching for files. I wanted to try these common operations out, so I built the HelloHDFS project, which you can download from here. Its main class takes command line arguments for the operation name and file path and performs the operation. After downloading the source code for the project you can use the following Maven command to build a single jar that also contains all the dependencies.

mvn clean compile assembly:single
Once your jar is ready you can use it like this to read a file from HDFS with a relative path

java -jar target/HelloHDFS-jar-with-dependencies.jar read aesop.txt
or with a fully qualified path

java -jar target/HelloHDFS-jar-with-dependencies.jar read hdfs://localhost/user/user/aesop.txt

HDFS Daemons

An HDFS cluster has two types of nodes operating in a master-worker pattern
  • NameNode: Manages the filesystem's directory structure and metadata for all the files. This information is persisted on local disk in the form of 2 files
    1. fsimage: This is the master copy of the metadata for the file system.
    2. edits: This file stores the changes (deltas/modifications) made to the metadata. In newer versions of Hadoop (I am using 2.4) multiple edits files get created, each covering a range of transactions, which together store the changes made to the metadata.
    In addition to this, the name node also has a mapping of blocks to the datanodes where each block is stored, but that information does not get persisted to disk. Instead, datanodes send the list of blocks that they hold to the Namenode on startup, and the Namenode keeps it in memory. The name node filesystem metadata is served entirely from RAM for fast lookup and retrieval, which places a cap on how much metadata the name node can handle.
  • Secondary namenode: The job of the secondary namenode is to merge the fsimage and edits files for the primary Namenode. The basic issue is that it is very CPU consuming to take the fsimage and apply all the edits to it, so that work is delegated to the secondary namenode. The secondary namenode downloads the fsimage and edits files from the primary, merges them, and then sends the new fsimage back to the primary.
  • DataNode: This is the workhorse daemon that is responsible for storing and retrieving blocks of data. This daemon is also responsible for maintaining the block report (the list of blocks that are stored on that datanode). It sends a heartbeat to the Namenode at a short regular interval (every few seconds by default) and, far less frequently (on the order of an hour or more, controlled by dfs.blockreport.intervalMsec), a full block report
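
The merge that the secondary namenode performs is, conceptually, a replay of the edit log on top of the last fsimage. Here is a toy Python sketch of that idea, with the namespace as a dict and made-up operation names; the real files are binary and carry far more metadata than this.

```python
def apply_edits(fsimage, edits):
    """Return a new namespace snapshot with the edit log replayed on top.

    fsimage: dict mapping path -> metadata dict
    edits:   list of (operation, path, metadata) tuples, oldest first
    The operation names are made up for this sketch.
    """
    merged = {path: dict(meta) for path, meta in fsimage.items()}
    for op, path, meta in edits:
        if op == "create":
            merged[path] = dict(meta)
        elif op == "set_replication":
            merged[path]["replication"] = meta["replication"]
        elif op == "delete":
            merged.pop(path, None)
        else:
            raise ValueError(f"unknown edit operation: {op}")
    return merged
```

The result plays the role of the new fsimage; once it is in place, the edits that were replayed are no longer needed, which is why the merge keeps name node restarts fast.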
There are two ways to start the daemons necessary for HDFS: you can start them individually using hadoop-daemon.sh start <daemontype>, e.g. hadoop-daemon.sh start namenode, or you can start all of them using start-dfs.sh

How to control replication for a file stored in HDFS

In Hadoop you can control how many replicas of your file get created. I wanted to try that out, so I tried different options. The first option is to set the replication factor in hdfs-site.xml; the settings in hdfs-site.xml apply to all files (global settings)
  • dfs.replication: Default block replication. The actual number of replications can be specified when the file is created. The default is used if replication is not specified in create time.
  • dfs.replication.max: Maximal block replication.
  • dfs.namenode.replication.min: Minimal block replication.
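
To make the relationship between the three settings concrete, here is a small Python sketch of how I understand the rules: the default applies when no replication is given at create time, and an explicit value has to fall between the minimum and the maximum, otherwise the request is rejected rather than silently adjusted. The function name and the ValueError are mine (HDFS itself reports an IOException), and 3, 1 and 512 are the stock defaults for these properties.

```python
def resolve_replication(requested=None, default=3, minimum=1, maximum=512):
    """Pick the replication factor for a new file.

    default -> dfs.replication
    minimum -> dfs.namenode.replication.min
    maximum -> dfs.replication.max
    """
    if requested is None:
        return default  # nothing specified at create time
    if not minimum <= requested <= maximum:
        raise ValueError(
            f"requested replication {requested} is outside [{minimum}, {maximum}]"
        )
    return requested
```

So resolve_replication() returns 3 with the stock settings, while resolve_replication(5) returns 5 and resolve_replication(1000) is refused.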
After configuring the global replication settings, I restarted the HDFS daemons and executed hdfs fsck on one of the files to see the effect of the replication setting
You can also use the command line tools to set a particular replication factor for a file by executing the hdfs dfs -setrep command like this.

hdfs dfs -setrep 5 /user/user/aesop.txt
Then I could verify the effect of the new replication setting by running hdfs fsck on the file again
The last option is to set the replication factor programmatically

Setting replication factor for a file programmatically

I wanted to figure out how to set/change the replication factor for a file stored in HDFS, so I built this sample program, which has 3 methods
  • read(): This method takes a file path as input and prints its content to the system console
  • write(): This method takes a file path as input and opens it for writing. It takes user input and writes it to the HDFS file
  • setReplication(): This method takes a file path and a replication factor as input and sets the replication factor for the given file path
Once you build the jar you can execute it by using a command like this

java -jar target/HelloHDFS-jar-with-dependencies.jar read <hdfsfile_path>
For example, in my case I have aesop.txt at hdfs://localhost/user/user/aesop.txt, so I can print it by using the following command

java -jar target/HelloHDFS-jar-with-dependencies.jar read aesop.txt