Thanks to its speed and ease of use, Apache Spark helps enterprises process data faster and solve complex data problems quickly.
Performance deserves attention throughout the development of any program. A Spark job can be optimized with many techniques, so let's dig into them one by one. Spark optimization centers on in-memory data computations, and the bottleneck for those computations can be CPU, memory, or any other resource in the cluster.
1. Serialization
- Serialization plays an important role in the performance of any distributed application. By default, Spark uses the Java serializer.
- Spark can also use another serializer, Kryo, for better performance.
- The Kryo serializer uses a compact binary format and can be up to 10x faster than the Java serializer.
- To set the serializer property:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
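Code:
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setMaster(…).setAppName(…)
// Register the application's classes with Kryo; this call also switches spark.serializer to Kryo
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)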
Serialization plays an important role in the performance of any distributed application, and by default Spark uses the Java serializer on the JVM. Spark can instead use the Kryo serializer, which gives better performance than the Java serializer.
Kryo uses a compact binary format and can be approximately 10 times faster than the Java serializer. To use Kryo in a Spark job, set the configuration property spark.serializer to org.apache.spark.serializer.KryoSerializer.
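The configuration described above can be sketched end to end as follows. This is a minimal illustration rather than a recommended setup: the classes Click and Purchase, the application name, and the local master are all hypothetical stand-ins for your own values.

```scala
import org.apache.spark.SparkConf

// Hypothetical domain classes, used only to illustrate registration.
case class Click(userId: Long, url: String)
case class Purchase(userId: Long, amount: Double)

val kryoConf = new SparkConf()
  .setMaster("local[*]")       // assumption: local mode, for experimentation only
  .setAppName("kryo-sketch")   // hypothetical application name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes lets Kryo write a short identifier instead of the full class name.
  .registerKryoClasses(Array(classOf[Click], classOf[Purchase]))

println(kryoConf.get("spark.serializer"))
```

Passing kryoConf to a SparkContext or SparkSession builder then applies the Kryo serializer to the whole job.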
2. API selection
- Spark offers three APIs to work with: RDD, DataFrame, and Dataset.
- RDD is used for low-level operations and benefits from fewer optimizations.
- DataFrame is the best choice in most cases because of its Catalyst optimizer and low garbage collection (GC) overhead.
- Dataset is strongly type-safe and uses encoders. It relies on Tungsten to serialize data in a binary format.
Spark comes with three APIs to work with: RDD, DataFrame, and Dataset.
RDD is used for low-level operations and has fewer optimization techniques available.
DataFrame is the best choice in most cases because it uses the Catalyst optimizer, which creates an optimized query plan and results in better performance. DataFrame also incurs low garbage collection overhead.
Datasets are strongly type-safe and use encoders as part of their serialization. They also use Tungsten to serialize data in a binary format.
Code:
// Assumes an existing SparkSession named spark, as in spark-shell
import spark.implicits._
val df = spark.read.json("examples/src/main/resources/people.json")
case class Person(name: String, age: Long)
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
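To see the query plan that the Catalyst optimizer produces for a DataFrame, you can ask Spark to explain it. The sketch below assumes a local SparkSession and uses made-up inline rows instead of a file; the application name is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")            // assumption: local mode, for experimentation only
  .appName("catalyst-sketch")    // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Made-up sample rows.
val people = Seq(("Andy", 32L), ("Justin", 19L), ("Michael", 45L)).toDF("name", "age")

val adults = people.filter($"age" > 21).select("name")

// Prints the parsed, analyzed, and optimized logical plans plus the physical plan.
adults.explain(true)

val adultNames = adults.as[String].collect().sorted
```

Comparing the optimized logical plan with the parsed one is a quick way to check what Catalyst rewrote before execution.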
3. Advanced Variables
- Broadcasting plays an important role while tuning Spark jobs.
- A broadcast variable makes a small dataset available locally on every node.
- When one dataset is much smaller than the other, a broadcast join is highly recommended.
- To use a broadcast join: df1.join(broadcast(df2))
Spark comes with two types of advanced variables: broadcast variables and accumulators.
Broadcasting plays an important role while tuning your Spark job. A broadcast variable makes your small dataset available on each node, so every node can read that data locally during processing.
Suppose one dataset is very small, another is quite large, and you want to join the two. In that case, use a broadcast join, which works when the small dataset fits in memory on each node. The syntax is df1.join(broadcast(df2)), where broadcast comes from org.apache.spark.sql.functions. Here the second DataFrame, df2, is the small one, and it is the one being broadcast.
Code:
// Assumes an existing SparkContext named sc, as in spark-shell
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value
// res0: Array[Int] = Array(1, 2, 3)
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value
// res2: Long = 10
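The snippet above shows low-level broadcast variables and accumulators; the DataFrame broadcast join described earlier might look like the following sketch. The orders and countries tables, their columns, and the application name are hypothetical, and local mode is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder()
  .master("local[*]")                 // assumption: local mode, for experimentation only
  .appName("broadcast-join-sketch")   // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Hypothetical large fact table and small lookup table.
val orders = Seq((1, "US", 20.0), (2, "DE", 35.5), (3, "US", 12.0))
  .toDF("order_id", "country_code", "amount")
val countries = Seq(("US", "United States"), ("DE", "Germany"))
  .toDF("country_code", "country_name")

// broadcast() hints that the small side should be copied to every executor,
// so the large side can be joined without being shuffled.
val joined = orders.join(broadcast(countries), Seq("country_code"))

joined.explain()   // the physical plan shows which join strategy was chosen
val joinedCount = joined.count()
```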
4. Cache and Persist
- Spark provides its own caching mechanisms: persist() and cache().
- cache() and persist() keep a dataset available for reuse instead of recomputing it.
- When a small dataset needs to be used multiple times in your program, cache that dataset.
- cache(): uses the default storage level, which is memory only for RDDs
- persist(): accepts a storage level, so data can be kept in memory, on disk, or both
Spark provides its own caching mechanisms, persist and cache. Both store a dataset so that it does not have to be recomputed, which pays off when a small dataset is used multiple times in your program. RDD.cache() always uses the default storage level, which keeps the data in memory only. RDD.persist() can take a storage level such as MEMORY_AND_DISK, in which case part of the data can be stored in memory and the rest on disk.
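As a rough illustration of the difference, the sketch below caches one RDD with the default level and persists another with an explicit storage level. The data and application name are made up, and local mode is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .master("local[*]")               // assumption: local mode, for experimentation only
  .appName("cache-persist-sketch")  // hypothetical application name
  .getOrCreate()
val sc = spark.sparkContext

val numbers = sc.parallelize(1 to 1000)

// cache() on an RDD uses the default storage level, MEMORY_ONLY.
val squares = numbers.map(n => n.toLong * n).cache()

// persist() with an explicit level lets partitions that do not fit in memory go to disk.
val evens = numbers.filter(_ % 2 == 0).persist(StorageLevel.MEMORY_AND_DISK)

// The first action materializes the cached data; later actions reuse it.
val squareSum = squares.reduce(_ + _)
val squareMax = squares.max()
val evenCount = evens.count()

val squaresLevel = squares.getStorageLevel
val evensLevel = evens.getStorageLevel

// Release the cached blocks once they are no longer needed.
squares.unpersist()
evens.unpersist()
```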
5. ByKey Operation
- Shuffles are heavy operations that consume a lot of memory.
- While coding in Spark, the user should try to avoid shuffle operations wherever possible.
- Heavy shuffling may give rise to an OutOfMemoryError; to avoid such an error, the user can increase the level of parallelism.
- Use reduceByKey instead of groupByKey.
- Partition the data correctly.
Spark transformations include many ByKey operations, and ByKey operations generate a lot of shuffle. Shuffles are heavy operations because they consume a lot of memory. While coding in Spark, a user should try to avoid shuffle operations wherever possible, because shuffling degrades performance. Heavy shuffling can also lead to an out-of-memory error; in this case, to avoid the error, a user should increase the level of parallelism. Instead of groupByKey, a user should prefer reduceByKey: groupByKey sends every record across the network, which hampers performance, while reduceByKey combines the values for each key within a partition first and therefore shuffles much less data. As a result, reduceByKey is faster than groupByKey. Whenever any ByKey operation is used, the user should partition the data correctly.
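A small word count makes the comparison concrete. This is only a sketch with made-up input and a hypothetical application name, run in local mode; the last line also shows how a partition count can be passed to raise the level of parallelism.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")          // assumption: local mode, for experimentation only
  .appName("bykey-sketch")     // hypothetical application name
  .getOrCreate()
val sc = spark.sparkContext

// Made-up input spread over three partitions.
val words = sc.parallelize(
  Seq("spark", "kryo", "spark", "parquet", "spark", "kryo"), numSlices = 3)
val pairs = words.map(word => (word, 1))

// groupByKey shuffles every (word, 1) pair, then sums on the receiving side.
val countsViaGroup = pairs.groupByKey().mapValues(_.sum).collectAsMap()

// reduceByKey sums inside each partition first, so fewer records are shuffled.
val countsViaReduce = pairs.reduceByKey(_ + _).collectAsMap()

// The optional second argument sets the number of partitions after the shuffle.
val widerCounts = pairs.reduceByKey(_ + _, 8)
val widerPartitions = widerCounts.getNumPartitions
```

Both approaches return the same counts; the difference is in how much data crosses the network to get there.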
6. File Format selection
- Spark supports many formats, such as CSV, JSON, XML, Parquet, ORC, and Avro.
- Spark jobs can be optimized by choosing Parquet files with Snappy compression, which gives high performance for analytical workloads.
- Parquet is Spark's default data source format, and each file carries its metadata in a footer.
Spark works with many file formats, such as CSV, JSON, XML, Parquet, ORC, and Avro. A Spark job can be optimized by choosing Parquet files with Snappy compression. Parquet is Spark's default data source format. It is a binary format, and along with the data each file carries a footer that holds the metadata. Depending on the writer configuration, you may also see separate metadata files in the same directory as the data files.
Code:
// Assumes an existing SparkSession named spark, as in spark-shell
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
peopleDF.write.parquet("people.parquet")
val parquetFileDF = spark.read.parquet("people.parquet")
// The avro format requires the external spark-avro module on the classpath
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
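The snippet above does not set a compression codec explicitly. A minimal sketch of writing Parquet with Snappy compression could look like this; the sample rows, column names, and application name are hypothetical, and the output goes to a temporary directory so nothing existing is overwritten.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")                 // assumption: local mode, for experimentation only
  .appName("parquet-snappy-sketch")   // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Made-up sample rows.
val events = Seq((1, "click"), (2, "view"), (3, "click")).toDF("id", "event_type")

// Write into a fresh temporary directory.
val outputPath = Files.createTempDirectory("parquet-sketch")
  .resolve("events.parquet").toString

events.write
  .option("compression", "snappy")   // Snappy is also Spark's default Parquet codec
  .parquet(outputPath)

// The schema is read back from the Parquet footer; no schema needs to be supplied.
val restored = spark.read.parquet(outputPath)
restored.printSchema()
val restoredCount = restored.count()
```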
7. Garbage Collection Tuning
- JVM garbage collection can become a problem when you have a large collection of unused objects.
- The first step in GC tuning is to collect statistics by enabling verbose GC logging (for example, -verbose:gc in the Java options) when submitting Spark jobs.
- In an ideal situation, we try to keep GC overhead below 10%.
Spark jobs run on the JVM, so JVM garbage collection can become problematic when there is a large collection of unused objects. The first step in tuning garbage collection is therefore to collect statistics by enabling verbose GC logging in the Java options passed at submit time. Generally, in an ideal situation, garbage collection overhead should stay below 10%.
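One way to switch on GC logging for executors is through the spark.executor.extraJavaOptions property, sketched below. The application name is hypothetical, and only the basic -verbose:gc flag is shown; more detailed logging flags differ between JVM versions.

```scala
import org.apache.spark.SparkConf

val gcConf = new SparkConf()
  .setAppName("gc-logging-sketch")   // hypothetical application name
  // Ask each executor JVM to log a line for every garbage collection.
  .set("spark.executor.extraJavaOptions", "-verbose:gc")

println(gcConf.get("spark.executor.extraJavaOptions"))
```

The resulting GC messages are written to the executors' standard output on the worker nodes, not to the driver program, so that is where to look when collecting the statistics.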
8. Level of Parallelism
- Parallelism plays a very important role while tuning Spark jobs.
- Every partition corresponds to one task, and each task requires a single core for processing.
- There are two ways to control parallelism:
- Repartition: produces roughly equal-sized partitions, at the cost of a full shuffle
- Coalesce: generally reduces the number of partitions with less shuffling
In any distributed environment, parallelism plays a very important role while tuning your Spark job. Whenever a Spark job is submitted, Spark creates a DAG that contains stages, and the number of tasks in a stage depends on the number of partitions, so every partition (that is, every task) requires a single core for processing. There are two ways to control parallelism: repartition and coalesce. repartition gives you roughly equal-sized partitions, but it shuffles a lot of data, so it is not advisable when you only want to reduce the number of partitions. coalesce generally reduces the number of partitions and shuffles less data.
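The effect of the two methods on the partition count can be checked directly, as in this sketch. The data, partition counts, and application name are made up, and local mode is assumed.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")              // assumption: local mode, for experimentation only
  .appName("parallelism-sketch")   // hypothetical application name
  .getOrCreate()
val sc = spark.sparkContext

// Start with 8 partitions of made-up data.
val data = sc.parallelize(1 to 100, numSlices = 8)

// repartition performs a full shuffle and can raise or lower the partition count.
val widened = data.repartition(16)

// coalesce merges existing partitions, avoiding a full shuffle when reducing the count.
val narrowed = data.coalesce(2)

val originalPartitions = data.getNumPartitions
val widenedPartitions = widened.getNumPartitions
val narrowedPartitions = narrowed.getNumPartitions
println(s"$originalPartitions -> repartition: $widenedPartitions, coalesce: $narrowedPartitions")
```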
These Spark optimization factors, if properly used, can:
- Eliminate long-running job processes
- Correction execution engine
- Improve performance time by managing resources