Apache Spark Interview Questions with Answers – 2


List of Data Engineer Interview questions for Apache Spark…

Apache Spark

31. What is a Dataset?

A Dataset is an immutable collection of objects that are mapped to a relational schema. Datasets are strongly typed in nature.

At the core of the Dataset API is an encoder, which is responsible for converting between JVM objects and the tabular representation. The tabular representation is stored using Spark's internal binary format, which allows operations to be carried out on serialized data and improves memory utilization. The API offers many functional transformations (e.g., map, flatMap, filter) and supports automatically generating encoders for a wide variety of types, including primitive types (e.g., String, Integer, Long) and Scala case classes.
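
For illustration, a minimal sketch of creating and using a typed Dataset; the Employee case class is made up for this example, and a SparkSession named spark is assumed, as in spark-shell:

case class Employee(name: String, age: Long)

import spark.implicits._ // brings the encoders for case classes and primitives into scope

val ds = Seq(Employee("Dinesh", 30), Employee("Kumar", 28)).toDS()
ds.filter(e => e.age > 28).map(e => e.name).show()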

32. What are the benefits of Datasets?

  1. Static typing: With the static typing feature of Datasets, a developer can catch errors at compile time, which saves time and cost (see the sketch after this list).

  2. Run-time safety: Because Dataset APIs are expressed as lambda functions over JVM typed objects, any mismatch of typed parameters is detected at compile time. Analysis errors are also detected at compile time when using Datasets, saving developer time and costs.

  3. Performance and optimization: Dataset APIs are built on top of the Spark SQL engine and use the Catalyst optimizer to generate optimized logical and physical query plans, providing space and speed efficiency.

  4. Datasets are a good fit for processing demands such as high-level expressions, filters, maps, aggregations, averages, sums, SQL queries, columnar access, and lambda functions on semi-structured data.

  5. Datasets provide rich semantics, high-level abstractions, and domain-specific APIs.
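
As an illustration of the static-typing benefit above, a minimal sketch; the Person case class and its fields are made up for this example, and a SparkSession named spark is assumed:

case class Person(name: String, age: Long)

import spark.implicits._

val people = Seq(Person("Raja", 40), Person("Ram", 60)).toDS()

// Compiles: age is a typed field of Person.
people.filter(p => p.age > 45)

// Would not compile: Person has no field salary, so the mistake
// is caught before the job ever runs.
// people.filter(p => p.salary > 45)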

33. What is a Shared variable in Apache Spark?

Shared variables are nothing but variables that can be used in parallel operations.

Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
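
A minimal sketch of both kinds of shared variables; the values are illustrative and sc is the SparkContext (Spark 2.x accumulator API):

val broadcastVar = sc.broadcast(Array(1, 2, 3)) // read-only value cached on every node
broadcastVar.value // Array(1, 2, 3)

val accum = sc.longAccumulator("My Accumulator") // a variable that tasks can only add to
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value // 10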

34. How does metadata accumulate in Apache Spark, and how do you handle it?

Metadata accumulates on the driver as a consequence of shuffle operations. This becomes particularly problematic during long-running jobs.

To deal with the issue of accumulating metadata, there are two options:

  • First, set the spark.cleaner.ttl parameter to trigger automatic cleanups. However, this will also remove any persisted RDDs (a hedged configuration sketch follows this list).
  • The other solution is to split long-running jobs into batches and write intermediate results to disk. This provides a fresh environment for every batch, so there is no need to worry about metadata build-up.
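
A minimal sketch of the first option, assuming an older Spark version where spark.cleaner.ttl is still honoured (the application name and TTL value are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("metadata-cleanup-demo") // hypothetical application name
  .set("spark.cleaner.ttl", "3600") // clean up metadata older than one hour (value in seconds)

val sc = new SparkContext(conf)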

35. What is the Difference between DSM and RDD?

Based on several features, the difference between RDD and DSM is:

  • Read

RDD — The read operation in RDD is either coarse-grained or fine-grained. Coarse-grained means we can transform the whole dataset but not an individual element, while fine-grained means we can transform individual elements of the dataset.

DSM — The read operation in Distributed shared memory is fine-grained.

  • Write

RDD — The write operation in RDD is coarse-grained.

DSM — The write operation is fine-grained in a distributed shared memory system.

  • Consistency

RDD — The consistency of RDDs is trivial because they are immutable in nature. We cannot alter the content of an RDD; any change produces a new RDD. Hence, the level of consistency is very high.

DSM — The system guarantees that if the programmer follows the rules, the memory will be consistent. Also, the results of memory operations will be predictable.

  • Fault-Recovery Mechanism

RDD — Lost data can be recovered at any moment using the lineage graph in Spark. Each transformation forms a new RDD, and because RDDs are immutable in nature, recovery is straightforward.

DSM — Fault tolerance is achieved by a checkpointing technique that allows applications to roll back to a recent checkpoint rather than restarting.

  • Straggler Mitigation

Stragglers, in general, are tasks that take more time to complete than their peers. This can happen for many reasons, such as load imbalance, I/O blocking, or garbage collection.

An issue with stragglers is that when a parallel computation is followed by a synchronization step such as a reduction, all the parallel tasks end up waiting for the slowest one.

RDD — Stragglers can be mitigated in RDDs by launching backup (speculative) tasks.

DSM — Straggler mitigation is quite difficult to achieve.

  • Behavior if not enough RAM

RDD — If there is not enough space to store an RDD in RAM, the partitions that do not fit are shifted to disk.

DSM — If the RAM runs out of storage, performance decreases in this type of system.

36. What is Speculative Execution in Spark and how to enable it?

Speculative execution is a mechanism by which Spark detects tasks that are running much slower than their peers and launches duplicate copies of them on other executors; whichever copy finishes first wins. Note that speculative execution does not stop the slow-running task; it launches the new copy in parallel.

Tabular Form :

Spark Property >> Default Value >> Description

spark.speculation >> false >> Enables (true) or disables (false) speculative execution of tasks.

spark.speculation.interval >> 100ms >> How often Spark checks for tasks to speculate.

spark.speculation.multiplier >> 1.5 >> How many times slower a task must be than the median for it to be considered for speculation.

spark.speculation.quantile >> 0.75 >> The fraction of tasks that must be complete before speculation is enabled for a particular stage.
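
A minimal sketch of enabling speculative execution programmatically; the application name and the overridden multiplier are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("speculation-demo") // hypothetical application name
  .config("spark.speculation", "true")
  .config("spark.speculation.multiplier", "2") // speculate tasks running 2x slower than the median
  .getOrCreate()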

37. How is fault tolerance achieved in Apache Spark?

The basic premise of fault tolerance in Apache Spark is that all RDDs are immutable. Spark remembers the dependencies between every RDD involved in the operations through the lineage graph captured in the DAG. On any failure, Spark refers to the lineage graph and reapplies the same operations to recompute the lost data.

There are two types of failures: worker or driver failure. If a worker fails, the executors on that worker node are killed, along with the data in their memory. Using the lineage graph, those tasks will be re-executed on other worker nodes. The data is also replicated to other worker nodes to achieve fault tolerance. There are two cases:

  • Data received and replicated — Data is received from the source and replicated across worker nodes. In the case of any failure, the data replication will help achieve fault tolerance.
  • Data received but not yet replicated — Data is received from the source but buffered for replication. In the case of any failure, the data needs to be retrieved from the source.

For stream inputs based on receivers, the fault tolerance is based on the type of receiver:

  1. Reliable receiver — Once the data is received and replicated, an acknowledgment is sent to the source. If the receiver fails, the source will not receive an acknowledgment for the data. When the receiver is restarted, the source resends the data, achieving fault tolerance.
  2. Unreliable receiver — The receiver does not acknowledge the received data. If a failure occurs, the source does not know whether the data has been received and will not resend it, so there is data loss.

To overcome this data loss scenario, Write-Ahead Logging (WAL) was introduced in Apache Spark 1.2. With WAL enabled, the intention of every operation is first written to a log file, so that if the driver fails and is restarted, the operations recorded in the log can be re-applied to the data. For sources that read streaming data, such as Kafka or Flume, receivers receive the data, which is stored in the executor's memory. With WAL enabled, the received data is also written to the log files.

WAL can be enabled by performing the below:

Setting the checkpoint directory using streamingContext.checkpoint(path).

Enabling WAL logging by setting spark.streaming.receiver.writeAheadLog.enable to true.
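
A minimal configuration sketch for a receiver-based streaming job; the application name and checkpoint path are hypothetical:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wal-demo") // hypothetical application name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///user/spark/checkpoints") // durable directory for checkpoints and the WAL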

38. Explain the difference between reduceByKey, groupByKey, aggregateByKey, and combineByKey?

  • groupByKey: groupByKey can cause disk problems because all the data for a key is sent over the network and collected on the reduce workers.

Example:-

sc.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .groupByKey()
  .map { case (word, counts) => (word, counts.sum) }

  • reduceByKey: Data is combined at each partition, so only one output per key is sent over the network. reduceByKey requires combining all your values into another value of exactly the same type.

Example:-

sc.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((x, y) => x + y)

  • aggregateByKey: similar to reduceByKey, but it takes an initial (zero) value. It takes three parameters as input:
  • 1) an initial value, 2) a combiner logic function (applied within each partition), and 3) a merge function (applied across partitions).

Example:-

val inp = Seq("dinesh=70", "kumar=60", "raja=40", "ram=60", "dinesh=50", "dinesh=80", "kumar=40", "raja=40")

val rdd = sc.parallelize(inp, 3)

val pairRdd = rdd.map(_.split("=")).map(x => (x(0), x(1)))

val initial_val = 0

val addOp = (intVal: Int, strVal: String) => intVal + strVal.toInt
val mergeOp = (p1: Int, p2: Int) => p1 + p2

val out = pairRdd.aggregateByKey(initial_val)(addOp, mergeOp)
out.collect.foreach(println)

  • combineByKey: values are merged into one value per key within each partition, and then the per-partition values are merged into a single value per key across partitions. It's worth noting that the type of the combined value does not have to match the type of the original values, and often it won't.

It takes three parameters as input:

  • 1. createCombiner
  • 2. mergeValue
  • 3. mergeCombiners

Example:

val inp = Array(("Dinesh", 98.0), ("Kumar", 86.0), ("Kumar", 81.0), ("Dinesh", 92.0), ("Dinesh", 83.0), ("Kumar", 88.0))

val rdd = sc.parallelize(inp, 2)

// Create the combiner: start each key with a count of 1 and its first value
val combiner = (inp: Double) => (1, inp)

// Merge the values within a partition: add 1 to the count and inp to the running sum
val mergeValue = (partVal: (Int, Double), inp: Double) => {
  (partVal._1 + 1, partVal._2 + inp)
}

// Merge the (count, sum) pairs across partitions
val mergeCombiners = (partOutput1: (Int, Double), partOutput2: (Int, Double)) => {
  (partOutput1._1 + partOutput2._1, partOutput1._2 + partOutput2._2)
}

// Calculate the average from the (count, sum) pair for each name
val calculateAvg = (personInp: (String, (Int, Double))) => {
  val (name, (numOfInps, inp)) = personInp
  (name, inp / numOfInps)
}

val rdd1 = rdd.combineByKey(combiner, mergeValue, mergeCombiners)
rdd1.collect().foreach(println)

val rdd2 = rdd.combineByKey(combiner, mergeValue, mergeCombiners).map(calculateAvg)
rdd2.collect().foreach(println)

39. Explain mapPartitions() and mapPartitionsWithIndex()?

mapPartitions() and mapPartitionsWithIndex() are both transformations.

mapPartitions() :

  • It runs once per partition (block) of the RDD, so the supplied function must be of type Iterator[T] => Iterator[U]. It improves performance by reducing object creation compared with calling a function per element in map().
  • mapPartitions() can be used as an alternative to map() and foreach().
  • mapPartitions() is called once for each partition, while map() and foreach() are called once for each element in the RDD.

Hence, one can do initialization on a per-partition basis rather than a per-element basis.

mapPartitionsWithIndex():

  • It is similar to mapPartitions(), but it takes two parameters: the first is the partition index and the second is an iterator over all items within that partition ((Int, Iterator[T]) => Iterator[U]).

In other words, mapPartitionsWithIndex() behaves like mapPartitions() but additionally provides an index that keeps track of which partition is being processed.
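
A small illustrative sketch; the data and partition count are made up, and the exact partition contents depend on how Spark splits the range:

val rdd = sc.parallelize(1 to 10, 3)

// Sum the elements of each partition; the function receives the whole partition as an Iterator.
val sums = rdd.mapPartitions(iter => Iterator(iter.sum))
sums.collect() // e.g. Array(6, 15, 34)

// Tag every element with the index of the partition it lives in.
val tagged = rdd.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x)))
tagged.collect() // Array((0,1), (0,2), ..., (2,10))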

40. Explain fold() operation in Spark?

  • fold() is an action. It is a wide operation (i.e., it shuffles data across multiple partitions and outputs a single value). It takes a function with two parameters of the same type as input and outputs a single value of that type.
  • It is similar to reduce but has one more argument, ‘ZERO VALUE’ (say initial value), which will be used in the initial call on each partition.

def fold(zeroValue: T)(op: (T, T) ⇒ T): T

  • Aggregate the elements of each partition and then the results for all the partitions, using a given associative function and a neutral “zero value.” The function op(t1, t2) can modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
  • This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then those partition results are folded into the final result, rather than applying the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.
  • zeroValue: The initial value for the accumulated result of each partition for the op operator, and also the initial value for the combined results from different partitions for the op operator — this will typically be the neutral element (e.g., Nil for list concatenation or 0 for summation)
  • Op: an operator used to both accumulate results within a partition and combine results from different partitions

Example :

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 3)
rdd1.fold(5)(_ + _)

Output : Int = 35

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
rdd1.fold(5)(_ + _)

Output : Int = 25 (this result depends on the default number of partitions; here the list ended up in a single partition)

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 3)
rdd1.fold(3)(_ + _)

Output : Int = 27

41. What is the difference between textFile() and wholeTextFiles()?

Both are methods of org.apache.spark.SparkContext.

textFile() :

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings

For example, sc.textFile("/home/hdadmin/wc-data.txt") will create an RDD in which each line of the file is an element.

textFile() is the most commonly used way to load text data.

wholeTextFiles() :

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Rather than creating a basic RDD, wholeTextFiles() returns a pair RDD.

For example, if you have a few files in a directory, wholeTextFiles() creates a pair RDD whose key is the file path and whose value is the content of the whole file as a string.

Example:-

val myfilerdd = sc.wholeTextFiles("/home/hdadmin/MyFiles")

val keyrdd = myfilerdd.keys

keyrdd.collect

val filerdd = myfilerdd.values

filerdd.collect

42. What is the cogroup() operation?

  • It’s a transformation.
  • It’s in package org.apache.spark.rdd.PairRDDFunctions

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

For each key k in this or other1 or other2 or other3, return a resulting RDD that contains a tuple with the list of values for that key in this, other1, other2, and other3.

Example:

val myrdd1 = sc.parallelize(List((1, "spark"), (2, "HDFS"), (3, "Hive"), (4, "Flink"), (6, "HBase")))

val myrdd2 = sc.parallelize(List((4, "RealTime"), (5, "Kafka"), (6, "NOSQL"), (1, "stream"), (1, "MLlib")))

val result = myrdd1.cogroup(myrdd2)

result.collect

Output :

Array[(Int, (Iterable[String], Iterable[String]))] =

Array((4,(CompactBuffer(Flink),CompactBuffer(RealTime))),

(1,(CompactBuffer(spark),CompactBuffer(stream, MLlib))),

(6,(CompactBuffer(HBase),CompactBuffer(NOSQL))),

(3,(CompactBuffer(Hive),CompactBuffer())),

(5,(CompactBuffer(),CompactBuffer(Kafka))),

(2,(CompactBuffer(HDFS),CompactBuffer())))

43. Explain pipe() operation?

Return an RDD created by piping elements to a forked external process.

def pipe(command: String): RDD[String]

In general, Spark programs are written in Scala, Java, or Python. However, if that is not enough and one wants to process data with code written in another language, such as R, Spark provides a general mechanism in the form of the pipe() method.

Spark provides the pipe() method on RDDs.

With Spark's pipe() method, each element of an RDD is written as a String to the standard input of a forked external process.

The process writes its results as Strings to standard output, and those lines become the elements of the resulting RDD.

Example: test.py

#!/usr/bin/python

import sys

for line in sys.stdin:
    print("hello " + line.strip())

spark-shell Scala:

val data = List("john", "paul", "george", "ringo")
val dataRDD = sc.makeRDD(data)

val scriptPath = "./test.py"

val pipeRDD = dataRDD.pipe(scriptPath)
pipeRDD.foreach(println)

44. Explain the coalesce() operation?

coalesce() is defined on org.apache.spark.rdd.RDD.

def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

Return a new RDD that is reduced into numPartitions partitions.

Example:

val myrdd1 = sc.parallelize(1 to 1000, 15)
myrdd1.partitions.length // Int = 15

val myrdd2 = myrdd1.coalesce(5, false)
myrdd2.partitions.length // Int = 5

45. Explain the repartition() operation?

  • repartition() is a transformation.
  • This function changes the number of partitions of the RDD to the value given in the numPartitions parameter (numPartitions: Int).
  • It is defined on org.apache.spark.rdd.RDD.

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

  • Return a new RDD that has exactly numPartitions partitions.
  • It can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.

If you decrease the number of partitions in this RDD, consider using coalesce to avoid shuffling.

Example :

val rdd1 = sc.parallelize(1 to 100, 3)
rdd1.getNumPartitions // Int = 3

val rdd2 = rdd1.repartition(6)
rdd2.getNumPartitions // Int = 6

46. Explain the top() and takeOrdered() operations?

  • Both top() and takeOrdered() are actions.
  • Both return the elements of an RDD based on the default ordering or on a custom ordering supplied by the user.

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

Returns the top k (largest) elements from this RDD as defined by the specified implicit Ordering[T] and maintains the ordering. This does the opposite of takeOrdered.

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

Returns the first k (smallest) elements from this RDD as defined by the specified implicit Ordering[T] and maintains the ordering. This does the opposite of top.

Example :

val myrdd1 = sc.parallelize(List(5,7,9,13,51,89))

myrdd1.top(3) //Array[Int] = Array(89, 51, 13)

myrdd1.takeOrdered(3) //Array[Int] = Array(5, 7, 9)

47. Explain the lookup() operation?

  • It is an action.
  • It returns the list of values in the RDD for the given key.

Example:

val rdd1 = sc.parallelize(Seq(("myspark", 78), ("Hive", 95), ("spark", 15), ("HBase", 25), ("spark", 39), ("BigData", 78), ("spark", 49)))

rdd1.lookup(“spark”)

rdd1.lookup(“Hive”)

rdd1.lookup(“BigData”)

Output:

Seq[Int] = WrappedArray(15, 39, 49)
Seq[Int] = WrappedArray(95)
Seq[Int] = WrappedArray(78)

48. How do you kill a running Spark application?

Get the application ID from the Spark scheduler (for instance, application_743159779306972_1234) and then run the command below in the terminal.

yarn application -kill application_743159779306972_1234

49. How do you stop INFO messages from displaying on the Spark console?

Edit the conf/log4j.properties file and change the following line:

log4j.rootCategory=INFO, console

to

log4j.rootCategory=ERROR, console

For Spark 2.x, run the following in the shell:

import org.apache.log4j.{Logger, Level}
Logger.getLogger("org").setLevel(Level.ERROR)

For Python: spark.sparkContext.setLogLevel("ERROR")

50. Where are the logs available in Spark on YARN?

We can access the logs using the following command:

Syntax:-

yarn logs -applicationId <application ID>

51. How do you find the rows that differ between two Spark DataFrames?

This can be achieved with the except operation.

Example:-

scala> customerDF.show
+---+--------+---+------+
|cId|    name|age|gender|
+---+--------+---+------+
|  1|   James| 21|     M|
|  2|     Liz| 25|     F|
|  3|    John| 31|     M|
|  4|Jennifer| 45|     F|
|  5|  Robert| 41|     M|
|  6|  Sandra| 45|     F|
+---+--------+---+------+

scala> custDF.show
+---+-----+---+------+
|cId| name|age|gender|
+---+-----+---+------+
|  1|James| 21|     M|
|  2|  Liz| 25|     F|
+---+-----+---+------+

scala> customerDF.except(custDF).show
+---+--------+---+------+
|cId|    name|age|gender|
+---+--------+---+------+
|  5|  Robert| 41|     M|
|  3|    John| 31|     M|
|  4|Jennifer| 45|     F|
|  6|  Sandra| 45|     F|
+---+--------+---+------+

52. What are security options in Apache Spark?

Spark currently supports authentication via a shared secret. Authentication can be configured to be on via the spark.authenticate configuration parameter. This parameter controls whether the Spark communication protocols do authentication using the shared secret. This authentication is a basic handshake to make sure both sides have the same shared secret and can communicate. If the shared secret is not identical, they will not be allowed to communicate. The shared secret is created as follows:

For Spark on YARN deployments, configuring spark.authenticate to true will automatically generate and distribute the shared secret. Each application will use a unique shared secret.

For other Spark deployments, the parameter spark.authenticate.secret should be configured on each node. This secret will be used by all the masters, workers, and applications.
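
A minimal sketch for such a non-YARN deployment; the secret value is purely illustrative:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "my-shared-secret") // must be identical on every node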

53. What is Scala?

Scala is a modern multi-paradigm programming language designed to express common programming patterns in a concise, elegant, and type-safe way. It smoothly integrates features of object-oriented and functional languages.

54. What types of variable declarations are available in Scala?

val (aka Values):

You can name the result of an expression with the val keyword. Referencing a value does not re-compute it, and a value cannot be reassigned.

Example:

val x = 1 + 1
x = 3 // This does not compile: values cannot be reassigned.

var (aka Variables):

Variables are like values, except you can re-assign them. You can define a variable with the var keyword.

Example:

var x = 1 + 1
x = 3 // This compiles, because variables can be reassigned.

55. What is the difference between methods and functions in Scala?

Methods:-

Methods look and behave very similar to functions, but there are a few key differences between them.

Methods are defined with the def keyword. def is followed by a name, parameter lists, a return type, and a body.

Example:

def add(x: Int, y: Int): Int = x + y
println(add(1, 2)) // 3

Functions:-

Functions are expressions that take parameters.


You can define an anonymous function (i.e. no name) that returns a given integer plus one: (x: Int) => x + 1

You can also name functions, for example:

val addOne = (x: Int) => x + 1
println(addOne(1)) // 2

56. What are case classes in Scala?

Scala has a special type of class called a “case” class. By default, case classes are immutable and compared by value. You can define case classes with the case class keywords.

case class Point(x: Int, y: Int)

val point = Point(1, 2)
val anotherPoint = Point(1, 2)
val yetAnotherPoint = Point(2, 2)
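
A brief illustration of the compared-by-value behaviour, using the values defined above:

println(point == anotherPoint) // true, the field values are equal
println(point == yetAnotherPoint) // false, the field values differ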

57. What are Traits in Scala?

Traits are used to share interfaces and fields between classes. They are similar to Java 8’s interfaces. Classes and objects can extend traits, but traits cannot be instantiated and therefore have no parameters.

Traits are types containing certain fields and methods. Multiple traits can be combined.

A minimal trait is simply the keyword trait and an identifier:

Example:

trait Greeter {
  def greet(name: String): Unit = println("Hello, " + name + "!")
}
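
A small sketch of a class extending the trait (the class name is made up for this example):

class SparkGreeter extends Greeter // inherits the default greet implementation

val greeter = new SparkGreeter
greeter.greet("Dinesh") // prints: Hello, Dinesh!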

58. What is a singleton object in Scala?

A class that has exactly one instance is called a singleton object; in Scala, it is defined with the object keyword. Here's an example of a singleton object with a method:

object Logger {
  def info(message: String): Unit = println("Hi i am Dineshkumar")
}

59. What are companion objects in Scala?

An object with the same name as a class is called a companion object. Conversely, the class is the object’s companion class. A companion class or object can access the private members of its companion. Use a companion object for methods and values which are not specific to instances of the companion class.

Example:

import scala.math._

case class Circle(radius: Double) {
  import Circle._
  def area: Double = calculateArea(radius)
}

object Circle {
  private def calculateArea(radius: Double): Double = Pi * pow(radius, 2.0)
}

val circle1 = new Circle(5.0)
circle1.area

60. What are the special data types available in Scala?

Any:

Any is the supertype of all types, also called the top type. It defines certain universal methods such as equals, hashCode, and toString. Any has two direct subclasses: AnyVal and AnyRef.

Sample Example:

val list: List[Any] = List(
  "a string",
  732, // an integer
  'c', // a character
  true, // a boolean value
  () => "an anonymous function returning a string"
)

list.foreach(element => println(element))

AnyVal:

AnyVal represents value types. There are nine predefined value types, and they are non-nullable: Double, Float, Long, Int, Short, Byte, Char, Unit, and Boolean. Unit is a value type that carries no meaningful information; there is exactly one instance of Unit, which can be written literally as (). All functions must return something, so Unit is sometimes a useful return type.

AnyRef:

AnyRef represents reference types. All non-value types are defined as reference types. Every user-defined type in Scala is a subtype of AnyRef. If Scala is used in a Java runtime environment, AnyRef corresponds to java.lang.Object.

Nothing:

Nothing is a subtype of all types, also called the bottom type. No value has type Nothing. A common use is to signal non-termination, such as a thrown exception, a program exit, or an infinite loop (i.e., it is the type of an expression that does not evaluate to a value, or of a method that does not return normally).

Null:

Null is a subtype of all reference types (i.e., any subtype of AnyRef). It has a single value identified by the keyword literal null. Null is provided mostly for interoperability with other JVM languages and should rarely be used in Scala code.