Saturday 6 October 2018

Debugging and Remote Submission of Spark Job in HDInsight Spark Cluster


Introduction


In this article we are trying to create a Scala program in IntelliJ and debug it locally and then debug the same program within HDInsight spark cluster and finally submit the Spark Job within HDInsight spark cluster remotely.

Steps Involved:
  1. From IntelleJ create a application that can read data from Blob storage and write count output in blob storage
  2. Run the file in HDInsight (IntelleJ HDInsight submission)
  3. Create JAR file
  4. Upload JAR file into BLOB storage
  5. Install cURL utility for command line sumbission
  6. Submit the SPARK job with cURL command line.

Our HDInsight Spark Cluster should be BLOB as Primary Storage. This Process NOT work if the HDInsight Spark cluster Primary storage is Data Lake Storage (ADLS)


Download Link:



Sunday 24 June 2018

Spark working with Unstructured data


Introduction
In my previous article with Spark, we worked with structure and semi structure data source. Here in this article we are trying to work with unstructured data source.
Hope it will be interesting.

Case Study
We have a note book and we want to find number of work count in it.



Scala Code

val dfsFilename = "D:/spark/bin/examples/src/main/resources/notebook.txt"

val text = sc.textFile(dfsFilename)
val counts = text.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.collect.foreach(println)

Output






Hope it will be interesting.

Posted by: MR. JOYDEEP DAS

Wednesday 20 June 2018

Spark to Connect with Azure SQL DB and read Table


Introduction
In this article we are trying to connect spark with Azure SQL DB and just simply read a table.
Hope it will be interesting.

Scala Code
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val url = "[Enter your url here]"
val databaseName = "[Enter your database name here]"
val dbTable = "[Enter your database table here]"
val user = "[Enter your username here]"
val password = "[Enter your password here]"

// READ FROM CONFIG
val readConfig = Config(Map(
  "url"            -> url,
  "databaseName"   -> databaseName,
  "user"           -> user,
  "password"       -> password,
  "connectTimeout" -> "5",
  "queryTimeout"   -> "5",
  "dbTable"        -> dbTable
))
val df = sqlContext.read.sqlDB(readConfig)
println("Total rows: " + df.count)
df.show()
// TRADITIONAL SYNTAX
import java.util.Properties
val properties = new Properties()
properties.put("databaseName", databaseName)
properties.put("user", user)
properties.put("password", password)
properties.put("connectTimeout", "5")
properties.put("queryTimeout", "5")
val df = sqlContext.read.sqlDB(url, dbTable, properties)
println("Total rows: " + df.count)
df.show()



Hope you like it.



Posted by: MR. JOYDEEP DAS



Monday 18 June 2018

SSIS Folder Traversing in SPARK SQL


Introduction
Here in this article, we are trying to demonstrate Folder Traversing of SSIS ForEach loop container for searching a specified file.
Hope it will be interesting

Scenario
We have a folder named “Sample”. Under this folder, we have three other folder named “Sample-1”, “Sample-2” and “Sample-3”. For each folder there is a flat file named “Student-1.txt”,”Student-2.txt” and “Student-3.txt”.

We need to read the entire file from different folder location
The folder and file structure is displayed by DOS TREE command



Scala Code
//---------------------------------------
// Scala for SPARK to Read Flat File form Different Folder
// Implementation Folder Traversing of SSIS in Spark
// Creation Date: 06/18/2018
//-----------------------------------------
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._


case class Student(roll: Long, name: String)

val employeeDF = spark.sparkContext.textFile("d:/spark/bin/examples/src/main/resources/sample/*/student-*.txt").map(_.split(",")).map(attributes=>Student(attributes(0).trim.toInt, attributes(1).trim)).toDF()


employeeDF.createOrReplaceTempView("student")


val youngstersDF = spark.sql("SELECT roll, name FROM student")

youngstersDF.show

Output






Hope you like it.



Posted By: MR. JOYDEEP DAS

Sunday 17 June 2018

SSIS Conditional Split with SPARK SQL


Introduction
Here in this article we are trying to make SSIS conditional Split Transform by using SPARK SQL. It is called Split Data Frame using Filter Transform.
Hope it will be interesting.

What We Want to Do
We have a Flat File Named Student Marks Details mentioned bellow.

101,Joydeep Das,Math,10
102,Deepasree Das,Math,89
103,Shipra Roy Chowdhury,Math,100
104,Rajesh Roy,Math,45
105,Sunita Tendon,Math,98
106,Amit Basu,Math,20
107,Debalina Bhattacharya,Math,99
108,Pritam Das,Math,40
109,Partha Deb Das,Math,89
110,Onkita Gupta,Math,100

The file contains Student Roll, Student Name, Subject and Marks. Based on Marks we need to display records

If [ Marks ] > 50
Display Records
If [ Marks ] < 50
Display Records

Scala Code
//---------------------------------------
// Scala for SPARK to Read Flat File
// Make Conditional Split Depends on Makes (Marks>50)
// Creation Date: 06/172018
//-----------------------------------------
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._

case class Student(roll: Long, name: String, subject: String, marks: Long)

//Read FLAT File
val studentDF = spark.sparkContext.textFile("d:/spark/bin/examples/src/main/resources/studentmarksdetails.txt").map(_.split(",")).map(attributes => Student(attributes(0).trim.toInt, attributes(1).trim, attributes(2).trim, attributes(3).trim.toInt)).toDF()

studentDF.cache() // recommended to prevent repeating the calculation

val condition = col("marks") > 50 // Condition
val studentDF1 = studentDF.filter(condition)
val studentDF2 = studentDF.filter(not(condition))

//Making View for FLAT file
studentDF1.createOrReplaceTempView("studentrecord1")
studentDF2.createOrReplaceTempView("studentrecord2")

val youngstersDF1 = spark.sql("SELECT roll, name, subject, marks FROM studentrecord1")
youngstersDF1.show

val youngstersDF2 = spark.sql("SELECT roll, name, subject, marks FROM studentrecord2")
youngstersDF2.show

Output





Hope you like it.

Posted by: MR. JOYDEEP DAS

Friday 15 June 2018

Download JSON file from Azure Storage and Read it by SSIS – Part 2

Introduction

This article is the continuation of my previous article named “Download JSON file from Azure Storage and Read it by SSIS”.
In this article we are going to read the JSON file and store it in Relational Database Table object. For that we are using Script Component. Hope it will be interesting.

JSON File

The sample of the example JSON file




Data Flow Task






Script Component Settings
Connection Manager with JSON File
Input Output Columns

Edit Script
References needed Syste.Web.Entry

Name Spaces needed:
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.Web.Script.Serialization;

C# Code:
#region Help:  Introduction to the Script Component
/* The Script Component allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services data flow.
 *
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script component. */
#endregion

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.Web.Script.Serialization;
#endregion

/// <summary>
/// This is the class to which to add your code.  Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
   
    IDTSConnectionManager100 connMgr;

    /// <summary>
    /// This method is called once, before rows begin to be processed in the data flow.
    ///
    /// You can remove this method if you don't need to do anything here.
    /// </summary>
    public override void PreExecute()
    {
        base.PreExecute();
        /*
         * Add your code here
         */
        connMgr = this.Connections.MyFlatFileConnectionMgr;
    }

    /// <summary>
    /// This method is called after all the rows have passed through this component.
    ///
    /// You can delete this method if you don't need to do anything here.
    /// </summary>
    public override void PostExecute()
    {
        base.PostExecute();
        /*
         * Add your code here
         */
    }

    public override void CreateNewOutputRows()
    {
        /*
          Add rows by calling the AddRow method on the member variable named "<Output Name>Buffer".
          For example, call MyOutputBuffer.AddRow() if your output was named "MyOutput".
        */
        JavaScriptSerializer js = new JavaScriptSerializer();

        byte[] jsonbyte = System.IO.File.ReadAllBytes(connMgr.ConnectionString);
        string reviewConverted = System.Text.Encoding.ASCII.GetString(jsonbyte);


        // deserialize the string
        jData jdata = js.Deserialize<jData>(reviewConverted);

        foreach (files f in jdata.rows)
        {
            Output0Buffer.AddRow();
           
            Output0Buffer.id = string.IsNullOrEmpty(f.id) ? "" : f.id.ToString();
            Output0Buffer.dis = string.IsNullOrEmpty(f.dis) ? "" : f.dis.ToString();
            Output0Buffer.siteRef = string.IsNullOrEmpty(f.siteRef) ? "" : f.siteRef.ToString();
            Output0Buffer.assetGmars = string.IsNullOrEmpty(f.assetGmars) ? "" : f.assetGmars.ToString();
            Output0Buffer.assetVfa = string.IsNullOrEmpty(f.assetVfa) ? "" : f.assetVfa.ToString();
            Output0Buffer.equipRef = string.IsNullOrEmpty(f.equipRef) ? "" : f.equipRef.ToString();
        }
    }

}

internal class jData
{
    public files[] rows { get; set; }
}

internal class files
{
    public string id { get; set; }
    public string dis { get; set; }
    public string siteRef { get; set; }
    public string assetGmars { get; set; }
    public string assetVfa { get; set; }
    public string equipRef { get; set; }
}


Hope you like it.



Posted By: MR. JOYDEEP DAS