How to read and write JSON files with Spark

I wanted to build a Spark program that would read a text file in which every line is a complex JSON object like the one below. I wanted to parse the file, filter out a few records, and write the output back as a file. You can download the project from GitHub

{
   "first":"Sachin",
   "last":"Tendulkar",
   "address":{
      "line1":"1 main street",
      "city":"mumbai",
      "state":"ms",
      "zip":"12345"
   }
}
There are a lot of options when it comes to parsing JSON, but I decided to use the Jackson parser because I am comfortable with it, so I added the Jackson dependencies to the pom.xml. Next I created a Spark driver program that takes 2 parameters: inputFile, the path to the input file that contains the JSON, and outputFile, the path to the output folder where the output should be saved. In my code I defined 2 classes, Person and Address, which are simple beans with JSON-related annotations. Most of the Spark code is similar to what it would look like for, say, a comma-separated file or a plain text file. The only difference is lines 52 to 60 of the code, where I call the ObjectMapper.readValue() method to convert each line of input into an object of the Person class. If the JSON is invalid it throws an exception like the one you can see at the bottom of this post, so to deal with that I catch the exception and increment errorRecords by one every time it occurs.
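As a rough sketch of what that driver looks like (the original uses Java-style beans with Jackson annotations; here I use Scala case classes with jackson-module-scala to keep it short, and the filter on city is just an example criterion), the core of it reads each line, parses it with Jackson, counts parse failures with an accumulator, and writes the valid, filtered records back out:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.{SparkConf, SparkContext}

// Simple classes matching the JSON structure shown above
case class Address(line1: String, city: String, state: String, zip: String)
case class Person(first: String, last: String, address: Address)

object HelloJSONSpark {
  def main(args: Array[String]): Unit = {
    val Array(inputFile, outputFile) = args
    val sc = new SparkContext(new SparkConf().setAppName("HelloJSONSpark").setMaster("local[2]"))
    val errorRecords = sc.accumulator(0) // counts lines that fail to parse

    val people = sc.textFile(inputFile).mapPartitions { lines =>
      // Build one ObjectMapper per partition instead of one per line
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
      lines.flatMap { line =>
        try {
          Some(mapper.readValue(line, classOf[Person]))
        } catch {
          case e: Exception =>
            errorRecords += 1 // invalid JSON: count it and skip the line
            None
        }
      }
    }

    // Filter out a few records (here: keep only people from mumbai) and save the result
    people.filter(_.address.city == "mumbai")
      .map(p => s"${p.first} ${p.last}")
      .saveAsTextFile(outputFile)

    println("Number of error records " + errorRecords.value)
    sc.stop()
  }
}

Without the try/catch around readValue(), a single malformed line like the one in the stack trace below aborts the whole job.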

2015-12-31 07:52:44 ERROR TaskSetManager:75 - Task 0 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input: was expecting closing quote for a string value
 at [Source: {"first":"VVS","last":"Laxman" , "address": {"line1":"1 main street", "city":"Hyderabad","state":"AN","zip":"121212}}; line: 1, column: 235] (through reference chain: com.spnotes.spark.Person["address"]->com.spnotes.spark.Address["zip"])
 at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210)
 at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177)
 at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1474)
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:260)
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:125)
 at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:520)
 at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:101)
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:258)
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:125)
 at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
 at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
 at com.spnotes.spark.NetcatStreamClient$$anonfun$4.apply(NetcatStreamClient.scala:74)
 at com.spnotes.spark.NetcatStreamClient$$anonfun$4.apply(NetcatStreamClient.scala:72)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
 at scala.collection.Iterator$class.foreach(Iterator.scala:742)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input: was expecting closing quote for a string value
 at [Source: {"first":"VVS","last":"Laxman" , "address": {"line1":"1 main street", "city":"Hyderabad","state":"AN","zip":"121212}}; line: 1, column: 235]
 at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
 at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
 at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:470)
 at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString2(ReaderBasedJsonParser.java:1760)
 at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString(ReaderBasedJsonParser.java:1747)
 at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:233)
 at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:32)
 at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
 at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:520)
 at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:101)
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:258)
 ... 23 more

Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:257)
 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Process finished with exit code 1

Hello Spark Streaming

In the WordCount program built using Apache Spark in Java, I built a simple Spark program that takes the name of a file as input, reads the file, and performs a word count on it. Spark also has the concept of Spark Streaming, which allows you to read input as a stream of real-time events instead of a one-time load of an input file, while keeping the same API for transforming the data: in both cases Spark converts the input into an RDD. In the case of Spark Streaming, it converts the input events into micro-batch RDDs, which is nothing but collecting all the incoming data for a certain duration (microBatchTime) and then exposing it as an RDD. I built this simple NetcatStreamClient streaming application that listens for incoming data on a netcat port; once it has data, it performs a word count on it and prints the result to the console. You can download the full source code from GitHub

package com.spnotes.spark

import com.typesafe.scalalogging.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.slf4j.LoggerFactory

object NetcatStreamClient{
  val logger = Logger(LoggerFactory.getLogger("NetcatStreamClient"))

  def main(argv:Array[String]): Unit ={
    logger.debug("Entering NetcatStreamClient.main")
    if(argv.length != 3){
      println("Please provide 3 parameters <hostName> <port> <microBatchTime>")
      System.exit(1)
    }
    val hostName =argv(0)
    val port = argv(1).toInt
    val microBatchTime = argv(2).toInt

    logger.debug(s"Listening on $hostName at $port batching records every $microBatchTime")

    //Create Spark Configuration
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
 
 
    //Create SparkStreamingContext with microBatchTime which specifies how long spark should collect data
    val sparkStreamingContext = new StreamingContext(sparkConf,Seconds(microBatchTime))
    //Start listening for data on given host and port
    val lines = sparkStreamingContext.socketTextStream(hostName,port)
    // Logic for implementing word count on the input batch
    lines.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_).print()
 
    logger.debug("Number of words " + lines.count())

    //Start the stream so that data starts flowing, you must define transformation logic before calling start()
    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
    logger.debug("Exiting NetcatStreamClient.main")
  }
}
Once the project is built using mvn clean compile assembly:single, the first thing you should do is execute the following command to start netcat on localhost at port 9999: nc -l 9999
Next, execute the following command to start the Spark driver; it takes 3 parameters: the host and port where netcat is listening, and the batch duration: bin/spark-submit ~/HelloSparkStreaming-1.0-SNAPSHOT-jar-with-dependencies.jar localhost 9999 5

How to create Spark Scala fat jar with dependencies using Maven

If you are developing a Spark application in Scala, or a standalone Scala application, and you want to create a fat jar that includes its dependencies, you can use the following Maven script as a template for your build file. A couple of things are different here: you must include scala-library as one of the dependencies and also include the maven-scala-plugin, which takes care of compiling the Scala code. The value of sourceDirectory specifies the directory that contains your Scala code. Then, for packaging the project as a fat jar, you can use the maven-assembly-plugin as you would for any other Java project built using Maven.
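A minimal sketch of such a pom.xml, assuming a Spark Streaming project on Scala 2.10 (the groupId, artifactId, and the Spark dependency and versions are placeholders; swap in whatever your project actually needs):

<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.spnotes.spark</groupId>
  <artifactId>HelloSparkStreaming</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <!-- scala-library must be on the classpath for the Scala code to compile -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.4</version>
    </dependency>
    <!-- example application dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.3.1</version>
    </dependency>
  </dependencies>

  <build>
    <!-- directory that contains the Scala sources -->
    <sourceDirectory>src/main/scala</sourceDirectory>
    <plugins>
      <!-- compiles the Scala code -->
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- packages everything into a single fat jar -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.spnotes.spark.NetcatStreamClient</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>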

Configuring Flume to use Twitter as Source

I wanted to figure out how to configure Twitter as a source for Flume, so I tried these steps
  1. First go to the Twitter Application Management page and configure an application. This should give you a consumerKey, consumerSecret, accessToken and accessTokenSecret
  2. Next, create a twitterflume.properties file that looks like this. You should define a source of type org.apache.flume.source.twitter.TwitterSource and use the 4 values you got in the last step to configure access to Twitter
    
    agent1.sources = twitter1
    agent1.sinks = logger1
    agent1.channels = memory1
    
    
    agent1.sources.twitter1.type = org.apache.flume.source.twitter.TwitterSource
    agent1.sources.twitter1.consumerKey =<consumerkey>
    agent1.sources.twitter1.consumerSecret =<consumerSecret>
    agent1.sources.twitter1.accessToken =<accessToken>
    agent1.sources.twitter1.accessTokenSecret =<accessTokenSecret>
    agent1.sources.twitter1.keywords = bigdata, hadoop
    agent1.sources.twitter1.maxBatchSize = 10
    agent1.sources.twitter1.maxBatchDurationMillis = 200
    
    
    # Describe the sink
    agent1.sinks.logger1.type = logger
    
    # Use a channel which buffers events in memory
    agent1.channels.memory1.type = memory
    agent1.channels.memory1.capacity = 1000
    agent1.channels.memory1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent1.sources.twitter1.channels = memory1
    agent1.sinks.logger1.channel = memory1
    
  3. The last step is to run the Flume agent, and you should see Twitter messages being dumped to the console: bin/flume-ng agent --conf conf --conf-file conf/twitterflume.properties --name agent1 -Dflume.root.logger=DEBUG,console
Note: When I tried this in the Hadoop Sandbox I started getting the following error. It seems the problem occurs when the VM clock is in the past; for example, when I executed the date command on my sandbox I got a date that was 3 days behind. So I restarted the VM, and after the restart the date command gave me the accurate time and the following error went away

[Twitter Stream consumer-1[Establishing connection]] ERROR org.apache.flume.source.twitter.TwitterSource (TwitterSource.java:331) - Exception while streaming tweets
stream.twitter.com
Relevant discussions can be found on the Internet at:
    http://www.google.co.jp/search?q=d0031b0b or
    http://www.google.co.jp/search?q=1db75522
TwitterException{exceptionCode=[d0031b0b-1db75522 db667dea-99334ae4], statusCode=-1, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=3.0.3}
    at twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:192)
    at twitter4j.internal.http.HttpClientWrapper.request(HttpClientWrapper.java:61)
    at twitter4j.internal.http.HttpClientWrapper.get(HttpClientWrapper.java:89)
    at twitter4j.TwitterStreamImpl.getSampleStream(TwitterStreamImpl.java:176)
    at twitter4j.TwitterStreamImpl$4.getStream(TwitterStreamImpl.java:164)
    at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:462)
Caused by: java.net.UnknownHostException: stream.twitter.com
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:637)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
    at sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:264)
    at sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:367)
    at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:191)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:933)
    at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:177)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1301)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:468)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
    at twitter4j.internal.http.HttpResponseImpl.<init>(HttpResponseImpl.java:34)
    at twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:156)

Simple Java Program for publishing Syslog Events

In the Using Syslog as source in Flume post I blogged about how to configure Flume to listen for syslog events on a particular UDP port. I wanted to test that configuration, so I built this simple Java program that can publish a syslog event to a given host and port. You can download the source code for this project from GitHub. The program takes 3 arguments: the first is the hostname of the syslog server, the second is the port on which the server is listening, and the third is the actual message that you want to send.
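A minimal sketch of such a publisher (written in Scala here rather than Java, with an assumed object name and a hard-coded priority of 14, i.e. facility user / severity info) just sends the message as a UDP datagram to the given host and port:

import java.net.{DatagramPacket, DatagramSocket, InetAddress}

// Rough sketch: publish a single syslog-style message over UDP
object SyslogPublisher {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      println("Usage: SyslogPublisher <host> <port> <message>")
      System.exit(1)
    }
    val host = args(0)
    val port = args(1).toInt
    val message = args.drop(2).mkString(" ") // remaining arguments form the message

    // Minimal RFC 3164 style payload: <priority> followed by the message text
    val payload = s"<14>$message".getBytes("UTF-8")

    val socket = new DatagramSocket()
    try {
      val packet = new DatagramPacket(payload, payload.length, InetAddress.getByName(host), port)
      socket.send(packet)
      println(s"Sent syslog event to $host:$port")
    } finally {
      socket.close()
    }
  }
}

Point it at the host and port your Flume syslog source is listening on and the event should show up on the Flume console.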

Using Syslog as source in Flume

I wanted to figure out how to use Flume for receiving syslog messages, so I tried 2 different configurations: one with a syslog source listening on a TCP port and the other on a UDP port. Copy the flumesyslogudp.properties file (a sketch of it is shown right after the start command below) into the conf directory of your Flume server and use the following command to start the Flume server

bin/flume-ng agent --conf conf --conf-file conf/flumesyslogudp.properties --name agent1 
-Dflume.root.logger=DEBUG,console
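For reference, a minimal flumesyslogudp.properties along the lines of the other configurations in this post might look like this (the agent, source, and channel names as well as the port number are my own choices):

agent1.sources = syslog1
agent1.sinks = logger1
agent1.channels = memory1

# Syslog source listening for UDP messages
agent1.sources.syslog1.type = syslogudp
agent1.sources.syslog1.host = localhost
agent1.sources.syslog1.port = 5140

# Logger sink prints received events to the console
agent1.sinks.logger1.type = logger

# Memory channel buffers events between source and sink
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.syslog1.channels = memory1
agent1.sinks.logger1.channel = memory1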
Alternatively, you can configure Flume to listen on a TCP port; the only difference is that the source type is syslogtcp instead of syslogudp

bin/flume-ng agent --conf conf --conf-file conf/flumesyslogtcp.properties --name agent1 
-Dflume.root.logger=DEBUG,console

Read configuration from .properties file in Scala

This is a small utility function in Scala that takes the fully qualified path of a properties file, converts its contents into a Map, and returns it. I use it to take the path of a properties file passed to my standalone Scala program and load it into a Map

import scala.io.Source

// Reads a .properties style file and returns its key=value pairs as a Map
def getConfig(filePath: String): Map[String, String] = {
  Source.fromFile(filePath).getLines()
    .filter(line => line.contains("="))  // skip blank lines and lines without key=value
    .map { line =>
      val tokens = line.split("=", 2)    // split only on the first '=' so values may contain '='
      tokens(0).trim -> tokens(1).trim
    }.toMap
}
Once you have this method you can call it like this: getConfig(filePath)
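For example, assuming a file /tmp/app.properties with lines like host=localhost and port=9999 (hypothetical file and keys), you could use the returned Map like this:

val config = getConfig("/tmp/app.properties")
val host = config.getOrElse("host", "localhost") // fall back to a default if the key is missing
val port = config.getOrElse("port", "9999").toInt
println(s"Connecting to $host:$port")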

Flume Hello World tutorial

I have been using Flume for some time now and really like it. This is a simple Hello World tutorial that I thought would be helpful if you want to get started with Flume. It will walk you through the steps for setting up Flume so that it listens for messages on port 44444 and, once it gets a message, simply prints it out on the console. Follow these steps
  1. First create sampleflume.properties file on your machine like this
    
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    agent1.sources = netcat1
    agent1.sinks = logger1
    agent1.channels = memory1
    
    # Describe/configure the source
    agent1.sources.netcat1.type = netcat
    agent1.sources.netcat1.bind = localhost
    agent1.sources.netcat1.port = 44444
    
    # Describe the sink
    agent1.sinks.logger1.type = logger
    
    
    # Use a channel which buffers events in memory
    agent1.channels.memory1.type = memory
    agent1.channels.memory1.capacity = 1000
    agent1.channels.memory1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent1.sources.netcat1.channels = memory1
    agent1.sinks.logger1.channel = memory1
    
    Your flume configuration file must have at least 3 elements: a source, a channel and a sink
    • netcat1: the netcat1 source defines how Flume listens for messages. In this case the netcat type means it will listen on a port that you can connect to using either netcat or telnet
    • memory1: the memory1 channel defines how Flume stores messages it has received before they are consumed by the sink. In this case I am saying keep the messages in memory
    • logger1: the logger sink is for testing; it just prints the messages on the console
  2. Once your configuration file is ready, you can start a Flume agent by executing the following command
    
    flume-ng agent --conf conf --conf-file sampleflume.properties  --name agent1 -Dflume.root.logger=DEBUG,console
    
    You will see Flume printing messages on the console while it is starting
  3. Once the server is started you can connect to it using nc or telnet (for example nc localhost 44444) and send messages to it. Whatever messages you send will be printed to the console
  4. Once you send messages using the nc command, look at the server console and you should see the messages that you sent

Configuring Flume to write avro events into HDFS

Recently I wanted to figure out how to configure Flume so that it listens for Avro events and, whenever it gets an event, dumps it into HDFS. In order to do that I built this simple Flume configuration

# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = avro
agent1.sinks = hdfs1
agent1.channels = memory1

# Describe/configure the source
agent1.sources.avro.type = avro
agent1.sources.avro.bind = localhost
agent1.sources.avro.port = 41414
agent1.sources.avro.selector.type = replicating
agent1.sources.avro.channels = memory1

# Describe the sink
agent1.sinks.hdfs1.type = hdfs
agent1.sinks.hdfs1.hdfs.path=/tmp/flume/events
agent1.sinks.hdfs1.hdfs.rollInterval=60
#The number of events to be written into a file before it is rolled.
agent1.sinks.hdfs1.hdfs.rollSize=0
agent1.sinks.hdfs1.hdfs.batchSize=100
agent1.sinks.hdfs1.hdfs.serializer=org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent1.sinks.hdfs1.hdfs.fileType = DataStream
agent1.sinks.hdfs1.channel = memory1

# Use a channel which buffers events in memory
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
In this configuration I have an Avro source listening on the local machine at port 41414; once it gets an event, it writes it to HDFS in the /tmp/flume/events directory. Once this file is saved on the local machine as hellohdfsavro.conf, I can start the flume agent using the following command

flume-ng agent --conf conf --conf-file hellohdfsavro.conf  --name agent1 -Dflume.root.logger=DEBUG,console
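If you want a quick way to push a test event at this source, Flume ships with an avro-client mode that reads a local file and sends its contents to an Avro source (the file path here is just an example):

flume-ng avro-client --conf conf -H localhost -p 41414 -F /tmp/test-event.txt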

Configure Flume to use IBM MQ as JMS Source

Recently I had a requirement where I wanted to figure out how to read XML documents stored as messages in IBM MQ and post them into Hadoop. I decided to use Apache Flume + the Flume JMS Source + the Flume HDFS Sink for this, and I had to use the following steps for the setup. Please note that I am not a WebSphere MQ expert, so there might be a better/easier way to achieve this.
  1. First I had to install the WebSphere MQ Client on my Windows machine
  2. Next I created a simple jms.config like this in the c:\temp folder of my Windows box
    
    INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
    PROVIDER_URL=file:/C:/temp/jmsbinding
    
  3. The next step is to run JMSAdmin.bat c:\temp\jms.config, which opens up a console. Type the following command into it, changing the values to the configuration that you need
    
    DEF CF(myConnectionFactory) QMGR(myQueueManager) HOSTNAME(myHostName) PORT(1426) CHANNEL(myChannelName) TRANSPORT(CLIENT)
    
    Once you execute this command it will generate a .bindings file in C:/temp/jmsbinding (the folder configured as the value of PROVIDER_URL)
  4. The next step for me was to copy the C:/temp/jmsbinding/.bindings file to the /etc/flume/conf folder on my Linux box, which has Flume running on it.
  5. In addition to the bindings file I also needed the MQ client jar files. I started by copying jms.jar from C:\Program Files (x86)\IBM\WebSphere MQ\java\lib to the /usr/hdp/current/flume-server/lib/ folder in my Hadoop installation, but I kept getting ClassNotFoundException, and to deal with that I ended up copying more and more jars from my MQ Client into Flume
    
    jms.jar
    fscontext.jar
    jndi.jar
    providerutil.jar
    com.ibm.mq.jar
    com.ibm.mqjms.jar
    com.ibm.mq.pcf.jar
    connector.jar
    dhbcore.jar
    com.ibm.mq.jmqi.jar
    com.ibm.mq.headers.jar
    
  6. Once the Flume MQ setup was in place, the last step was to create a Flume configuration that points to your bindings file and to your MQ server, like this
    
    # Flume agent config
    #st the sources, channels, and sinks for the agent
    ggflume.sources = jms
    ggflume.channels = memory
    ggflume.sinks = hadoop
    
    ggflume.sources.jms.channels=memory
    ggflume.sinks.hadoop.channel=memory
    
    ggflume.sources.jms.type = jms
    ggflume.sources.jms.providerURL = file:///etc/flume/conf
    ggflume.sources.jms.initialContextFactory = com.sun.jndi.fscontext.RefFSContextFactory
    ggflume.sources.jms.destinationType=QUEUE
    ggflume.sources.jms.destinationName=<channelName>
    ggflume.sources.jms.connectionFactory=myConnectionFactory
    ggflume.sources.jms.batchSize=1
    
    ggflume.channels.memory.type = memory
    ggflume.channels.memory.capacity = 1000
    ggflume.channels.memory.transactionCapacity = 100
    
    ggflume.sinks.hadoop.type=hdfs
    ggflume.sinks.hadoop.hdfs.path=/data/mq/xml
    ggflume.sinks.hadoop.hdfs.filePrefix=sample
    
    
  7. Now start the Flume agent by executing the following command: flume-ng agent --conf conf --conf-file mqflume.conf --name ggflume -Dflume.root.logger=DEBUG,console
You should now see the existing messages from MQ being dumped into HDFS

Moving data from Avro to ORC files

In the Importing data from Sqoop into Hive External Table with Avro encoding post I blogged about how to sqoop data from an RDBMS into Hive. I wanted to take it to the next step by moving the downloaded data into an ORC table. I followed these steps to achieve that
  1. The first thing is to find out the schema of the table in Avro; you can get that by executing the following statement in hive
    
    show create table CUSTOMER;
    
    You will get output that looks something like this; it contains the schema of the table
    
    CREATE EXTERNAL TABLE `CUSTOMER`(
      `contactid` int COMMENT 'from deserializer',
      `firstname` string COMMENT 'from deserializer',
      `lastname` string COMMENT 'from deserializer',
      `email` string COMMENT 'from deserializer')
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION
      'hdfs://sandbox.hortonworks.com:8020/tmp/customer/data'
    TBLPROPERTIES (
      'avro.schema.url'='hdfs:///tmp/customer/schema/customer.avsc',
      'transient_lastDdlTime'='1431719666')
    
  2. Copy the schema from the last step, remove the part about the Avro format and table properties, and replace it with the ORC-specific clauses shown below, then execute this in Hive to create the customer table in ORC format
    
    CREATE EXTERNAL TABLE `CUSTOMER_ORC`(
      `contactid` int,
      `firstname` string,
      `lastname` string,
      `email` string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\001'
    STORED AS ORC
    LOCATION
      'hdfs://sandbox.hortonworks.com:8020/tmp/customer/data_orc'
    TBLPROPERTIES ("orc.compress"="SNAPPY","orc.row.index.stride"="20000");
    
  3. The last step is to copy the data from the Avro table to the ORC table; you can achieve that by using the following command
    
    insert into table CUSTOMER_ORC select * from customer;
    

Importing data from Sqoop into Hive External Table with Avro encoding

I wanted to figure out how to import the content of an RDBMS table into Hive with Avro encoding. During this process I wanted to use external Hive tables so that I have complete control over the location of the files.
Note: I have a different/easier method for doing this in Importing data from Sqoop into Hive External Table with Avro encoding updated
First I created the following table in the mysql database, which is on the same machine as my HortonWorks Sandbox
  1. First create the CUSTOMER table like this in mysql
    
    CREATE TABLE CUSTOMER (       
    contactid INTEGER NOT NULL ,       
    firstname VARCHAR(50),       
    lastname  VARCHAR(50),       
    email varchar(50) );
    
  2. After creating the table, add a couple of records to it by executing an insert statement like insert into customer values(1,'Sachin','Tendulark','sachin@gmail.com');
  3. The next step is to run a sqoop import that downloads the records of the table into HDFS at /tmp/customer/sample. In the real world you might want to download only the first 10 records or so, because you just need a few sample records to create the Avro schema
    
    sqoop import --connect jdbc:mysql://localhost/test --table CUSTOMER --username sqoop1 
    --password sqoop -m 1 --create-hive-table 
    --hive-table CONTACT --as-avrodatafile  --target-dir /tmp/customer/sample
    
  4. Running the sqoop command dumps the records into HDFS, so first download the Avro file generated by sqoop
    
    hdfs dfs -get /tmp/customer/sample/part-m-00000.avro
    
  5. Use the avro-tools-*.jar to read the schema of the file generated by sqoop, by executing the following command
    
    java -jar avro-tools-1.7.5.jar getschema part-m-00000.avro > customer.avsc
    
    This is what the customer.avsc file looks like in my case
    
    {
      "type" : "record",
      "name" : "CUSTOMER",
      "doc" : "Sqoop import of CUSTOMER",
      "fields" : [ {
        "name" : "contactid",
        "type" : [ "int", "null" ],
        "columnName" : "contactid",
        "sqlType" : "4"
      }, {
        "name" : "firstname",
        "type" : [ "string", "null" ],
        "columnName" : "firstname",
        "sqlType" : "12"
      }, {
        "name" : "lastname",
        "type" : [ "string", "null" ],
        "columnName" : "lastname",
        "sqlType" : "12"
      }, {
        "name" : "email",
        "type" : [ "string", "null" ],
        "columnName" : "email",
        "sqlType" : "12"
      } ],
      "tableName" : "CUSTOMER"
    }
    
  6. The next step is to upload the Avro schema file that you created in the last step back to HDFS; in my case I had an HDFS folder called /tmp/customer/schema and I uploaded the Avro schema file into it
    
    hdfs dfs -put customer.avsc /tmp/customer/schema/
    
  7. Now go to Hive and execute the following command to define the external CUSTOMER Hive table with the Avro schema defined in the last step
    
    CREATE EXTERNAL TABLE CUSTOMER
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '/tmp/customer/data'
    TBLPROPERTIES ('avro.schema.url'='hdfs:///tmp/customer/schema/customer.avsc');
    
  8. The last step is to run sqoop again, but this time importing all the data into the external directory that the CUSTOMER Hive table is pointing to.
    
    sqoop import --connect jdbc:mysql://localhost/test --table CUSTOMER --username sqoop1 
    --password sqoop -m 1 --as-avrodatafile 
     --target-dir /tmp/customer/data --compression-codec snappy
    
Now if you run a select query on the CUSTOMER table you should be able to get all the data that you have in your RDBMS

Running oozie job on Hortonworks Sandbox

In the Enabling Oozie console on Cloudera VM 4.4.0 and executing examples post I blogged about how to run an oozie job in the Cloudera Sandbox. This process is a little bit easier in the HortonWorks 2.2 sandbox. I started with a brand new HDP 2.2 image and tried running the oozie example on it by executing

oozie job -oozie http://localhost:11000/oozie -config examples/apps/map-reduce/job.properties -run
But when I tried running it I got the following error

Error: E0501 : E0501: Could not perform authorization operation, Call From sandbox.hortonworks.com/10.0.2.15 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
So I looked into /var/log/oozie/oozie.log and saw the following error

2015-05-01 20:34:39,195  WARN V1JobsServlet:546 - SERVER[sandbox.hortonworks.com] USER[root] GROUP[-] TOKEN[-] APP[-] JOB[-] ACTION[-] URL[POST http://sandbox.hortonworks.com:11000/oozie/v2/jobs?action=start] error[E0501], E0501: Could not perform authorization operation, Call From sandbox.hortonworks.com/10.0.2.15 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
org.apache.oozie.servlet.XServletException: E0501: Could not perform authorization operation, Call From sandbox.hortonworks.com/10.0.2.15 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
 at org.apache.oozie.servlet.BaseJobServlet.checkAuthorizationForApp(BaseJobServlet.java:240)
 at org.apache.oozie.servlet.BaseJobsServlet.doPost(BaseJobsServlet.java:96)
 at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
 at org.apache.oozie.servlet.JsonRestServlet.service(JsonRestServlet.java:287)
 at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
 at org.apache.oozie.servlet.AuthFilter$2.doFilter(AuthFilter.java:143)
 at org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:572)
 at org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:542)
 at org.apache.oozie.servlet.AuthFilter.doFilter(AuthFilter.java:148)
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235)
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
 at org.apache.oozie.servlet.HostnameFilter.doFilter(HostnameFilter.java:84)
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235)
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
 at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
 at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
 at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
 at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
 at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
 at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:293)
 at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:861)
 at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:606)
 at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:489)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.oozie.service.AuthorizationException: E0501: Could not perform authorization operation, Call From sandbox.hortonworks.com/10.0.2.15 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
 at org.apache.oozie.service.AuthorizationService.authorizeForApp(AuthorizationService.java:399)
 at org.apache.oozie.servlet.BaseJobServlet.checkAuthorizationForApp(BaseJobServlet.java:229)
 ... 25 more
Caused by: java.net.ConnectException: Call From sandbox.hortonworks.com/10.0.2.15 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791)
 at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:731)
 at org.apache.hadoop.ipc.Client.call(Client.java:1472)
 at org.apache.hadoop.ipc.Client.call(Client.java:1399)
 at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
 at com.sun.proxy.$Proxy29.getFileInfo(Unknown Source)
 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy30.getFileInfo(Unknown Source)
 at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1988)
 at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
 at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
 at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
 at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
 at org.apache.oozie.service.AuthorizationService.authorizeForApp(AuthorizationService.java:371)
 ... 26 more
Caused by: java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
 at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
 at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
 at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:494)
 at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:607)
 at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:705)
 at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:368)
 at org.apache.hadoop.ipc.Client.getConnection(Client.java:1521)
 at org.apache.hadoop.ipc.Client.call(Client.java:1438)
 ... 44 more
In order to solve this issue I had to edit examples/apps/map-reduce/job.properties and replace localhost with sandbox.hortonworks.com


nameNode=hdfs://sandbox.hortonworks.com:8020
jobTracker=sandbox.hortonworks.com:8032

queueName=default
examplesRoot=examples

oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce
outputDir=map-reduce

Exporting data from Hive table to RDBMS

In the Importing data from RDBMS into Hadoop using sqoop post I blogged about how to import data from an RDBMS into Hive, but now I wanted to figure out how to export data from Hive back to an RDBMS. Sqoop has an export feature that allows you to export data from a Hadoop directory (CSV files in a directory) to an RDBMS. To try it out, I first created a simple contact_hive table and populated some data in it, then used sqoop to export the content of the contact_hive table into the CONTACT table in MySQL. I followed these steps; if you already have a Hive table populated, you can skip the first 5 steps and go to step 6.
  1. Create a contacthive.csv file which has simple data with 4 columns separated by commas
    
    1,MahendraSingh,Dhoni,mahendra@bcci.com
    2,Virat,Kohali,virat@bcci.com
    5,Sachin,Tendulkar,sachin@bcci.com
    
  2. Upload the contacthive.csv that you created in the last step to the /tmp folder in HDFS using the following command
    
    hdfs dfs -put contacthive.csv /tmp
    
  3. Define a contact_hive table that has 4 columns, contactId, firstName, lastName and email, by executing this command in the hive console
    
    CREATE TABLE contact_hive(contactId Int, firstName String, lastName String, email String) row format delimited fields terminated by "," stored as textfile;
    
  4. In this step, populate the contact_hive table that you created in the last step with the data from the contacthive.csv file created in step 1. Execute this command in the Hive console to populate the contact_hive table
    
    LOAD DATA INPATH  "/tmp/contacthive.csv" OVERWRITE INTO TABLE contact_hive;
    
  5. Since I am using a Hive managed table, this will move the contacthive.csv file to the Hive managed directory, which in the case of Hortonworks is /apps/hive/warehouse. You can verify that by executing the following command on HDFS
    
    hdfs dfs -ls /apps/hive/warehouse/contact_hive
    
  6. Before you export data into the RDBMS, you will have to create the table in mysql; use the following command to create the CONTACT table in mysql.
    
    
    CREATE TABLE CONTACT (
          contactid INTEGER NOT NULL ,
          firstname VARCHAR(50),
          lastname  VARCHAR(50),
          email varchar(50)
    );
    
  7. Now the last step is to execute the sqoop export command that exports data from the hive/hdfs directory to the database
    
    sqoop export --connect jdbc:mysql://localhost/test --table CONTACT --export-dir /apps/hive/warehouse/contact_hive
    

ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve

I was trying out the Pig UDF samples from the Hadoop definitive guide. Every time I tried executing the com.hadoopbook.pig.IsGoodQuality UDF like this, I got an ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve com.hadoop.pig.IsGoodQuality using imports: error

grunt> filtered_records = filter records by temp != 9999 and com.hadoop.pig.IsGoodQuality(quality);

2014-05-17 15:39:10,445 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve com.hadoop.pig.IsGoodQuality using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
Details at logfile: /usr/lib/hue/pig_1400366300986.log
The way to fix this problem is to pass -Dpig.additional.jars=pig-examples.jar when starting pig

pig -Dpig.additional.jars=pig-examples.jar