Spark Scala: stream processing of Base64-encoded Event Hub messages

Read Event Hub messages whose body is Base64 encoded and parse them in Spark.

Use case

  • IoT Hub and Event Hubs use the Avro format and store the message body Base64 encoded
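The Base64 text shown for a message is an encoding of the payload bytes; decoding it gives back the original UTF-8 text. A minimal plain-Scala sketch of that round trip, using only the JDK's `java.util.Base64`. The object name `Base64Body` and the JSON payload are hypothetical; the field names are borrowed from the parsing step later in this walkthrough.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

object Base64Body {
  // Turns UTF-8 text into its Base64 representation
  def encode(text: String): String =
    Base64.getEncoder.encodeToString(text.getBytes(StandardCharsets.UTF_8))

  // Turns a Base64 string back into the original UTF-8 text
  def decode(encoded: String): String =
    new String(Base64.getDecoder.decode(encoded), StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    // Hypothetical sample payload
    val json = """{"sensor_id":"s1","sensor_temp":"21.5","sensor_status":"OK"}"""
    val encoded = encode(json)
    println(encoded)         // the Base64 form of the payload
    println(decode(encoded)) // the original JSON text again
  }
}
```

Decoding does not change the content, so once the bytes are turned back into text, ordinary JSON functions can parse them.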

Prerequisites

  • Azure subscription

Simulator to generate and send data to the Event Hub

  • https://eventhubdatagenerator.azurewebsites.net/

Azure Databricks code

  • Import the required libraries
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions.get_json_object
import spark.implicits._  // provides the $"column" syntax (already imported automatically in Databricks notebooks)
  • Set up the Event Hub configuration
// Read the Event Hub SAS key from a Databricks secret scope
val eventhubsas = dbutils.secrets.get(scope = "allsecrects", key = "schregistryevethub")
  • Create a connectionString variable that holds the Event Hub connection string
import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("acceventschema")
  .setEventHubName("schemasample")
  .setSasKeyName("adbaccess")
  .setSasKey(eventhubsas)
  .build
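The builder combines the namespace, Event Hub name, SAS key name and SAS key into one SAS connection string. To show roughly what that string contains, here is a hypothetical plain-Scala helper that assembles the standard Event Hubs connection-string format by hand. It is an illustration only: the field order `ConnectionStringBuilder` emits may differ, and the builder remains the better choice in real code.

```scala
object EventHubConnectionString {
  // Hypothetical helper: assembles the usual Event Hubs SAS connection-string fields.
  // The Event Hub (entity) name goes into EntityPath.
  def build(namespace: String, eventHub: String, keyName: String, key: String): String =
    s"Endpoint=sb://$namespace.servicebus.windows.net/;" +
      s"SharedAccessKeyName=$keyName;SharedAccessKey=$key;EntityPath=$eventHub"
}
```

A related pattern, assuming the connector's `ConnectionStringBuilder(existingString)` overload: when the connection string copied from the portal is namespace-level (no `EntityPath`), it can be completed with `ConnectionStringBuilder(portalString).setEventHubName("schemasample").build`.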
  • Set up the configuration for the read stream
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromStartOfStream) // start from the beginning of the stream
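The starting position and batch size can be tuned on the same `EventHubsConf`; for example, `EventPosition.fromEndOfStream` reads only events that arrive after the query starts. Below is a sketch of a few other reader settings, assuming the `azure-event-hubs-spark` connector used above. The object name `ReaderConf`, the one-hour window, the 1000-event cap and the consumer group name `databricks` are illustrative assumptions, and the consumer group must already exist on the Event Hub.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit
import org.apache.spark.eventhubs.{ EventHubsConf, EventPosition }

object ReaderConf {
  // connectionString: the String produced by ConnectionStringBuilder(...).build
  def tuned(connectionString: String): EventHubsConf =
    EventHubsConf(connectionString)
      // Read only events enqueued during the last hour instead of the whole retention window
      .setStartingPosition(EventPosition.fromEnqueuedTime(Instant.now.minus(1, ChronoUnit.HOURS)))
      // Upper bound on the number of events pulled per micro-batch (assumed value)
      .setMaxEventsPerTrigger(1000L)
      // Hypothetical consumer group; the connector uses "$Default" when none is set
      .setConsumerGroup("databricks")
}
```

It plugs into the reader the same way as the original configuration: `spark.readStream.format("eventhubs").options(ReaderConf.tuned(connectionString).toMap).load()`.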
  • Set up the stream
val streamingInputDF =
  spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load()
  • Parse the JSON properties from the message body
// body is binary; casting it to string yields the JSON text sent by the producer
val streamingSelectDF =
  streamingInputDF
    .select(
      get_json_object($"body".cast("string"), "$.sensor_id").alias("sensor_id"),
      get_json_object($"body".cast("string"), "$.sensor_temp").alias("sensor_temp"),
      get_json_object($"body".cast("string"), "$.sensor_status").alias("sensor_status"))
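The step above only casts `body` to a string, which is enough when the producer sent plain JSON bytes. If the producer itself Base64-encoded the JSON text before sending, the body holds the Base64 characters and needs one more decode. Below is a sketch of that case using Spark's built-in `unbase64` and `from_json`. The object name `DecodeBase64Body`, the `sensorSchema` definition, and the choice to keep every field as a string (mirroring what `get_json_object` returns) are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{ col, from_json, unbase64 }
import org.apache.spark.sql.types.{ StringType, StructField, StructType }

object DecodeBase64Body {
  // Hypothetical schema for the three fields parsed above, all kept as strings
  val sensorSchema: StructType = StructType(Seq(
    StructField("sensor_id", StringType),
    StructField("sensor_temp", StringType),
    StructField("sensor_status", StringType)))

  // Works on any DataFrame with a binary `body` column, streaming or batch:
  // body bytes -> Base64 text -> decoded bytes -> JSON text -> struct -> columns
  def parseBase64Json(df: DataFrame): DataFrame =
    df.select(
        from_json(unbase64(col("body").cast("string")).cast("string"), sensorSchema).alias("msg"))
      .select("msg.*")
}
```

Usage in the notebook would be `display(DecodeBase64Body.parseBase64Json(streamingInputDF))`. Parsing once with an explicit schema also avoids re-parsing the JSON for every extracted field.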
  • Display the parsed stream
display(streamingSelectDF)
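`display` is specific to Databricks notebooks. Outside a notebook (for example in a packaged Spark job), Spark's built-in console sink can print each micro-batch instead. Below is a sketch; the object and method names are hypothetical, and the console sink is intended for debugging rather than production output.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object ConsoleOutput {
  // Starts a streaming query that prints every new row of each micro-batch to stdout
  def startConsole(df: DataFrame): StreamingQuery =
    df.writeStream
      .format("console")
      .outputMode("append")        // only newly arrived rows; no aggregation is involved
      .option("truncate", "false") // show full column values
      .start()
}
```

A job would call `ConsoleOutput.startConsole(streamingSelectDF).awaitTermination()` to keep the query running.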

Original story on GitHub