Spark Scala: Processing Base64-Encoded Messages with Structured Streaming
2 min read · Nov 1, 2020
Processing Event Hub messages whose bodies are base64 encoded.
Use case
- IoT Hub and Event Hub use the Avro format and store the message body base64 encoded
- Parse the data using Structured Streaming
- Parse the body column, which is base64 encoded (a quick decoding sketch follows this list)
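For reference, this is all the base64 step amounts to in plain Scala. A minimal sketch, where the encoded value is a hypothetical stand-in for an Event Hub message body:
import java.util.Base64
import java.nio.charset.StandardCharsets
// Hypothetical base64-encoded payload standing in for an Event Hub message body
val encoded = "eyJzZW5zb3JfaWQiOiAiMSJ9"
// Decode the base64 bytes and read them as UTF-8 text
val decoded = new String(Base64.getDecoder.decode(encoded), StandardCharsets.UTF_8)
println(decoded) // {"sensor_id": "1"}
In the streaming job below, Spark handles this for us: the connector exposes body as a binary column, and casting it to string yields the decoded text.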
Prerequisites
- Azure subscription
- Create an Event Hub namespace
- Select the Standard tier, since Schema Registry is not available in Basic
- Create an event hub with 1 partition
- Create a consumer group called sample1
- Create an Azure Databricks workspace
- Create a cluster with the Spark 3.0 runtime
- Install the Event Hubs library JAR from Maven: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17
Simulator to create and send data to Event Hub
- https://eventhubdatagenerator.azurewebsites.net/
- Copy the Event Hub connection string
- Copy the name of the event hub and paste it into the form
- Leave the JSON message as it is (a sample of the default payload is shown after these steps)
- Change the number of messages to 500
- Click Submit
- Wait a few seconds for the data to load
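The generator's default message is a small JSON document; judging by the fields parsed later in this article, it looks roughly like this (values are illustrative):
{ "sensor_id": "1", "sensor_temp": "70", "sensor_status": "OK" }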
Azure Databricks code
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions.get_json_object
- Set up the Event Hubs config
- Configure the stream to read from the beginning
// Retrieve the Event Hubs SAS key from a Databricks secret scope
val eventhubsas = dbutils.secrets.get(scope = "allsecrects", key = "schregistryevethub")
- Create a connectionString variable and store the connection string
val connectionString = ConnectionStringBuilder()
  .setNamespaceName("acceventschema")
  .setEventHubName("schemasample")
  .setSasKeyName("adbaccess")
  .setSasKey(eventhubsas)
  .build
- Set up the configuration for the read stream
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromStartOfStream)
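EventPosition supports other starting points besides the start of the stream; for example, reading only new events, or events enqueued after a given time. A short sketch using the connector's EventPosition helpers:
// Read only events that arrive after the query starts
val latestConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
// Or start from a specific enqueued time
import java.time.Instant
val timedConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEnqueuedTime(Instant.now()))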
- Set up the stream now
val streamingInputDF =
  spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load()
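Before parsing, it can help to sanity-check the raw payload; since the connector's body column is binary, casting it to string gives the decoded JSON text:
// Peek at the decoded body alongside the enqueued time
display(streamingInputDF.select($"body".cast("string").alias("body"), $"enqueuedTime"))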
- Now parse the JSON properties (an alternative using from_json follows the snippet)
val streamingSelectDF =
  streamingInputDF
    .select(
      get_json_object($"body".cast("string"), "$.sensor_id").alias("sensor_id"),
      get_json_object($"body".cast("string"), "$.sensor_temp").alias("sensor_temp"),
      get_json_object($"body".cast("string"), "$.sensor_status").alias("sensor_status"))
- Display the results
display(streamingSelectDF)
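display is convenient for exploring in a notebook; a job would instead write the stream to a sink. A minimal sketch using the console sink (the checkpoint path is a hypothetical location, use any writable path):
streamingSelectDF.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/sample1-checkpoint") // hypothetical path
  .outputMode("append")
  .start()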
Original story on GitHub