Apache Spark Interview Questions with Answers – 3


List of Data Engineer Interview questions for Apache Spark…

61. What are Higher-order functions in Scala?

Higher-order functions take other functions as parameters or return a function as a result. This is possible because functions are first-class values in Scala. The terminology can get a bit confusing at this point, and we use the phrase “higher-order function” for both methods and functions that take functions as parameters or return a function.

Higher-order functions enable two things that help reduce redundant code: functions that accept another function as a parameter, and functions that return a function as their result.

One of the most common examples is the higher-order function map which is available for collections in Scala.

val salaries = Seq(20000, 70000, 40000)
val doubleSalary = (x: Int) => x * 2
val newSalaries = salaries.map(doubleSalary) // List(40000, 140000, 80000)
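The other case mentioned above, a function that returns a function, can be sketched as follows (the names addBonus and addFestivalBonus are illustrative):

def addBonus(bonus: Int): Int => Int = (salary: Int) => salary + bonus

val addFestivalBonus = addBonus(5000) // a new function of type Int => Int
println(addFestivalBonus(20000))      // 25000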

62. What is the Currying function or multiple parameter lists in Scala?

Methods may define multiple parameter lists. When a method is called with fewer parameter lists than it declares, the result is a function taking the missing parameter lists as its arguments. This is formally known as currying.

Example:

def foldLeft[B](z: B)(op: (B, A) => B): B

val numbers = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val res = numbers.foldLeft(0)((m, n) => m + n)
print(res) // 55
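A short sketch (the helper modN is illustrative) of how supplying only the first parameter list yields a new function:

def modN(n: Int)(x: Int): Boolean = x % n == 0

val isEven = modN(2) _                     // only the first parameter list supplied
println(isEven(10))                        // true
println(List(1, 2, 3, 4).filter(modN(2)))  // List(2, 4)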

63. What is Pattern matching in Scala?

Pattern matching is a mechanism for checking a value against a pattern. It is a more powerful version of the switch statement in Java, and it can likewise be used in place of a series of if/else statements. A successful match can also deconstruct a value into its constituent parts.

import scala.util.Random

val x: Int = Random.nextInt(10)

x match {
  case 0 => "zero"
  case 1 => "one"
  case 2 => "two"
  case _ => "many"
}

def matchTest(x: Int): String = x match {
  case 1 => "one"
  case 2 => "two"
  case _ => "many"
}

matchTest(3) // many
matchTest(1) // one

64. What are the basic properties available in Spark?

It may be useful to provide some simple definitions for the Spark nomenclature:

  • Worker Node: A server that is part of the cluster and is available to run Spark jobs.
  • Master Node: The server that coordinates the Worker nodes.
  • Executor: A sort of virtual machine inside a node. One Node can have multiple Executors.
  • Driver Node: The node that initiates the Spark session. Typically, this is the server where the SparkContext is created.
  • Driver (Executor): The Driver Node will also show up in the Executor list.

65. What are the Configuration properties in Spark?

  • spark.executor.memory:- The amount of memory allocated to each executor. The maximum possible value is managed by the YARN cluster and cannot exceed the RAM actually available on the node.
  • spark.executor.cores:- Number of cores assigned per executor, which cannot be higher than the cores available on each worker.
  • spark.executor.instances:- Number of executors to start. The cluster honors this property only if spark.dynamicAllocation.enabled is set to "false."
  • spark.memory.fraction:- The fraction of executor memory used for execution and storage; the default is 0.6, i.e., 60% of the requested memory per executor.
  • spark.dynamicAllocation.enabled:- Controls the mechanism Spark provides to adjust resources dynamically. Disabling it provides more control over the number of executors that can be started, which impacts the amount of storage available for the session. For more information, see the Dynamic Resource Allocation page on the official Spark website. A minimal configuration sketch follows this list.
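A minimal sketch, with purely illustrative values, of setting these properties when building a SparkSession:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ConfigExample")
  .config("spark.executor.memory", "4g")
  .config("spark.executor.cores", "2")
  .config("spark.executor.instances", "4")            // honored when dynamic allocation is disabled
  .config("spark.dynamicAllocation.enabled", "false")
  .config("spark.memory.fraction", "0.6")
  .getOrCreate()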

66. What are Sealed classes?

Traits and classes can be marked sealed, which means all subtypes must be declared in the same file.

This is useful for pattern matching because we don’t need a “catch-all” case. This assures that all subtypes are known.

Example:

sealed abstract class Furniture
case class Couch() extends Furniture
case class Chair() extends Furniture

def findPlaceToSit(piece: Furniture): String = piece match {
  case a: Couch => "Lie on the couch"
  case b: Chair => "Sit on the chair"
}

67. What is Type Inference?

The Scala compiler can often infer the type of an expression, so you don’t have to declare it explicitly.

Example:

val name = "Dineshkumar S" // inferred as String
val id = 1234              // inferred as Int

68. When not to rely on default type inference?

When a variable such as obj is initialized with null, the type inferred for obj is Null. Since the only value of that type is null, it is impossible to assign a different value later, so in such cases the type should be declared explicitly rather than left to inference.
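A minimal sketch of the pitfall (variable names are illustrative):

var obj = null             // inferred type: Null
// obj = "Dinesh"          // would not compile: type mismatch

var name: String = null    // declare the type explicitly instead
name = "Dinesh"            // compiles fine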

69. How can we debug spark applications locally?

Yes, we can debug locally: setting breakpoints, inspecting variables, and so on. First, submit the application in local mode, as below:

spark-submit --name CodeTestDinesh --class DineshMainClass --master local[2] DineshApps.jar

Then make the Spark driver pause and wait for a connection from a debugger when it starts up by adding an option like the following:

--conf spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005

where agentlib:jdwp is the Java Debug Wire Protocol option, followed by a comma-separated list of sub-options:

  • transport: defines the connection protocol used between debugger and debuggee, either socket or shared memory; you almost always want socket (dt_socket), except in some cases on Microsoft Windows
  • server: whether this process should be the server when talking to the debugger (or conversely, the client) — you always need one server and one client. In this case, we’re going to be the server and wait for a connection from the debugger
  • suspend: whether to pause execution until a debugger has successfully connected. We turn this on so the driver won’t start until the debugger connects
  • address: This is the port to listen on (for incoming debugger connection requests). You can set it to any available port (you just have to make sure the debugger is configured to connect to this same port)

70. A Map collection has keys and values; should the key be mutable or immutable?

The behavior of a Map is not specified if the value of an object used as a key is changed in a manner that affects equals comparisons while the object is a key in the map. So keys should be immutable.
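A minimal sketch of why mutable keys are dangerous (the Box class is illustrative): mutating a key after insertion changes its hashCode/equals, so the entry can become unreachable.

import scala.collection.mutable

case class Box(var value: Int)   // the mutable field participates in equals/hashCode

val m = mutable.Map[Box, String]()
val k = Box(1)
m(k) = "one"

k.value = 2                      // mutate the key in place
println(m.get(Box(1)))           // None: the stored key no longer equals Box(1)
println(m.get(k))                // usually None: k now hashes into a different bucket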

71. What is OFF_HEAP persistence in spark?

One of the most important capabilities in Spark is persisting (or caching) datasets in memory across operations. Each persisted RDD can be stored using a different storage level. One of the possibilities is to store RDDs off-heap in serialized form. Compared to storing data in the Spark JVM heap, off-heap storage reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory. This makes it attractive in environments with large heaps or multiple concurrent applications.
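A minimal sketch of requesting off-heap persistence (this assumes off-heap memory has been enabled, e.g., via spark.memory.offHeap.enabled and spark.memory.offHeap.size, and that a SparkSession named spark is available):

import org.apache.spark.storage.StorageLevel

val rdd = spark.sparkContext.parallelize(1 to 1000000)
rdd.persist(StorageLevel.OFF_HEAP)   // serialized, stored outside the JVM heap
println(rdd.count())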

72. What is the difference between Apache Spark and Apache Flink?

  • Stream Processing: While Spark is a batch-oriented system that operates on chunks of data, called RDDs, Apache Flink is a stream processing system able to process row after row in real time.
  • Iterations: By exploiting its streaming architecture, Flink allows you to iterate over data natively, something Spark supports only as batches.
  • Memory Management: Spark jobs have to be optimized and adapted to specific datasets because you need to control partitioning and caching manually to get it right.
  • Maturity: Flink is still in its infancy and has but a few production deployments.
  • Data Flow: In contrast to the procedural programming paradigm, Flink follows a distributed data-flow approach. For data set operations where intermediate results are required in addition to the regular input of an operation, broadcast variables are used to distribute the pre-calculated results to all worker nodes.

73. How do we measure the impact of Garbage Collection?

Excessive GC happens when too much memory is used on the driver or on some executors, and garbage collection can become extremely costly and slow when large numbers of objects are created in the JVM. You can measure its impact by adding '-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps' to Spark's JVM options via the `spark.executor.extraJavaOptions` configuration parameter.
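A minimal sketch of adding those JVM options when building the session; the GC details then show up in each executor's logs:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("GcLoggingExample")
  .config("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
  .getOrCreate()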

74. Apache Spark vs. Apache Storm?

Apache Spark is an in-memory distributed data analysis platform — primarily targeted at speeding up batch analysis jobs, iterative machine learning jobs, interactive queries, and graph processing.

One of Spark's primary distinctions is its use of RDDs, or Resilient Distributed Datasets. RDDs are great for pipelining parallel operators for computation and are, by definition, immutable, which allows Spark a unique form of fault tolerance based on lineage information. If you are interested in, for example, executing a Hadoop MapReduce job much faster, Spark is a great option (although memory requirements must be considered).

Apache Storm is focused on stream processing or what some call complex event processing. Storm implements a fault-tolerant method for performing a computation or pipelining multiple computations on an event as it flows into a system. One might use Storm to transform unstructured data as it flows into a system into the desired format.

Storm and Spark are focused on fairly different use cases. The more “apples-to-apples” comparison would be between Storm Trident and Spark Streaming. Since Spark’s RDDs are inherently immutable, Spark Streaming implements a method for “batching” incoming updates in user-defined time intervals that get transformed into their own RDDs. Spark’s parallel operators can then perform computations on these RDDs. This is different from Storm, which deals with each event individually.

One key difference between these two technologies is that Spark performs Data-Parallel computations while Storm performs Task-Parallel computations. Either design makes trade-offs that are worth knowing.

75. How to overwrite the output directory in spark?

Refer to the command below, using DataFrames:

import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Overwrite).parquet(path)

76. How to read multiple text files into a single RDD?

You can specify whole directories, use wildcards, and even a comma-separated list of directories and wildcards, as below.

val rdd = sc.textFile("file:///D:/Dinesh.txt,file:///D:/Dineshnew.txt")
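A few more variants (the paths are illustrative): whole directories, wildcards, and wholeTextFiles for per-file (path, content) pairs.

val dirRdd      = sc.textFile("file:///D:/logs/")          // every file in the directory
val wildcardRdd = sc.textFile("file:///D:/logs/*.txt")     // wildcard match
val pairRdd     = sc.wholeTextFiles("file:///D:/logs/")    // RDD of (filePath, fileContent)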

77. Can we run SPARK without a base of HDFS?

Apache Spark is a fast and general-purpose cluster computing system. It is not a data storage system. It uses an external storage system for storing and reading data. So we can run Spark without HDFS in distributed mode using any HDFS compatible file systems like S3, GPFS, GlusterFs, Cassandra, etc.

There is another file system called Tachyon (now known as Alluxio). It is an in-memory file system that can be used to run Spark in distributed mode.

78. Define generic classes in scala?

Generic classes are classes that take a type as a parameter. They are beneficial for collection classes.

Generic classes take a type as a parameter within square brackets []. One convention uses the letter A as a parameter identifier, though any parameter name may be used.

Example: The instance stack below can only take Int values.

val stack = new Stack[Int]
stack.push(1)
stack.push(2)
println(stack.pop) // prints 2
println(stack.pop) // prints 1
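The Stack used above is not defined in the snippet; a minimal sketch of such a generic class (simplified, not the standard library's Stack) could look like this:

class Stack[A] {
  private var elements: List[A] = Nil

  def push(x: A): Unit = { elements = x :: elements }

  def pop: A = {
    val top = elements.head   // throws if the stack is empty
    elements = elements.tail
    top
  }
}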

79. How to enable tungsten sort shuffle in Spark 2.x?

SortShuffleManager is the one and only ShuffleManager in Spark with the short name sort or tungsten-sort.

In other words, you cannot use any ShuffleManager other than SortShuffleManager (unless you enable a custom one using the spark.shuffle.manager property), so tungsten sort is effectively already enabled in Spark 2.x.

80. How to prevent Spark Executors from getting Lost when using YARN client mode?

Executors in YARN client mode are typically lost because their containers exceed the allowed memory and are killed by YARN. The solution, if you are using YARN, is to increase the executor memory overhead, for example:

--conf spark.yarn.executor.memoryOverhead=600

81. What is the relationship between YARN Containers and Spark Executors?

First, the important thing is that the number of containers will always be the same as the number of executors created by a Spark application, e.g., set via the --num-executors parameter in spark-submit.

Every container always allocates at least the amount of memory set by yarn.scheduler.minimum-allocation-mb. This means that if --executor-memory is set to, e.g., only 1g but yarn.scheduler.minimum-allocation-mb is, e.g., 6g, the container ends up much bigger than the Spark application needs.

The other way round, if --executor-memory is set to something higher than the yarn.scheduler.minimum-allocation-mb value, e.g., 12g, the container will allocate more memory dynamically, but only if the requested amount of memory is smaller than or equal to the yarn.scheduler.maximum-allocation-mb value.

The value of yarn.nodemanager.resource.memory-mb determines how much memory can be allocated in total by all containers on one host.

So setting yarn.scheduler.minimum-allocation-mb allows you to run smaller containers, e.g., for smaller executors (otherwise it would be a waste of memory).

Setting yarn.scheduler.maximum-allocation-mb to the maximum value (e.g., equal to yarn.nodemanager.resource.memory-mb) allows you to define bigger executors (more memory is allocated if needed, e.g., via the --executor-memory parameter).

82. How to allocate the memory sizes for the spark jobs in the cluster?

Before answering this question, we have to concentrate on three main settings:

  • Number of executors (--num-executors)
  • Executor memory (--executor-memory)
  • Executor cores (--executor-cores)

Let us go with an example now. Let's imagine we have a cluster with six nodes running NodeManagers, each with 16 cores and 64GB RAM. The NodeManager sizes, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should be set to 63 * 1024 = 64512 (megabytes) and 15, respectively. We never give 100% of each resource to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes.

Cloudera Manager helps by accounting for these and configuring these YARN properties automatically. So the likely first impulse would be to allocate --num-executors 6 --executor-cores 15 --executor-memory 63G.

However, this is the wrong approach: 63GB plus the executor memory overhead will not fit within the 63GB RAM of the NodeManagers. The Application Master will take up a core on one of the nodes, meaning that there will not be room for a 15-core executor on that node. Also, 15 cores per executor can lead to bad HDFS I/O throughput. So the best option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G.

This configuration results in three executors on all nodes except the one running the Application Master, which gets two executors. --executor-memory was derived as 63GB / 3 executors per node = 21GB; 21 * 0.07 = 1.47GB of memory overhead; 21 - 1.47 ≈ 19GB.
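The arithmetic behind those numbers, as a small illustrative calculation (the 0.07 factor approximates the default executor memory overhead):

val usableMemPerNodeGB = 63.0            // RAM per NodeManager left for YARN
val executorsPerNode   = 3.0
val rawPerExecutorGB   = usableMemPerNodeGB / executorsPerNode   // 21.0
val overheadGB         = rawPerExecutorGB * 0.07                 // ~1.47
val executorMemoryGB   = math.floor(rawPerExecutorGB - overheadGB)
println(executorMemoryGB)                // 19.0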

83. How can tab autocompletion be enabled in pyspark?

Import the below libraries in the pyspark shell:

import rlcompleter, readline
readline.parse_and_bind("tab: complete")

84. Can we execute two transformations on the same RDD in parallel in Apache Spark?

All standard RDD methods are blocking (except AsyncRDDActions), so actions will be evaluated sequentially.

It is possible to execute multiple actions concurrently using non-blocking submission (threads, Futures) with the correct configuration of the in-application scheduler or explicitly limited resources for each action.

Example:

val df = spark.range(100000)
val df1 = df.filter('id < 1000)
val df2 = df.filter('id >= 1000)
print(df1.count() + df2.count()) // 100000
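The example above still evaluates the two counts one after the other; a minimal sketch of submitting the actions concurrently with Futures (assuming a SparkSession named spark) looks like this:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import org.apache.spark.sql.functions.col

val df = spark.range(100000)
// Each Future submits its own job, so the two counts can run concurrently,
// subject to available resources and the in-application scheduler.
val f1 = Future { df.filter(col("id") < 1000).count() }
val f2 = Future { df.filter(col("id") >= 1000).count() }
println(Await.result(f1, 10.minutes) + Await.result(f2, 10.minutes)) // 100000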

Regarding caching, it is impossible to answer without knowing the context. Depending on the cluster configuration, storage, and data locality, it might be cheaper to load data from disk again, especially when resources are limited and subsequent actions might evict cached blocks.

85. Which cluster type should I choose for Spark?

  • Standalone — meaning Spark will manage its own cluster
  • YARN — using Hadoop's YARN resource manager
  • Mesos — using Apache Mesos, a general-purpose cluster manager

Start with a standalone cluster if this is a new deployment. Standalone mode is the easiest to set up and will provide almost all the same features as the other cluster managers if you are only running Spark.

If you would like to run Spark alongside other applications or use richer resource scheduling capabilities (e.g., queues), both YARN and Mesos provide these features. Of these, YARN will likely be preinstalled in many Hadoop distributions.

One advantage of Mesos over both YARN and standalone mode is its fine-grained sharing option, which lets interactive applications such as the Spark shell scale down their CPU allocation between commands. This makes it attractive in environments where multiple users are running interactive shells.

In all cases, it is best to run Spark on the same nodes as HDFS for fast access to storage. You can install Mesos or the standalone cluster manager on the same nodes manually, or most Hadoop distributions already install YARN and HDFS together.

86. What is DStreams in Spark Streaming?

  • Spark Streaming uses a micro-batch architecture where the incoming data is grouped into micro-batches called Discretized Streams (DStreams), which also serve as the basic programming abstraction.
  • DStreams are internally made up of Resilient Distributed Datasets (RDDs), and as a result, standard RDD transformations and actions can be applied to them.

87. What is a Stateless Transformation?

A transformation is stateless when the processing of each batch does not depend on the data of previous batches. Stateless transformations are simple RDD transformations applied to every batch, meaning every RDD in a DStream. They include common RDD transformations like map(), filter(), and reduceByKey().
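A minimal sketch of a stateless word count on a DStream (the socket source, host, and port are illustrative); each batch is processed on its own, with no state carried over:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StatelessExample").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)        // stateless: uses only the current batch
counts.print()

ssc.start()
ssc.awaitTermination()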

88. What is a Stateful Transformation?

A transformation is stateful when it uses data or intermediate results from previous batches to compute the result of the current batch. Stateful transformations are operations on DStreams that track data across time; they use some data from previous batches to generate the results for a new batch.

In streaming, if we have a use case to track data across batches, then we need stateful DStreams.

For example, we may track a user’s interaction in a website during the user session, or we may track a particular Twitter hashtag across time and see which users across the globe are talking about it.

Types of stateful transformation.

  • Stateful DStreams are of two types — window-based tracking and full session tracking.

For stateful tracking, all incoming data should be transformed to key-value pairs such that the key states can be tracked across batches. This is a precondition.

  • Window-based tracking

In window-based tracking, the incoming batches are grouped in time intervals, i.e., every ‘x’ seconds. Further computations on these batches are done using slide intervals.
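A minimal sketch of window-based tracking (host, port, and durations are illustrative): word counts over the last 30 seconds, recomputed every 10 seconds.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("WindowExample").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

val words = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Counts computed over a 30-second window, sliding every 10 seconds.
val windowedCounts = words.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()

ssc.start()
ssc.awaitTermination()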