How to read and write JSON files with Spark

I wanted to build a Spark program that would read a text file in which every line is a complex JSON object like the one below. I wanted to parse the file, filter out a few records, and write the output back as a file. You can download the project from GitHub

{
   "first":"Sachin",
   "last":"Tendulkar",
   "address":{
      "line1":"1 main street",
      "city":"mumbai",
      "state":"ms",
      "zip":"12345"
   }
}
There are a lot of options when it comes to parsing JSON, but I decided to use the Jackson parser since I am comfortable with it, so first add the Jackson dependencies to the pom.xml. Next I created a Spark driver program that takes 2 parameters: inputFile, which is the path to the input file containing the JSON, and outputFile, which is the path to the output folder where the results should be saved. In my code I defined 2 classes, Person and Address, which are simple beans with JSON-related annotations. Most of the Spark code is similar to what it would look like for dealing with, say, a comma-separated file or a simple text file. The only difference is lines 52 to 60, where I call ObjectMapper.readValue() to convert each line of input into an object of the Person class. If the JSON is invalid, that call throws an exception like the one you can see at the bottom of the post, so I catch the exception and increment errorRecords by one every time it happens.
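
The full driver is in the GitHub project; below is a minimal sketch of the idea in Java, assuming jackson-databind and spark-core are on the classpath. The class name ReadWriteJSONDriver and the filter condition on the city field are illustrative, and for brevity the sketch simply drops bad records instead of tracking them in the errorRecords counter the real driver uses.

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadWriteJSONDriver {

    // Simple beans with Jackson annotations; public fields keep the sketch short
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Address {
        public String line1;
        public String city;
        public String state;
        public String zip;
    }

    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Person {
        public String first;
        public String last;
        public Address address;
    }

    public static void main(String[] args) {
        String inputFile = args[0];   // path to the file with one JSON object per line
        String outputFile = args[1];  // path to the output folder

        SparkConf conf = new SparkConf().setAppName("ReadWriteJSON").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile(inputFile);

        // Parse every line with Jackson; a bad line becomes null instead of failing the job
        // (the real driver increments an errorRecords counter here)
        JavaRDD<Person> people = lines.map(line -> {
            try {
                return new ObjectMapper().readValue(line, Person.class);
            } catch (Exception e) {
                return null;
            }
        }).filter(p -> p != null);

        // Filter out a few records (example condition) and write the rest back out as JSON
        people.filter(p -> p.address != null && !"mumbai".equals(p.address.city))
              .map(p -> new ObjectMapper().writeValueAsString(p))
              .saveAsTextFile(outputFile);

        sc.stop();
    }
}

Here is what the failure looks like when one of the input lines is malformed JSON: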

2015-12-31 07:52:44 ERROR TaskSetManager:75 - Task 0 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input: was expecting closing quote for a string value
 at [Source: {"first":"VVS","last":"Laxman" , "address": {"line1":"1 main street", "city":"Hyderabad","state":"AN","zip":"121212}}; line: 1, column: 235] (through reference chain: com.spnotes.spark.Person["address"]->com.spnotes.spark.Address["zip"])
 at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210)
 at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177)
 at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1474)
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:260)
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:125)
 at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:520)
 at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:101)
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:258)
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:125)
 at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
 at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
 at com.spnotes.spark.NetcatStreamClient$$anonfun$4.apply(NetcatStreamClient.scala:74)
 at com.spnotes.spark.NetcatStreamClient$$anonfun$4.apply(NetcatStreamClient.scala:72)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
 at scala.collection.Iterator$class.foreach(Iterator.scala:742)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input: was expecting closing quote for a string value
 at [Source: {"first":"VVS","last":"Laxman" , "address": {"line1":"1 main street", "city":"Hyderabad","state":"AN","zip":"121212}}; line: 1, column: 235]
 at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
 at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
 at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:470)
 at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString2(ReaderBasedJsonParser.java:1760)
 at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString(ReaderBasedJsonParser.java:1747)
 at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:233)
 at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:32)
 at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
 at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:520)
 at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:101)
 at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:258)
 ... 23 more

Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:257)
 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Process finished with exit code 1

Hello Spark Streaming

In the WordCount program built using Apache Spark in Java, I built a simple Spark program that takes the name of a file as input, reads the file, and performs a word count on it. Spark also has a concept called Spark Streaming, which lets you process input as a stream of real-time events instead of a one-time load of an input file, and the API for transforming the data is similar in both cases: Spark converts the input into an RDD. In the case of Spark Streaming it converts the input events into micro-batch RDDs, which simply means it collects all the incoming data for a certain duration (microBatchTime) and then exposes it as an RDD. I built this simple NetcatStreamClient streaming application that listens for incoming data on a netcat port, performs a word count on it, and prints the result to the console. You can download the full source code from GitHub

package com.spnotes.spark

import com.typesafe.scalalogging.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.slf4j.LoggerFactory

object NetcatStreamClient{
  val logger = Logger(LoggerFactory.getLogger("NetcatStreamClient"))

  def main(argv:Array[String]): Unit ={
    logger.debug("Entering NetcatStreamClient.main")
    if(argv.length != 3){
      println("Please provide 3 parameters   ")
      System.exit(1)
    }
    val hostName =argv(0)
    val port = argv(1).toInt
    val microBatchTime = argv(2).toInt

    logger.debug(s"Listening on $hostName at $port batching records every $microBatchTime")

    //Create Spark Configuration
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
 
 
    //Create SparkStreamingContext with microBatchTime which specifies how long spark should collect data
    val sparkStreamingContext = new StreamingContext(sparkConf,Seconds(microBatchTime))
    //Start listening for data on given host and port
    val lines = sparkStreamingContext.socketTextStream(hostName,port)
// Logic for implementing word count on the input batch
    lines.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_).print()
 
    logger.debug("Number of words " + lines.count())

    //Start the stream so that data starts flowing, you must define transformation logic before calling start()
    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
    logger.debug("Exiting NetcatStreamClient.main")
  }
}
Once the project is built using mvn clean compile assembly:single, the first thing you should do is run the following command to start netcat on localhost at port 9999:

nc -l 9999

Next, run the following command to start the Spark driver. It takes 3 parameters: the host and the port where netcat is listening, and the batch duration in seconds.

bin/spark-submit ~/HelloSparkStreaming-1.0-SNAPSHOT-jar-with-dependencies.jar localhost 9999 5

How to create Spark Scala fat jar with dependencies using Maven

If you're developing a Spark application in Scala, or a standalone Scala application, and you want to create a fat jar that includes the dependencies, you can use the following Maven script as a template for your build file. A couple of things are different here: you must include scala-library as one of the dependencies, and you must also include the maven-scala-plugin, which takes care of compiling the Scala code. The value of sourceDirectory specifies the directory that contains your Scala code. Then, to package the project as a fat jar, you can use the maven-assembly-plugin just as you would for any other Java project built using Maven.
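
A minimal sketch of such a pom.xml is shown below, using the HelloSparkStreaming project above for the Maven coordinates; the plugin and library versions are only examples and should be adjusted to match your Spark and Scala versions, and the Spark dependencies themselves are omitted.

<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.spnotes.spark</groupId>
  <artifactId>HelloSparkStreaming</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <!-- scala-library must be declared explicitly -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.4</version>
    </dependency>
    <!-- Spark and any other dependencies go here -->
  </dependencies>

  <build>
    <!-- sourceDirectory points at the folder that contains the Scala code -->
    <sourceDirectory>src/main/scala</sourceDirectory>
    <plugins>
      <!-- maven-scala-plugin takes care of compiling the Scala code -->
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- maven-assembly-plugin packages the fat jar: mvn clean compile assembly:single -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>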

Configuring Flume to use Twitter as Source

I wanted to figure out how to configure Twitter as a source for Flume, so I tried these steps
  1. First go to the Twitter Application Management page and configure an application. This should give you a consumerKey, consumerSecret, accessToken and accessTokenSecret
  2. Next create twitterflume.properties, which looks like this. You should create a source of type org.apache.flume.source.twitter.TwitterSource and use the 4 values you got in the last step to configure access to Twitter
    
    agent1.sources = twitter1
    agent1.sinks = logger1
    agent1.channels = memory1
    
    
    agent1.sources.twitter1.type = org.apache.flume.source.twitter.TwitterSource
    agent1.sources.twitter1.consumerKey =<consumerkey>
    agent1.sources.twitter1.consumerSecret =<consumerSecret>
    agent1.sources.twitter1.accessToken =<accessToken>
    agent1.sources.twitter1.accessTokenSecret =<accessTokenSecret>
    agent1.sources.twitter1.keywords = bigdata, hadoop
    agent1.sources.twitter1.maxBatchSize = 10
    agent1.sources.twitter1.maxBatchDurationMillis = 200
    
    
    # Describe the sink
    agent1.sinks.logger1.type = logger
    
    # Use a channel which buffers events in memory
    agent1.channels.memory1.type = memory
    agent1.channels.memory1.capacity = 1000
    agent1.channels.memory1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent1.sources.twitter1.channels = memory1
    agent1.sinks.logger1.channel = memory1
    
  3. The last step is to run the Flume agent; you should see Twitter messages being dumped to the console: bin/flume-ng agent --conf conf --conf-file conf/twitterflume.properties --name agent1 -Dflume.root.logger=DEBUG,console
Note: When I tried this in the Hadoop Sandbox I started getting the following error. It seems the problem occurs when the VM's clock is in the past; for example, when I ran the date command on my sandbox, the date it reported was 3 days in the past. After I restarted the VM the date command reported the accurate time, and the following error went away

[Twitter Stream consumer-1[Establishing connection]] ERROR org.apache.flume.source.twitter.TwitterSource (TwitterSource.java:331) - Exception while streaming tweets
stream.twitter.com
Relevant discussions can be found on the Internet at:
    http://www.google.co.jp/search?q=d0031b0b or
    http://www.google.co.jp/search?q=1db75522
TwitterException{exceptionCode=[d0031b0b-1db75522 db667dea-99334ae4], statusCode=-1, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=3.0.3}
    at twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:192)
    at twitter4j.internal.http.HttpClientWrapper.request(HttpClientWrapper.java:61)
    at twitter4j.internal.http.HttpClientWrapper.get(HttpClientWrapper.java:89)
    at twitter4j.TwitterStreamImpl.getSampleStream(TwitterStreamImpl.java:176)
    at twitter4j.TwitterStreamImpl$4.getStream(TwitterStreamImpl.java:164)
    at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:462)
Caused by: java.net.UnknownHostException: stream.twitter.com
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:637)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
    at sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:264)
    at sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:367)
    at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:191)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:933)
    at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:177)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1301)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:468)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
    at twitter4j.internal.http.HttpResponseImpl.<init>(HttpResponseImpl.java:34)
    at twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:156)

Simple Java Program for publishing Syslog Events

In the Using Syslog as source in Flume post I blogged about how to configure Flume to listen for syslog events on a particular UDP port. I wanted to test that configuration, so I built this simple Java program that can publish a syslog event to a given host and port. You can download the source code for this project from GitHub. The program takes 3 arguments: the first is the hostname of the syslog server, the second is the port on which the server is listening, and the third is the actual message that you want to send.
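
The GitHub project has the full program; a minimal sketch of the idea in Java is shown below (the class name SyslogPublisher and the priority value are illustrative). It formats a syslog-style payload and sends it as a single UDP datagram:

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

public class SyslogPublisher {
    public static void main(String[] args) throws Exception {
        // args: <hostname> <port> <message>
        String hostName = args[0];
        int port = Integer.parseInt(args[1]);
        String message = args[2];

        // A minimal syslog-style payload: <priority> followed by the message.
        // Priority 14 = facility "user" (1 * 8) + severity "info" (6).
        byte[] data = ("<14>" + message).getBytes("UTF-8");

        // Send the event as a UDP datagram to the syslog server
        DatagramSocket socket = new DatagramSocket();
        socket.send(new DatagramPacket(data, data.length,
                InetAddress.getByName(hostName), port));
        socket.close();

        System.out.println("Sent syslog event to " + hostName + ":" + port);
    }
}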

Using Syslog as source in Flume

I wanted to figure out how to use Flume to receive syslog messages, so I tried 2 different configurations: one with a syslog source listening on a TCP port and the other on a UDP port. Copy the flumesyslogudp.properties file (a sketch of it is shown after the command below) into the conf directory of your Flume server and use the following command to start the Flume agent

bin/flume-ng agent --conf conf --conf-file conf/flumesyslogudp.properties --name agent1 
-Dflume.root.logger=DEBUG,console
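
A minimal flumesyslogudp.properties would look something like this sketch; the source name syslog1 and the port number are illustrative and should match wherever your syslog clients send events:

agent1.sources = syslog1
agent1.sinks = logger1
agent1.channels = memory1

# Syslog UDP source
agent1.sources.syslog1.type = syslogudp
agent1.sources.syslog1.host = 0.0.0.0
agent1.sources.syslog1.port = 41414

# Describe the sink
agent1.sinks.logger1.type = logger

# Use a channel which buffers events in memory
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.syslog1.channels = memory1
agent1.sinks.logger1.channel = memory1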
Alternatively, you can configure Flume to listen on a TCP port. The only difference is that the source type is syslogtcp instead of syslogudp

bin/flume-ng agent --conf conf --conf-file conf/flumesyslogtcp.properties --name agent1 
-Dflume.root.logger=DEBUG,console