Monday, March 23, 2020

Spark Core Components and Execution Model


The core components of a Spark application are:
* Driver
    * When we submit a Spark application in cluster mode using spark-submit, the Driver interacts with the cluster Resource Manager to start the Application Master.
    * It also converts the user code into a logical plan (DAG) and then into a physical plan.
* Application Master
    * The Driver requests executors from the Application Master for executing the user code; the Application Master negotiates the resources with the Resource Manager to host these executors.
* Spark Context
    * The Driver creates a SparkContext for each application; the SparkContext is the main entry point for executing any Spark functionality (see the sketch after this list).
* Executors
    * Executors are processes on the worker nodes whose job is to execute the assigned tasks.
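
As a minimal sketch of the entry point (the application name and the toy job are illustrative assumptions, not from this post), a Spark application submitted with spark-submit typically creates a SparkSession, which wraps the SparkContext created by the Driver:

    import org.apache.spark.sql.SparkSession

    object EntryPointExample {
      def main(args: Array[String]): Unit = {
        // The SparkSession wraps the SparkContext; the Driver creates it once per application.
        val spark = SparkSession.builder()
          .appName("entry-point-example")
          .getOrCreate()

        val sc = spark.sparkContext   // the underlying SparkContext

        // A trivial job: parallelize a collection and trigger an action so tasks run on executors.
        val total = sc.parallelize(1 to 100).sum()
        println(s"sum = $total")

        spark.stop()
      }
    }

When this is submitted in cluster mode, the main method runs in the Driver, while the tasks created by sum() run on the executors.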

The Spark execution model can be defined in three phases: 
* Logical Plan
    * Converting user code into the different steps that will be executed when an action is performed.
    * The logical plan builds a DAG describing how Spark will execute all the transformations.
* Physical Plan
    * Converting the logical plan into a physical plan using the Catalyst and Tungsten optimisation techniques (see the sketch after this list).
    * A few of the methods the Catalyst optimiser uses while choosing/translating the best physical plan:
        * Removing repeated operations (e.g. an addition of two constants that would otherwise be recomputed for each row)
        * Predicate pushdown: pushes the filters as close as possible to the data sources.
        * Column pruning: reads only the needed columns.
    * Tungsten: executes the query plan on the actual cluster, generating optimised code based on the query plan produced by the Catalyst optimiser.
* Execution
    * The physical plan is divided into a number of stages, and each stage is then converted into tasks.
    * The Driver requests resources from the Cluster Manager; the Cluster Manager allocates containers and launches executors on the allocated containers, and the executors run tasks on behalf of the Driver.
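
As a rough illustration (the Parquet path and column names below are hypothetical), calling explain(true) on a DataFrame prints the logical plans and the selected physical plan; Catalyst's predicate pushdown and column pruning usually show up there as PushedFilters and a reduced ReadSchema:

    import org.apache.spark.sql.SparkSession

    object PlanExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("plan-example").getOrCreate()

        // Hypothetical Parquet input; only two columns are selected and one filter is applied.
        val df = spark.read.parquet("/tmp/events.parquet")
          .select("user_id", "amount")
          .filter("amount > 100")

        // Prints the parsed, analysed and optimised logical plans plus the physical plan.
        df.explain(true)

        spark.stop()
      }
    }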
 

RCFile vs ORC vs Parquet

Row major data:
 Advantages:
  1. If we select all columns from a table, each row is read sequentially and returned as a whole, which is efficient.
 Disadvantages:
  1. If we select only specific columns from a table, we still have to read every column in each row, skip the unwanted columns and fetch only the required ones, and repeat this for every row. This read-and-skip work to reach a specific column hurts performance.
  2. Because each row mixes different data types, compression is also not very efficient.

Column major data:
 Advantages:
  1. Data is stored column-wise: the values of column 1 for all records are stored together.
  2. Selecting specific columns is very fast, because all the data for a given column is stored together.
  3. Since values of the same type are stored together, compression is more effective.
 Disadvantages:
  1. Selecting all columns of a record is not very efficient, because the row has to be reassembled from every column.

RCFile: a combination of the row-major and column-major formats.
  The data is first partitioned row-wise (into row groups), then each row group is stored in a columnar layout.
  Stored in binary format.
  The default row group size is 4 MB.
 Disadvantages:
  1. No metadata about columns.
  2. Many row groups are created for larger data sets because the default row group size is only 4 MB, so sequential reads suffer.
  3. Because there is no metadata about columns, compression is less efficient.

ORC: similar to RCFile, ORC also divides the data into row-then-columnar chunks called stripes. Along with the data it
also stores indexes/statistics at the stripe level as well as in the file footer, such as the MIN and MAX values for
each column.

Parquet: also a columnar file format, similar to RCFile and ORC, but Parquet stores nested data structures in a flat
columnar format.
  It also supports very good compression methods.
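
A small Spark sketch of writing and reading these formats (the paths, columns and the snappy codec below are illustrative assumptions, not from this post):

    import org.apache.spark.sql.SparkSession

    object FileFormatExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("file-format-example").getOrCreate()
        import spark.implicits._

        // Small illustrative dataset.
        val df = Seq((1, "alice", 120.0), (2, "bob", 80.0)).toDF("id", "name", "amount")

        // Write the same data as ORC and Parquet; both are columnar and support compression codecs.
        df.write.mode("overwrite").option("compression", "snappy").orc("/tmp/demo_orc")
        df.write.mode("overwrite").option("compression", "snappy").parquet("/tmp/demo_parquet")

        // Reading back only the needed columns benefits from the columnar layout (column pruning),
        // and the filter can be pushed down using the MIN/MAX statistics stored in the files.
        spark.read.orc("/tmp/demo_orc").select("name", "amount").filter($"amount" > 100).show()
        spark.read.parquet("/tmp/demo_parquet").select("name", "amount").filter($"amount" > 100).show()

        spark.stop()
      }
    }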

Tuesday, February 11, 2020

Kafka - Single broker setup

Apache Kafka

-> Distributed streaming platform
-> Publish and subscribe to streams of records, similar to a message queue or an enterprise messaging system
-> Store streams of records in a fault-tolerant, durable way
    - Keeps multiple copies of the data (replication)
-> Replication (leader-follower model)
    - One broker is selected as the Leader
    - The Producer connects to the Leader and sends data
    - The Consumer requests the required data from the Leader
    - Depending on the replication factor, Followers copy data from the Leader
-> Process streams of records as they occur

Key components in Kafka
-> Producer : an application that sends data/messages
-> Message : a small piece of data (an array of bytes)
-> Consumer : an application that receives data/messages
-> Broker : a Kafka server
-> Cluster : a group of instances, each acting as one broker
-> Topic : a unique name given to a data stream
-> Partitions : distribution of a topic's data into multiple partitions
-> Offset : a sequence number given to a message
-> Consumer Group : a group of consumers acting as a single consumer

Apache Kafka setup

1. Download the tar file from apache.org
    curl -k https://www-us.apache.org/dist/kafka/2.3.1/kafka-2.3.1-src.tgz > kafka-2.3.1-src.tgz
    tar -xzf kafka-2.3.1-src.tgz

2. Update the server.properties file
    broker.id=1
    listeners=PLAINTEXT://:9092
    log.dirs=/home/bigdata/opt/kafka/kafka-2.3.1/logs

3. Zookeeper setup (coordination service for a distributed system)
    wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz
    tar -xvf apache-zookeeper-3.5.6-bin.tar.gz

4. Update the zoo.cfg file
    tickTime=200
    dataDir=/home/bigdata/opt/zookeeper/apache-zookeeper-3.5.6/data
    clientPort=2181
    maxClientCnxns=60
    initLimit=10
    syncLimit=5

5. Start the Zookeeper service
    bin/zookeeper-server-start.sh config/zookeeper.properties

6. Start the Kafka broker server
    bin/kafka-server-start.sh config/server.properties

7. Create a topic (replication factor 1, since this is a single-broker setup)
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Demo

8. Start a producer console to publish messages to the topic
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Demo

9. Start a consumer console to consume messages from the topic
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Demo --from-beginning

Producer - callback and acknowledgement methods

1. Send and forget
    Send a message to the broker and don't check whether the message was delivered successfully or not.
2. Synchronous send
    Send a message and wait for the response.
    Success: we get a RecordMetadata object.
    Failure: an exception is thrown.
3. Asynchronous send
    Send a message and provide a callback function to receive the acknowledgement. We can record the message details for later analysis.
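
A minimal Scala sketch of the three send styles, using the Kafka Java producer API against the single-broker setup above (topic Demo, broker on localhost:9092; the key/value strings are only illustrative):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

    object ProducerSendModes {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        val record = new ProducerRecord[String, String]("Demo", "key1", "hello kafka")

        // 1. Send and forget: fire the message and ignore the result.
        producer.send(record)

        // 2. Synchronous send: block on the returned Future; success gives a RecordMetadata,
        //    failure throws an exception.
        val metadata: RecordMetadata = producer.send(record).get()
        println(s"sync send -> partition=${metadata.partition()}, offset=${metadata.offset()}")

        // 3. Asynchronous send: pass a callback that receives either metadata or an exception,
        //    so failures can be recorded for later analysis.
        producer.send(record, new Callback {
          override def onCompletion(md: RecordMetadata, e: Exception): Unit = {
            if (e != null) e.printStackTrace()
            else println(s"async send -> offset=${md.offset()}")
          }
        })

        producer.flush()
        producer.close()
      }
    }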