Azure Synapse Analytics — Data Flow and Synapse Spark — End to End with TPCH Data

Balamurugan Balakreshnan
4 min read · Mar 25, 2023

Using Data Flow and Synapse Spark to analyze data, and Spark ML for modelling.

Using TPCH data


  • Azure subscription
  • Azure Storage Account
  • Azure Synapse Analytics
  • Load TPCH data
  • I had to limit the line items data, as the full table was 150 billion rows.

Dataset Rows

  • Orders: 15,000,000,000
  • Customers: 1,500,000,000
  • Lineitems: 279,286,998


  • Use Data Flow to analyze the data
  • Use a join and create year, month and day columns
  • Aggregate by year, month and day and calculate the sum of the numeric columns
  • Try Data Flow first, then Synapse Spark
  • Finally, use Spark ML to model the aggregated data

Data Flow

  • All the data is in parquet files
  • Connect to the storage account and select the parquet files for Customers, Orders and Lineitems
  • Here is the end-to-end flow
  • Customers
  • Orders
  • Lineitems
  • Let’s now join Orders with Lineitems
  • Now create year, month and day columns
  • Now join the result with Customers to complete the order details
  • Now aggregate the data by year, month and day
  • Calculate the aggregates
  • Finally, sink into ADLS as parquet
  • Now set the partitioning

Next, the Synapse Spark Code

  • The same as above, but in PySpark
  • Create a Spark cluster with Spark version 3.2
  • Choose Extra Large with 5 nodes
  • Now it’s time to code
  • Let’s bring in the libraries
from pyspark.sql.functions import *
  • Load the data from ADLS
  • Customer
dfcustomer ='abfss://*.parquet', format='parquet')
  • Create a view
dfcustomer.createOrReplaceTempView("customers")
  • Now Orders
dforders ='abfss://*.parquet', format='parquet')
  • Create columns for year, month, day
dforders = dforders.withColumn("year", year(col("O_ORDERDATE")))
dforders = dforders.withColumn("month", month(col("O_ORDERDATE")))
dforders = dforders.withColumn("day", dayofmonth(col("O_ORDERDATE")))
  • Create a view
dforders.createOrReplaceTempView("orders")
  • Load the Lineitems from ADLS
dflineitems ='abfss://*.parquet', format='parquet')
  • Create a view
dflineitems.createOrReplaceTempView("lineitems")
  • Now the aggregation
dfaggr = spark.sql("Select a.year, a.month,, a.O_CUSTKEY, a.O_ORDERDATE, sum(a.O_TOTALPRICE) as O_TOTALPRICE, sum(b.L_DISCOUNT) as L_DISCOUNT, sum(b.L_QUANTITY) as L_QUANTITY, sum(b.L_TAX) as L_TAX, sum(b.L_LINENUMBER) as L_LINENUMBER, sum(b.L_EXTENDEDPRICE) as L_EXTENDEDPRICE from orders a join lineitems b on a.O_ORDERKEY = b.L_ORDERKEY join customers c on a.O_CUSTKEY = c.C_CUSTKEY group by a.year, a.month,, a.O_CUSTKEY, a.O_ORDERDATE")
  • Write the aggregation
  • Let’s read it back and check
df ='abfss://*.snappy.parquet', format='parquet')

Synapse Spark Training

  • Read the data that was stored by Synapse Spark
df ='abfss://*.snappy.parquet', format='parquet')
  • Import the libraries
import pyspark
from pyspark.sql import SparkSession
from import LinearRegression
from import VectorAssembler
  • Display the schema
df.printSchema()
  • Featurization
featureassembler = VectorAssembler(inputCols = ["year","month", "day","L_DISCOUNT", "L_QUANTITY"], outputCol = "Independent Features")
  • Output
output = featureassembler.transform(df)"Independent Features").show()
  • Set up the label
finalised_data ="Independent Features", "O_TOTALPRICE")
  • Now split the data
train_data, test_data = finalised_data.randomSplit([0.75, 0.25])
  • Set up the regression
regressor = LinearRegression(featuresCol = 'Independent Features', labelCol = 'O_TOTALPRICE')
regressor =
  • Display the training metrics
trainingSummary = regressor.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
  • Now score or predict
pred_results = regressor.evaluate(test_data)
  • Calculate the results
lr_predictions = regressor.transform(test_data)"prediction", "O_TOTALPRICE", "Independent Features").show(5)
from import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="O_TOTALPRICE", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))
  • Print the score or prediction metrics
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
  • Run the notebook

Pipeline creation

  • Create a new pipeline
  • First, bring in the data flow
  • Runtime configuration
  • Now configure the data flow
  • Next, bring in Synapse Spark for the Spark aggregation
  • Next, bring in Synapse Spark for the ML training
  • Save the pipeline and publish
  • Now run the end-to-end pipeline
  • Now show the lineage

Originally published at