Azure Synapse Analytics: Data Flow and Synapse Spark, End to End with TPCH Data

Balamurugan Balakreshnan
Mar 25, 2023

This walkthrough uses Data Flow and Synapse Spark to analyze TPCH data, then trains a Spark ML model on the aggregated result.

Pre-requisites

  • Azure subscription
  • Azure Storage Account
  • Azure Synapse Analytics
  • TPCH data loaded into the storage account
  • The line items data had to be limited to a subset, as the full table was 150 billion rows.

Dataset Rows

  • Orders: 15,000,000,000
  • Customers: 1,500,000,000
  • Lineitems: 279,286,998

Goal

  • Use Dataflow to analyze data
  • Use join and create year, month and day columns
  • Do aggregation for year, month and day and calculate sum of numeric columns
  • Try with Data flow first and then Synapse Spark
  • Finally use Spark ML to do modelling with aggregated data
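
To make the goal concrete before building anything, here is a minimal sketch of the same idea in plain Python: derive year, month and day from a date, then sum the numeric columns per day. The sample rows and the function name `aggregate_by_day` are made up for illustration; the real work below runs in Data Flow and Spark.

```python
from collections import defaultdict
from datetime import date

# Hypothetical sample rows: (order date, total price, quantity)
rows = [
    (date(1996, 1, 2), 100.0, 5),
    (date(1996, 1, 2), 50.0, 3),
    (date(1996, 2, 10), 75.0, 1),
]

def aggregate_by_day(rows):
    """Sum the numeric columns for each (year, month, day) key."""
    totals = defaultdict(lambda: {"price": 0.0, "quantity": 0})
    for order_date, price, quantity in rows:
        # Derive the three grouping columns from the date
        key = (order_date.year, order_date.month, order_date.day)
        totals[key]["price"] += price
        totals[key]["quantity"] += quantity
    return dict(totals)

daily = aggregate_by_day(rows)
for key in sorted(daily):
    print(key, daily[key])
```

The two rows dated 1996-01-02 collapse into one aggregated row, which is the shape of output the later steps produce at scale.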

Data Flow

  • All the data is stored as parquet files
  • Connect to the storage account and select the parquet files for Customers, orders and Lineitems
  • Here is the end to end flow
  • Customers
  • Orders
  • Lineitems
  • Now join orders with line items
  • Now create year, month and day columns
  • Now join with customers to get customer details
  • Now aggregate the data by year, month and day
  • Calculate aggregates
  • Finally, sink the result into ADLS as parquet
  • Now set the partitioning for the sink
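
The partition setting on the sink decides how rows are spread across output files, typically one file per partition. Which option was picked in the data flow is not stated here, so the following is only a sketch of one common choice, round-robin distribution over a fixed partition count. The function name `round_robin_partition` is hypothetical, and this is not how Data Flow or Spark implement it internally.

```python
def round_robin_partition(rows, num_partitions):
    """Deal rows into num_partitions buckets in turn, like dealing cards."""
    partitions = [[] for _ in range(num_partitions)]
    for index, row in enumerate(rows):
        partitions[index % num_partitions].append(row)
    return partitions

# 20 hypothetical rows spread over 6 partitions
parts = round_robin_partition(list(range(20)), 6)
sizes = [len(p) for p in parts]
print(sizes)
```

Round-robin keeps partitions close to equal in size, which helps avoid one very large output file next to several small ones.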

Next: Synapse Spark Code

  • Same steps as above, but in PySpark
  • Create a Spark pool with Spark version 3.2
  • Choose the Extra Large node size with 5 nodes
  • Now time to code
  • Let’s import the functions we need
from pyspark.sql.functions import col, year, month, dayofmonth
  • Load the data from ADLS
  • Customer
%%pyspark
dfcustomer = spark.read.load('abfss://container@storageaccount.dfs.core.windows.net/tpch/CUSTOMER/*.parquet', format='parquet')
display(dfcustomer.limit(10))
  • Create a View
dfcustomer.createOrReplaceTempView("customers")
  • Now load orders
dforders = spark.read.load('abfss://containername@storageaccount.dfs.core.windows.net/tpch/ORDERS/*.parquet', format='parquet')
display(dforders.limit(10))
  • Create columns for year, month, day
dforders = dforders.withColumn("year", year(col("O_ORDERDATE")))
dforders = dforders.withColumn("month", month(col("O_ORDERDATE")))
dforders = dforders.withColumn("day", dayofmonth(col("O_ORDERDATE")))
  • Create a view
dforders.createOrReplaceTempView("orders")
  • Line items load from ADLS
dflineitems = spark.read.load('abfss://containername@storageaccount.dfs.core.windows.net/tpch1/LINEITEM/*.parquet', format='parquet')
display(dflineitems.limit(10))
  • Create view
dflineitems.createOrReplaceTempView("lineitems")
  • Now Aggregation
dfaggr = spark.sql("""
    SELECT a.year, a.month, a.day, a.O_CUSTKEY, a.O_ORDERDATE,
           sum(a.O_TOTALPRICE) AS O_TOTALPRICE,
           sum(b.L_DISCOUNT) AS L_DISCOUNT,
           sum(b.L_QUANTITY) AS L_QUANTITY,
           sum(b.L_TAX) AS L_TAX,
           sum(b.L_LINENUMBER) AS L_LINENUMBER,
           sum(b.L_EXTENDEDPRICE) AS L_EXTENDEDPRICE
    FROM orders a
    JOIN lineitems b ON a.O_ORDERKEY = b.L_ORDERKEY
    JOIN customers c ON a.O_CUSTKEY = c.C_CUSTKEY
    GROUP BY a.year, a.month, a.day, a.O_CUSTKEY, a.O_ORDERDATE
""")
  • Write the aggregation to ADLS as parquet, in 6 partitions
dfaggr.repartition(6).write.mode("overwrite").parquet('abfss://containername@storageaccount.dfs.core.windows.net/tpchoutputsparksql/')
  • Let’s read the output back and check
%%pyspark
df = spark.read.load('abfss://containername@storageaccount.dfs.core.windows.net/tpchoutputsparksql/*.snappy.parquet', format='parquet')
display(df.limit(10))
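
One thing to watch in the aggregation query: because orders is joined to line items before summing, each order row repeats once per line item, so sum(O_TOTALPRICE) counts an order's total once for every line item it has. Whether that is intended is not stated here. The sketch below reproduces the effect on a tiny made-up dataset using Python's built-in sqlite3 (not Spark), and shows one possible way to avoid it by aggregating line items per order before the join. Table contents are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (O_ORDERKEY INTEGER, O_CUSTKEY INTEGER, O_TOTALPRICE REAL);
    CREATE TABLE lineitems (L_ORDERKEY INTEGER, L_QUANTITY REAL);
    -- Hypothetical data: one order worth 100.0 with three line items
    INSERT INTO orders VALUES (1, 42, 100.0);
    INSERT INTO lineitems VALUES (1, 2), (1, 3), (1, 5);
""")

# Same join shape as the notebook query: the order row repeats per line item,
# so SUM(O_TOTALPRICE) adds the order total three times.
joined_total, quantity = conn.execute("""
    SELECT SUM(a.O_TOTALPRICE), SUM(b.L_QUANTITY)
    FROM orders a JOIN lineitems b ON a.O_ORDERKEY = b.L_ORDERKEY
    GROUP BY a.O_CUSTKEY
""").fetchone()

# One alternative: collapse line items to one row per order, then join.
fixed_total, fixed_quantity = conn.execute("""
    SELECT SUM(a.O_TOTALPRICE), SUM(l.qty)
    FROM orders a
    JOIN (SELECT L_ORDERKEY, SUM(L_QUANTITY) AS qty
          FROM lineitems GROUP BY L_ORDERKEY) l
      ON a.O_ORDERKEY = l.L_ORDERKEY
    GROUP BY a.O_CUSTKEY
""").fetchone()

print("joined before aggregating:", joined_total, quantity)
print("aggregated before joining:", fixed_total, fixed_quantity)
```

The line item sums are the same either way; only the order-level column changes. This matters later because O_TOTALPRICE is the label for the regression.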

Synapse Spark Training

  • Read the aggregated data written by the Synapse Spark notebook
%%pyspark
df = spark.read.load('abfss://containername@storageaccount.dfs.core.windows.net/tpchoutputsparksql/*.snappy.parquet', format='parquet')
display(df.limit(10))
  • Import libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
  • Display schema
df.printSchema()
  • Featurization: assemble the input columns into a single feature vector
featureassembler = VectorAssembler(inputCols = ["year","month", "day","L_DISCOUNT", "L_QUANTITY"], outputCol = "Independent Features")
  • Apply the assembler and inspect the output
output = featureassembler.transform(df)
output.select("Independent Features").show()
  • Select the features and the label
finalised_data = output.select("Independent Features", "O_TOTALPRICE")
  • Now split the data into training and test sets (75% / 25%)
train_data, test_data = finalised_data.randomSplit([0.75, 0.25])
  • Set up the regression and fit it on the training data
regressor = LinearRegression(featuresCol = 'Independent Features', labelCol = 'O_TOTALPRICE')
regressor = regressor.fit(train_data)
  • Display the training metrics
trainingSummary = regressor.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
  • Now score the test data
pred_results = regressor.evaluate(test_data)
pred_results.predictions.show()
  • Calculate R2 on the test predictions
lr_predictions = regressor.transform(test_data)
lr_predictions.select("prediction","O_TOTALPRICE","Independent Features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="O_TOTALPRICE", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))
  • Print training diagnostics (iterations, objective history, residuals)
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
  • Run the notebook
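
For reference, the two metrics printed above can be computed by hand. This is a minimal sketch in plain Python using the standard textbook definitions of RMSE and R2, not Spark's internal code; the labels and predictions are made-up numbers, and the function names are hypothetical.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    squared = [(y - p) ** 2 for y, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))

def r2(actual, predicted):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(actual) / len(actual)
    ss_res = sum((y - p) ** 2 for y, p in zip(actual, predicted))
    ss_tot = sum((y - mean_y) ** 2 for y in actual)
    return 1.0 - ss_res / ss_tot

# Hypothetical labels and predictions, for illustration only
actual = [10.0, 20.0, 30.0, 40.0]
predicted = [12.0, 18.0, 33.0, 37.0]

print("RMSE: %f" % rmse(actual, predicted))
print("r2: %f" % r2(actual, predicted))
```

RMSE is in the same units as the label (here, O_TOTALPRICE), while R2 is unitless, so the two are best read together: a low RMSE relative to typical label values and an R2 close to 1 both point to a good fit.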

Pipeline creation

  • Create a new pipeline
  • First, add the data flow activity
  • Set the runtime configuration
  • Now configure the data flow
  • Next, add a Synapse Spark notebook activity for the Spark aggregation
  • Next, add a Synapse Spark notebook activity for ML training
  • Save the pipeline and publish
  • Now run the end to end pipeline
  • Now show the lineage
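
The order the activities run in comes from their dependencies. Assuming the three activities are chained one after another (data flow, then Spark aggregation, then ML training), the ordering can be sketched in plain Python as below. This is only an illustration of the dependency idea, not the Synapse pipeline definition, and the activity names are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical activity names; each maps to the set of activities it waits on.
depends_on = {
    "TpchDataFlow": set(),
    "SparkAggregationNotebook": {"TpchDataFlow"},
    "SparkMLTrainingNotebook": {"SparkAggregationNotebook"},
}

# static_order() yields each activity only after everything it depends on.
run_order = list(TopologicalSorter(depends_on).static_order())
print(run_order)
```

Since the ML notebook reads the parquet output written by the aggregation notebook, that dependency is the one that must hold; the data flow could in principle run independently of the two notebooks.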

Originally published at https://github.com.
