BeatBlast-RealTime-Streaming-Pipeline

BeatBlast: Stream Processing Analysis & Report

Author: Tejashwini Saravanan
Course: ISM6362 Big Data and Cloud-Based Tools
Date: July 19, 2025

👉 Click here to view the original PDF Report

👉 View the Full Structured Streaming Script (PDF)


1. Project Objective

The goal of this project was to simulate a real-time event stream for a music platform (BeatBlast) and build a processing pipeline using Apache Spark Structured Streaming. The system handles high-velocity JSON data, performs windowed aggregations, and persists results into a partitioned Data Lake for long-term analytics.
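The high-velocity JSON stream can be simulated with a small event generator; a minimal pure-Python sketch (the field names follow the schema defined later in this report, while the song IDs, platforms, and user pool sizes are illustrative values, not the actual simulator):

```python
import json
import random
import uuid
from datetime import datetime, timezone

SONGS = ["song_001", "song_002", "song_003"]   # illustrative song IDs
COUNTRIES = ["US", "GB", "IN", "CA"]           # matches the Data Lake partitions
PLATFORMS = ["ios", "android", "web"]          # illustrative platform values

def make_event() -> str:
    """Produce one JSON-encoded play event matching the stream schema."""
    return json.dumps({
        "eventType": "play",
        "eventTimestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "songId": random.choice(SONGS),
        "userId": f"user_{random.randint(1, 500):03d}",
        "sessionId": str(uuid.uuid4()),
        "platform": random.choice(PLATFORMS),
        "country": random.choice(COUNTRIES),
    })
```

Each call returns one JSON document that the streaming source can emit as a single record.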


2. Technical Justifications

Why a 10-minute watermark?

I chose a 10-minute watermark as a realistic buffer for late-arriving data: events delayed by network latency or client-side buffering are still counted in their original window, while any state older than the watermark can be evicted from memory, keeping the streaming query's footprint bounded and performance high.
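The watermark's effect on late data can be mimicked outside Spark to see which events survive; a pure-Python sketch (the arrival sequence below is made up for illustration):

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(minutes=10)

def filter_late_events(event_times):
    """Keep events no older than (max event time seen so far) - 10 minutes,
    mirroring how a watermark bounds how late a record may arrive."""
    max_seen = datetime.min
    kept = []
    for ts in event_times:
        max_seen = max(max_seen, ts)
        if ts >= max_seen - WATERMARK_DELAY:
            kept.append(ts)
    return kept

arrivals = [
    datetime(2025, 7, 18, 22, 55),  # on time
    datetime(2025, 7, 18, 23, 5),   # advances the watermark to 22:55
    datetime(2025, 7, 18, 22, 57),  # 8 minutes late -> still accepted
    datetime(2025, 7, 18, 22, 40),  # 25 minutes late -> dropped
]
kept = filter_late_events(arrivals)
```

The 8-minute-late event is retained while the 25-minute-late one is discarded, which is the trade-off the 10-minute setting encodes.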

Partitioning Strategy: year/month/day/country

Partitioning the output by year, month, day, and country lets downstream queries prune partitions: a query for one country's plays on a single day reads only that directory instead of scanning the full Data Lake. Date-based partitions also match the typical reporting granularity, which keeps the number of files per partition manageable.
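The Hive-style directory each event lands in can be derived from its timestamp and country; a pure-Python illustration of the layout that `partitionBy("year", "month", "day", "country")` produces (the helper function is illustrative, not part of the pipeline):

```python
from datetime import datetime

BASE_PATH = "/beatblast_datalake/song_plays"  # the report's Data Lake root

def partition_path(event_time: datetime, country: str) -> str:
    """Build the Hive-style partition directory an event lands in,
    mirroring partitionBy("year", "month", "day", "country")."""
    return (
        f"{BASE_PATH}/year={event_time.year}"
        f"/month={event_time.month:02d}"
        f"/day={event_time.day:02d}"
        f"/country={country}"
    )

path = partition_path(datetime(2025, 7, 18, 22, 55), "US")
# -> /beatblast_datalake/song_plays/year=2025/month=07/day=18/country=US
```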

Structured Streaming vs. DStreams

Structured Streaming was preferred over the older DStream API because it offers:

- a high-level DataFrame/Dataset API that benefits from the Catalyst optimizer, rather than DStreams' lower-level RDD transformations;
- native event-time processing with watermarks, whereas DStreams support only processing-time windows;
- end-to-end exactly-once guarantees through checkpointing and replayable sources and sinks.


3. Real-Time Results & Output

| Window Start | Window End | songId | play_count |
| :--- | :--- | :--- | :--- |
| 2025-07-18 22:50:00 | 2025-07-18 23:00:00 | song_002 | 13 |
| 2025-07-18 22:50:00 | 2025-07-18 23:00:00 | song_001 | 12 |

Data Lake Storage Structure

/beatblast_datalake/song_plays/
 └── year=2025/
     └── month=07/
         └── day=18/
             ├── country=CA/
             ├── country=GB/
             ├── country=IN/
             └── country=US/

🛠️ Implementation Details

Schema Definition

To ensure data integrity, a strict schema was applied to the incoming JSON stream:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

event_schema = StructType([
    StructField("eventType", StringType(), True),
    StructField("eventTimestamp", StringType(), True),
    StructField("songId", StringType(), True),
    StructField("userId", StringType(), True),
    StructField("sessionId", StringType(), True),
    StructField("platform", StringType(), True),
    StructField("country", StringType(), True)
])

Windowed Aggregation

Plays are counted in 5-minute tumbling windows keyed by songId, bounded by the 10-minute watermark:

from pyspark.sql.functions import window

popular_songs = (
    song_plays_df
    .withWatermark("eventTimestamp", "10 minutes")
    .groupBy(window("eventTimestamp", "5 minutes"), "songId")
    .count()
    # rename the default "count" column to match the reporting output
    .withColumnRenamed("count", "play_count")
)
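To see what this aggregation yields, the 5-minute tumbling windows can be mimicked in plain Python (the sample plays below are illustrative; Spark computes the same grouping incrementally over the stream):

```python
from collections import Counter
from datetime import datetime

def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 5-minute tumbling window."""
    return ts.replace(minute=(ts.minute // 5) * 5, second=0, microsecond=0)

plays = [
    (datetime(2025, 7, 18, 22, 51), "song_002"),
    (datetime(2025, 7, 18, 22, 53), "song_001"),
    (datetime(2025, 7, 18, 22, 54), "song_002"),
    (datetime(2025, 7, 18, 22, 56), "song_001"),  # falls in the next window
]

# One count per (window start, songId) pair, like groupBy(window(...), "songId").count()
counts = Counter((window_start(ts), song) for ts, song in plays)
```

The first three plays fall in the 22:50-22:55 window and the last in 22:55-23:00, producing per-window counts per song.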