Spark Scala: Stream Processing of Base64-Encoded Messages

Balamurugan Balakreshnan
2 min read · Nov 1, 2020


Processing Event Hub messages whose body is base64 encoded.

Use case

  • IoT Hub and Event Hub use the Avro format and store the message body base64 encoded
  • Parse the data using Structured Streaming
  • Parse the body column, which is base64 encoded

Prerequisites

  • Azure subscription
  • Create an Event Hub namespace
  • Select the Standard tier, since schema registry is not available in Basic
  • Create an event hub with 1 partition
  • Create a consumer group called sample1
  • Create an Azure Databricks workspace
  • Spark 3.0
  • Create a Databricks cluster
  • Install the Event Hubs library jar from Maven: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17

Simulator to create and send data to the event hub

  • https://eventhubdatagenerator.azurewebsites.net/
  • Copy the Event Hub connection string and paste it into the form
  • Copy the name of the event hub and paste it here
  • Leave the JSON message as it is
  • Change the number of messages to 500
  • Click Submit
  • Wait a few seconds for the data to load
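The generator sends JSON payloads that arrive base64 encoded in the event body. As a quick sanity check of what decoding recovers, here is a plain-Scala round trip (the field names below are the ones parsed later in this article; the generator's actual default message may differ):

```scala
import java.util.Base64
import java.nio.charset.StandardCharsets

// Hypothetical payload using the field names parsed later in this article
val json = """{"sensor_id":"1","sensor_temp":"72","sensor_status":"OK"}"""

// Encode the payload the way the event body would carry it
val encoded = Base64.getEncoder.encodeToString(json.getBytes(StandardCharsets.UTF_8))

// Decoding reverses it exactly
val decoded = new String(Base64.getDecoder.decode(encoded), StandardCharsets.UTF_8)
println(decoded == json) // prints true
```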

Azure Databricks code

import org.apache.spark.eventhubs
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions.{ explode, split }
  • Set up the event hub config
  • Configure to read the stream from the beginning
// Start from the beginning of the stream
val eventhubsas = dbutils.secrets.get(scope = "allsecrects", key = "schregistryevethub")
  • Create a connectionString variable to store the connection string
import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
.setNamespaceName("acceventschema")
.setEventHubName("schemasample")
.setSasKeyName("adbaccess")
.setSasKey(eventhubsas)
.build
  • Set up the configuration for the read stream
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromStartOfStream)
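The prerequisites create a consumer group called sample1, but the config above reads through the default consumer group. To use the one created earlier, the azure-eventhubs-spark library exposes setConsumerGroup on EventHubsConf; a sketch, assuming the same connectionString value as above:

```scala
// Optional: read through the sample1 consumer group instead of $Default
val eventHubsConfCg = EventHubsConf(connectionString)
  .setConsumerGroup("sample1")
  .setStartingPosition(EventPosition.fromStartOfStream)
```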
  • Set up the stream now
var streamingInputDF = 
spark.readStream
.format("eventhubs")
.options(eventHubsConf.toMap)
.load()
  • Now parse the JSON properties (get_json_object needs an import)
import org.apache.spark.sql.functions.get_json_object

var streamingSelectDF = 
streamingInputDF
.select(get_json_object(($"body").cast("string"), "$.sensor_id").alias("sensor_id"),
get_json_object(($"body").cast("string"), "$.sensor_temp").alias("sensor_temp"),
get_json_object(($"body").cast("string"), "$.sensor_status").alias("sensor_status"))
  • Display the streaming results
display(streamingSelectDF)
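Note that cast("string") on the binary body column only turns the raw bytes into text. If the body itself carries base64 text, as in the use case stated at the top, Spark's built-in unbase64 function can decode it before the JSON fields are extracted. A sketch of that variant, assuming the decoded bytes are UTF-8 JSON (this is a streaming fragment, not a standalone program):

```scala
import org.apache.spark.sql.functions.{ col, get_json_object, unbase64 }

// Decode the base64 body first, then extract the JSON fields once
val streamingDecodedDF =
  streamingInputDF
    .withColumn("json", unbase64(col("body").cast("string")).cast("string"))
    .select(
      get_json_object(col("json"), "$.sensor_id").alias("sensor_id"),
      get_json_object(col("json"), "$.sensor_temp").alias("sensor_temp"),
      get_json_object(col("json"), "$.sensor_status").alias("sensor_status"))
```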

Original story on GitHub
