Thursday, February 4, 2021

Python - Converting Row values to Column names

    import pandas as pd

    # Read a pipe-delimited text file of name/value pairs (col1 = field name, col2 = value)
    InputFile = 'location of txt file'
    read_file = pd.read_csv(InputFile, sep='|', names=['col1', 'col2'], skipinitialspace=True).fillna('')
    # Number each occurrence of a field name so repeated records become separate output rows
    read_file['sno'] = read_file.groupby(['col1']).cumcount() + 1
    # Pivot: the row values of col1 become column names, col2 supplies the cell values
    df = read_file.pivot(index='sno', columns='col1', values='col2').reset_index()
    df.to_csv('Output Path', index=False)
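
For a quick illustration of the transformation, here is a minimal sketch on a small in-memory DataFrame (the field names and values are made-up sample data, not from the original post):

    import pandas as pd

    # Hypothetical pipe-delimited content: repeated Name/City pairs, one pair per line
    sample = pd.DataFrame({'col1': ['Name', 'City', 'Name', 'City'],
                           'col2': ['Asha', 'Bangalore', 'Ravi', 'Chennai']})
    sample['sno'] = sample.groupby(['col1']).cumcount() + 1
    # The row values of col1 ('Name', 'City') become the output column names
    print(sample.pivot(index='sno', columns='col1', values='col2').reset_index())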

Monday, March 23, 2020

Spark Core components and Execution Model


The core components of a Spark application are:
* Driver
    * When we submit a Spark application in cluster mode using spark-submit, the Driver interacts with the Cluster
    Resource Manager to start the Application Master.
    * It also converts the user code into a logical plan (DAG) and then into a physical plan.
* Application Master
    * The Driver requests executors from the Application Master for executing the user code; the Application Master
    negotiates the resources with the Resource Manager to host these executors.
* Spark Context
    * The Driver creates a SparkContext for each application, and the SparkContext is the main entry point for executing
    any Spark functionality (see the sketch after this list).
* Executors
    * Executors are processes on the worker nodes whose job is to execute the assigned tasks.
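
A minimal PySpark sketch of this entry point (the application name and master URL are placeholder assumptions, not from the original post):

    from pyspark.sql import SparkSession

    # Build (or reuse) the SparkSession; the SparkContext is created under the hood
    spark = (SparkSession.builder
             .appName("demo-app")      # hypothetical application name
             .master("yarn")           # or "local[*]" when testing locally
             .getOrCreate())

    sc = spark.sparkContext            # entry point for low-level RDD APIs
    print(sc.applicationId)            # each application gets its own context/id

    spark.stop()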

The Spark execution model can be defined in three phases: 
* Logical Plan
    * Converts the user code into different steps which will be executed when an action is performed.
    * The logical plan builds a DAG describing how Spark will execute all transformations.
* Physical Plan
    * Converts the logical plan into a physical plan using the Catalyst and Tungsten optimisation techniques.
    * A few techniques the Catalyst optimiser applies while choosing/translating the best physical plan
      (a small PySpark sketch follows this list):
        * Remove repeated operations (e.g. the same addition of two numbers performed for each row).
        * Predicate pushdown: pushes the filters as close as possible to the data sources.
        * Column pruning: reads only the needed columns.
    * Tungsten: executes the query plan on the actual cluster, generating optimised code based on the query plan
    produced by the Catalyst optimiser.
* Execution:
    * The physical plan is divided into a number of stages, which are then converted into tasks.
    * The Driver requests resources from the Cluster Manager; the Cluster Manager allocates containers, launches
    executors on the allocated containers, and the executors run tasks on behalf of the Driver.
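
A minimal PySpark sketch of how predicate pushdown and column pruning show up in the optimised plan (the file path and column names are made-up placeholders):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("catalyst-demo").getOrCreate()

    # Hypothetical Parquet input with more columns than we actually need
    df = spark.read.parquet("/tmp/events.parquet")

    # Only two columns are selected and a filter is applied;
    # Catalyst pushes the filter to the source and prunes unused columns.
    query = df.select("user_id", "amount").where(df.amount > 100)

    # The physical plan shows PushedFilters and the pruned ReadSchema
    query.explain(True)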
 

RCFile vs ORC vs Parquet

Row major data:
 Advantages:
  1. If we select all columns from a table, we can simply read each row one by one
  and return the details.
 Disadvantages:
  1. If we select only specific columns from a table, we still need to read each full row, skip the unwanted
     columns, and fetch only the required ones, and the same repeats for every row. This read-and-skip of
     unwanted data to reach a specific column hurts performance.
  2. Also, because each row mixes different data types, compression is not very efficient.

Column major data:
 Advantages:
  1. The data is stored column-wise: the values of column 1 for all records are stored together.
  2. Selecting a specific column is very fast, because all data for that column is stored together.
  3. Since values of the same type are stored together, compression works well.
 Disadvantages:
  1. Selecting all columns of a record is not very efficient.

RCFile: Combination of both row major and column major formats.
  The data is first partitioned row-wise (into chunks called row groups), then each row group is stored
        in a columnar format.
  Stored in binary format.
  The default row group size is 4 MB.
 Disadvantages:
  1. No metadata about columns.
  2. Many row groups are created for larger data sets because the default size is only 4 MB, so sequential
   reads suffer.
  3. Because there is no metadata about columns, compression is less efficient.

ORC: Similar to RCFile, ORC also divides the data into row/columnar chunks, called stripes. Along with the data it
also stores indexes/statistics at the stripe level as well as in the file footer,
e.g. MIN and MAX values for each column in the data.

Parquet: It is also a columnar file format, similar to RCFile and ORC, but Parquet stores nested data
structures in a flat columnar format.
  It also supports very good compression methods.
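
A minimal PySpark sketch for writing and reading these columnar formats (the sample rows and output paths are made-up placeholders):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("fileformat-demo").getOrCreate()

    # Hypothetical product data
    df = spark.createDataFrame(
        [(1, "Laptop", 55000), (2, "Phone", 20000)],
        ["id", "product", "price"])

    # Columnar formats with built-in statistics and compression
    df.write.mode("overwrite").orc("/tmp/products_orc")
    df.write.mode("overwrite").parquet("/tmp/products_parquet")

    # Reading back only one column benefits from column pruning
    spark.read.parquet("/tmp/products_parquet").select("product").show()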

Tuesday, February 11, 2020

Kafka - Single broker setup

Apache Kafka

Apache Kafka -> Distributed streaming platform:
-> Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
-> Store streams of records in a fault-tolerant, durable way by keeping multiple copies of the data (replication).
-> Replication (leader-follower model): one broker is selected as the Leader; the Producer connects to the Leader and sends data; the Consumer requests the required data from the Leader; depending on the replication factor, Followers copy data from the Leader.
-> Process streams of records as they occur.

Key components in Kafka
-> Producer : an application that sends data/messages
-> Message : a small piece of data (an array of bytes)
-> Consumer : an application that receives data/messages
-> Brokers : Kafka servers
-> Cluster : a group of instances, each acting as one broker
-> Topic : a unique name for a data stream
-> Partitions : topic data distributed into multiple partitions
-> Offset : a sequence number given to a message
-> Consumer Group : a group of consumers acting as a single consumer

Apache Kafka setup
1. Download the tar file from apache.org
   curl -k https://www-us.apache.org/dist/kafka/2.3.1/kafka-2.3.1-src.tgz > kafka-2.3.1-src.tgz
   tar -xzf kafka-2.3.1-src.tgz
2. Update the server.properties file
   broker.id=1
   listeners=PLAINTEXT://:9092
   log.dirs=/home/bigdata/opt/kafka/kafka-2.3.1/logs
3. Zookeeper setup - the coordination service for a distributed system
   wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz
   tar -xvf apache-zookeeper-3.5.6-bin.tar.gz
4. Update the zoo.cfg file
   tickTime=200
   dataDir=/home/bigdata/opt/zookeeper/apache-zookeeper-3.5.6/data
   clientPort=2181
   maxClientCnxns=60
   initLimit=10
   syncLimit=5
5. Start the Zookeeper service
   bin/zookeeper-server-start.sh config/zookeeper.properties
6. Start the Kafka broker server
   bin/kafka-server-start.sh config/server.properties
7. Create a topic (replication factor 1, since this is a single-broker setup)
   bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Demo
8. Start the producer console to publish messages into the topic
   bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Demo
9. Start the consumer console to consume messages from the topic
   bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Demo --from-beginning

Producer - callback and acknowledgement methods (a sketch follows below)
1. Send and forget
   Send a message to the broker and do not check whether the message was delivered successfully or not.
2. Synchronous send
   Send a message and wait for the response.
   Success : we get a RecordMetadata object.
   Failure : an exception is raised.
3. Asynchronous send
   Send a message and provide a callback function to receive the acknowledgement. We can record message details for later analysis.
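
A minimal sketch of the three send patterns using the kafka-python client (the choice of client library, the topic name 'Demo', and the broker address are assumptions, not part of the original post):

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')

    # 1. Send and forget: do not check whether the message was delivered
    producer.send('Demo', b'fire-and-forget message')

    # 2. Synchronous send: block until the broker acknowledges (or raises)
    metadata = producer.send('Demo', b'synchronous message').get(timeout=10)
    print(metadata.topic, metadata.partition, metadata.offset)

    # 3. Asynchronous send: register callbacks for success/failure
    def on_success(record_metadata):
        print('delivered to offset', record_metadata.offset)

    def on_error(exc):
        print('delivery failed:', exc)

    producer.send('Demo', b'asynchronous message').add_callback(on_success).add_errback(on_error)
    producer.flush()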

Friday, July 1, 2016

Apache Kafka

Installing Apache Kafka

Steps to install Apache Kafka and test it:
1. Download "kafka_2.11-0.10.0.0.tgz" from Apache.
2. Extract the files from the tgz file
   tar -xvzf kafka_2.11-0.10.0.0.tgz
3. Start the Zookeeper server
   bin/zookeeper-server-start.sh config/zookeeper.properties
4. Start the Kafka broker server
   bin/kafka-server-start.sh config/server.properties
5. Create a topic
   bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topictest
6. Start the producer console to publish messages into the topic
   bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topictest
7. Start the consumer console to consume messages from the topic
   bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topictest --from-beginning

Wednesday, June 22, 2016

Hive - Working with JSON files

Hive has two built-in functions, get_json_object and json_tuple, for dealing with JSON. There are also a few JSON SerDes (Serializer/Deserializers) available for Hive, such as OpenX, Cloudera, and HCatalog.

Handling JSON using built-in functions

Example JSON file:
  {
    "Product": "Laptop",
    "ID": "123456",
    "Address": {
      "BlockNo": 12,
      "Place": "Bangalore"
    }
  }

  CREATE TABLE json_test
  (
    json string
  );

  LOAD DATA LOCAL INPATH '/test.json' INTO TABLE json_test;

Using get_json_object (the first argument is the JSON column, the second a JSON path):
  select get_json_object(json, '$.Product') as Product,
         get_json_object(json, '$.ID') as ID,
         get_json_object(json, '$.Address.BlockNo') as BlockNo,
         get_json_object(json, '$.Address.Place') as Place
  from json_test;

Using json_tuple (json_tuple takes the JSON column followed by top-level keys; a nested field such as BlockNo
still needs get_json_object on the extracted Address):
  select v.Product, v.ID, v.Address,
         get_json_object(v.Address, '$.BlockNo') as BlockNo
  from json_test test
  LATERAL VIEW json_tuple(test.json, 'Product', 'ID', 'Address') v as Product, ID, Address;

Handling JSON using SerDes

Converting a JSON schema to a Hive table schema:

Step 1: Open the JSON in Notepad++ and
  Replace all " with empty
  Replace all :{ with :struct<
  Replace all :[ with :array<
  Replace all } with >
  Replace all { with struct<
  Replace all ] with >
  Replace all Null with STRING
  Quote any Hive keywords (e.g. function, group) as 'function' or 'group'
  Replace all field values with STRING

Step 2: Example JSON file
  {
    "Product": "Laptop",
    "ID": "123456",
    "Address": {
      "BlockNo": 12,
      "Place": "Bangalore"
    }
  }

Step 3: Create the Hive SerDe table
  CREATE TABLE json_serde
  (
    Product string,
    ID string,
    Address struct<BlockNo:string, Place:string>
  )
  ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe';

Step 4: Load the data
  LOAD DATA LOCAL INPATH '/test.json' INTO TABLE json_serde;

Step 5: Query the nested fields
  SELECT Product, ID, Address.BlockNo as BlockNo, Address.Place as location FROM json_serde;

Saturday, May 17, 2014

SQL Server : Isolation Levels

ACID (an acronym for Atomicity, Consistency, Isolation, Durability) is a concept that database professionals generally look for when evaluating databases and application architectures. For a reliable database, all four of these attributes should be achieved. Atomicity is an all-or-none proposition. Consistency guarantees that a transaction never leaves your database in a half-finished state. Isolation keeps transactions separated from each other until they are finished. Durability guarantees that the database will keep track of pending changes in such a way that the server can recover from an abnormal termination.

Types of Isolation Levels

The ISO standard defines the following isolation levels, all of which are supported by the SQL Server Database Engine:
1. Read uncommitted : the lowest level, where transactions are isolated only enough to ensure that physically corrupt data is not read.
2. Read committed : the Database Engine default level.
3. Repeatable read
4. Serializable : the highest level, where transactions are completely isolated from one another.
5. Snapshot : the snapshot isolation level uses row versioning to provide transaction-level read consistency. Read operations acquire no page or row locks; only SCH-S table locks are acquired. When reading rows modified by another transaction, they retrieve the version of the row that existed when the transaction started. You can only use snapshot isolation against a database when the ALLOW_SNAPSHOT_ISOLATION database option is set to ON. By default, this option is set to OFF for user databases.
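
As a rough illustration (pyodbc as the client library, the connection string, and the dbo.Orders table are all assumptions, not from the original post), one way to switch a session to snapshot isolation from Python:

    import pyodbc

    # Hypothetical connection string; adjust driver/server/database/credentials
    conn = pyodbc.connect(
        "DRIVER={ODBC Driver 17 for SQL Server};SERVER=localhost;DATABASE=TestDB;Trusted_Connection=yes;",
        autocommit=False)
    cursor = conn.cursor()

    # Switch the session to SNAPSHOT isolation
    # (requires ALLOW_SNAPSHOT_ISOLATION to be ON for the database)
    cursor.execute("SET TRANSACTION ISOLATION LEVEL SNAPSHOT")

    # Reads inside this transaction see the row versions that existed when it started
    cursor.execute("SELECT COUNT(*) FROM dbo.Orders")   # dbo.Orders is a made-up table
    print(cursor.fetchone()[0])

    conn.commit()
    conn.close()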

Read phenomena

The ANSI/ISO SQL 92 standard refers to three different read phenomena.

Dirty read (uncommitted dependency) occurs when a transaction is allowed to read data from a row that has been modified by another running transaction and not yet committed.

Non-repeatable read occurs when, during the course of a transaction, a row is retrieved twice and the values within the row differ between reads. This phenomenon may occur in a lock-based concurrency control method when read locks are not acquired when performing a SELECT, or when the acquired locks on affected rows are released as soon as the SELECT operation is performed. Under the multi-version concurrency control method, non-repeatable reads may occur when the requirement that a transaction affected by a commit conflict must roll back is relaxed.

Phantom read occurs when, in the course of a transaction, two identical queries are executed, and the collection of rows returned by the second query is different from the first. This can occur when range locks are not acquired on performing a SELECT ... WHERE operation. The phantom read anomaly is a special case of non-repeatable reads: Transaction 1 repeats a ranged SELECT ... WHERE query and, between the two executions, Transaction 2 creates (i.e. INSERTs) new rows in the target table which fulfil that WHERE clause.

Isolation Levels vs. Read Phenomena

Locks                              Isolation Level     Dirty reads   Non-repeatable reads   Phantoms
Nolock                             Read Uncommitted    May occur     May occur              May occur
Shared locks                       Read Committed      --            May occur              May occur
Exclusive range locks              Repeatable Read     --            --                     May occur
Exclusive lock on entire table     Serializable        --            --                     --