Streaming Analytics in Google Cloud Platform (GCP) - Building Data Pipeline with Apache Beam

Building Apache Beam Data Pipeline
Building Apache Beam Data Pipeline (Source: Pixabay) 


In introduction article of this series Streaming Analytics in Google Cloud Platform (GCP) - Introduction, we have seen the basics of streaming analytics, its importance and example uses cases, and short introduction about the Google Cloud Services, we will be using to build Streaming Analytics system in Google Cloud Platform.


The second article Streaming Analytics in Google Cloud Platform (GCP) - Setting Up The Environment, covers the instructions for setting up development and deployment environment in Google Cloud Platform. In this current article, we will be building a data pipeline using Apache Beam Python SDK, we will cover the theoretical aspects of Apache Beam pipeline components first and then discuss the relevant code. 


What is a Pipeline in data processing?

In simple terms, a data pipeline is a series of connected steps or stages through which raw data is transformed or processed in order to extract insights or perform specific tasks. Data pipelines can be used to perform a wide range of tasks such as data ingestion, cleaning and transformation, analysis and visualisation. The data can be either bounded (batch) or unbounded (stream) and typically involves extracting data from one or more sources, cleaning and processing, and then storing the results in a target location.


Pipeline in Apache Beam

In Apache Beam, a pipeline is created by building a Directed Acyclic Graph (DAG) of data transformation, where the input is passed through a series of steps or transforms, before being output to the desired location.


Directed Acyclic Graph (DAG) is a graph which consists of nodes (also known as vertices) and edges, where the edges have directions and do not form any cycles or loops and the edges always point in the same direction. 


In a data processing pipeline, nodes represent the different transformations or operations that are applied to the input data and the edges represent the flow of data from one transformation to another.


As we have seen in the first part - Streaming Analytics in Google Cloud Platform (GCP) - Introduction, Beam provides many SDKs to build a pipeline and it supports various runners to execute the pipeline. The following pipeline reads data from a text file, does some transformation and writes it back to a text file.


# Import the Apache Beam
import apache_beam as beam

# Create a Pipeline object
pipeline = beam.Pipeline()


# Input: Read raw input data from source
input_data = pipeline | "read input" >> beam.io.ReadFromText('./input.txt')

# Transformation: Perform a simple transformation
transformed_data = input_data | 'transform data' >> beam.Map(lambda x: int(x) * 2)

# Output: write the transformed data to an output file
transformed_data | 'write output' >> beam.io.WriteToText('output.txt')

# Run the pipeline
pipeline.run()



This pipeline reads data from the input.txt file and performs a transformation of converting to integer and multiplying by 2 and finally stores the result in another text file output.txt


Create text file input.txt and insert a number and run the pipeline by executing this code as follows:


touch input.txt
echo 2 >> input.txt
Python first_pipeline.py



This will create an output text file output.txt, lets's read the output file - it should contain 4, lets verify:


cat output.txt



Now, you have successfully created your first pipeline in Apache Beam!!!


Let's go back to our code and review it again, you may have noticed, we have used some variables to store intermediate results. For example, input_data holds the data from the ReadFromtext() class and transformed_data holds Map() class. In Apache Beam, these variables are called pcollections and operations are called ptransforms. Having a good understanding of pcollection and transforms are very important for building a streaming pipeline, let's cover those topics, before moving to the next items.


PCollection in Apache Beam


PCollection (Parallel Collection) is an abstract representation of a distributed dataset, that can be processed in parallel. In Apache beam, it is the main dataset for representing a set of data that can be processed in parallel.


Reading from files, and reading from PubSub subscriptions are examples of creating a pcollection, also, transforming an existing pcollection also will create a new pcollection. Transformations such as filtering, mapping, grouping and aggregation are applied to the pcollection.


Example pcollection:


Reading from a file:

input_data = pipeline | "read input" >> beam.io.ReadFromText('./input.txt')


Pcollection created by applying transformation on an existing pcollection:

transformed_data = input_data | 'transform data' >> beam.Map(lambda x: int(x) * 2)



Some of the properties of pcollection are given below for your reference:


Immutable - once created, cannot be modified, to modify, apply a transformation on the original pcollection


pcollection may be distributed across multiple machines in a distributed processing environment - allowing parallel processing.


pcollection is not processed until is executed, such as writing to an output file - lazily evaluated.


pcollection can be bounded (batch - ex: file) or unbounded (stream - ex:pubsub )


pcollection can be partitioned into logical chunks called Windows based on event timestamp.


PTransforms in Apache Beam


PTransform (Parallel Transform) in Apache Beam holds the processing logic, it takes a pcollection as input, applies the transformation logic and gives a transformed pcollection as output.


pcollection1 -> ptransform1 -> pcollection2 -> ptransform2 -> pcollection3 -> ptransform3


In a data pipeline, input raw data as pcollection goes through multiple ptransform and the final ptransform produce a final pcollection which will be the output for the sink.


Example:


# Transformation: Perform a simple transformation

transformed_data = input_data | 'transform data' >> beam.Map(lambda x: int(x) * 2)



In this example, input pcollection input_data will be processed using Map transform and output will be stored in a pcollection called transformed_data.


Note in Apache Beam reading from source and writing to sink operations are ptransforms. See the below example:


# Input: Read raw input data from source

input_data = pipeline | "read input" >> beam.io.ReadFromText('./input.txt')

transformed_data | 'write output' >> beam.io.WriteToText(output.txt')




Apache Beam provides many pre-built ptransform with some of the commonly used ptransforms including:


Map: Applies a function to each element of a PCollection and returns a new PCollection containing the transformed elements.


Filter: Keeps only elements from a PCollection that satisfy a certain predicate and returns a new PCollection containing the filtered elements.


FlatMap: Applies a function to each element of a PCollection and returns a new PCollection containing the transformed elements, which may be of a different size than the input PCollection.


Combine: Aggregates elements of a PCollection using a specific combiner function and returns a single output element.


GroupByKey: Groups the elements of a PCollection by key and returns a PCollection of key-value pairs, where the values are grouped into an iterable object.


We have a good understanding of the building blocks of the Apache Beam data pipeline, let's put together all our learning till this point, and design a streaming pipeline in Apache Beam.


Streaming Pipeline in Apache Beam

A streaming pipeline is a pipeline that processes data in real-time as it is received at the system. Typically used to process data from streaming sources, such as sensors, logs, social media feeds, financial ticker logs, etc.


In the streaming pipeline, the input data is typically from an unbounded data source which has an unlimited number of elements and a series of PTransform are applied to an unbounded PCollection.


Let's build a streaming pipeline in Apache Beam, using the pcollection, and ptransform as discussed above:


import json

import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions

from apache_beam.options.pipeline_options import StandardOptions

from apache_beam.io.gcp.bigquery import WriteToBigQuery


# define the Pub/Sub subscription id

SUBSCRIPTION = "projects/projectname/subscriptions/subscriptionname"


#Set the BigQuery schema.

SCHEMA = 'name:STRING,age:INTEGER'


# Set the table name.

TABLE_NAME = 'project:dataset.table'


def
 parse_json_message(message):

    """Parse the input json message """

    row = json.loads(message)

    return {

        "name": row["name"],

        "age": row["age"]

    }


# Define pipeline options - to enable streaming mode

options = PipelineOptions()

options.view_as(StandardOptions).streaming = True


# Create a pipeline

pipeline = beam.Pipeline()

pipeline = beam.Pipeline(options=options)


# Read from a Cloud Pub/Sub topic and create a PCollection

input_pcoll = pipeline | 'Read message from PubSub' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)

message_string = input_pcoll | "UTF-8 bytes to string" >> beam.Map(lambda msg: msg.decode("utf-8"))

json_message = message_string | "Parse JSON messages" >> beam.Map(parse_json_message)

output_pcoll = json_message | 'Write to BigQuery table' >> WriteToBigQuery(table=TABLE_NAME, schema=SCHEMA)


#Run the pipeline

pipeline.run().wait_until_finish()




The above code reads messages from Pub/Sub subscription (unbounded data source) and writes to BigQuery table (sink). The code defines pcollections, ptransforms and additionally pipeline options, BigQuery streaming options. 


Pipeline options used to configure different aspects of pipeline such as runner and runner specific configurations, GCP project, region, job name, etc. In our code, we have specified options.view_as(StandardOptions).streaming = True  to enable streaming. 


We use BigQueryIO to write messages to BigQuery table, WriteToBigQuery transform requires table name, schema, table’s create disposition and tables write disposition. 


Table name - name of the destination table 

Schema - destination table schema 

Create disposition - specifies whether the destination table must exist or can be created by the write operation 

Write disposition - specifies whether write will be truncate and write or append rows to an existing table or write only to an empty table


output_pcoll  = json_message | 'Write to BigQuery table' >> WriteToBigQuery(                                                     table=TABLE_NAME                                           schema=SCHEMA, 

write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)




Here, table name is a fully-qualified BigQuery table name and it consist of three parts:


Project ID: Google Cloud Project ID

Dataset ID: BigQuery dataset ID

Table ID: BigQuery table ID


And it is specified as follows:


[project_id]:[dataset_id].[table_id]


Deploying Apache Beam Pipeline on GCP Cloud Dataflow

You have successfully build the data pipeline using Apache Beam Python SDK, it is time to deploy the Pipeline on GCP Cloud Dataflow. In the first introductory article we have discussed about Cloud Dataflow, if you have not read that article, it is time to review quickly. To deploy a pipeline as dataflow job, we will be using command line arguments and we will pass following arguments to our pipeline as pipeline options.


Jobname - Name of dataflow job
Runner - Name of runner (dataflow runner)
Project - GCP project ID
Region - GCP Compute Engine region
Staging Location - Cloud Storage bucket name for staging binary and temporary files
Temporary LocationPath for temporary files
NetworkThe Compute Engine network for launching Compute Engine instances to run your pipeline
SubnetworkThe Compute Engine subnetwork for launching Compute Engine instances to run your pipeline


Lets update the Pipeline options in our code:

Import the Pipeline option related libraries:

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import WorkerOptions




Import Argument Parser to get command line input to our pipeline code:

import argparse

Update our pipeline option to get Pub/Sub subscription for input, output_table to specify the BigQuery table name, and other GCP options as listed above.

parser = argparse.ArgumentParser()
parser.add_argument("--subscription",help="Input PubSub subscription of the form " '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."')
parser.add_argument("--output_table", help="BigQuery table of the form " '"project:dataset:table"')
parser.add_argument('--project',required=True, help='Specify Google Cloud project')
parser.add_argument('--region', required=True, help='Specify Google Cloud region')
parser.add_argument('--staging_location', required=True, help='Specify Cloud Storage bucket for staging')
parser.add_argument('--temp_location', required=True, help='Specify Cloud Storage bucket for temp')
parser.add_argument('--runner', required=True, help='Specify Apache Beam Runner')
parser.add_argument('--network', required=True, help='Specify network')
parser.add_argument('--subnetwork', required=True, help='Specify subnetwork')

opts, pipeline_args = parser.parse_known_args(argv)

options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = True
options.view_as(StandardOptions).streaming = True
options.view_as(GoogleCloudOptions).project = opts.project
options.view_as(GoogleCloudOptions).region = opts.region
options.view_as(GoogleCloudOptions).staging_location = opts.staging_location
options.view_as(GoogleCloudOptions).temp_location = opts.temp_location
options.view_as(GoogleCloudOptions).job_name = '{0}{1}'.format('stream-analytics-pipeline',time.time_ns())
options.view_as(StandardOptions).runner = opts.runner
options.view_as(WorkerOptions).network = opts.network
options.view_as(WorkerOptions).subnetwork = opts.subnetwork
options.view_as(WorkerOptions).use_public_ips = False

p = beam.Pipeline(options=options)


Change the pipeline transformation code to get subscription name from command line argument:


input_pcoll = pipeline | 'Read message from PubSub' >> beam.io.ReadFromPubSub(subscription=opts.subscription)



Change the pipeline transformation code to get BigQuery table name from command line argument and add other BigQuery related options as seen above:


output_pcoll = json_message | 'Write to BigQuery table' >> WriteToBigQuery(
table=opts.output_table,
schema=SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
method='STREAMING_INSERTS')
output_pcoll = json_message | 'Write to BigQuery table' >> WriteToBigQuery(table=opts.output_table, schema=SCHEMA)


Here is our final code:

import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery

import argparse
import logging
import time
# Set the BigQuery schema.
SCHEMA = 'name:STRING,age:INTEGER'

def parse_json_message(message):
"""Parse the input json message """
row = json.loads(message)
return {
"name": row["name"],
"age": row["age"]
}
def run(argv):
parser = argparse.ArgumentParser()

parser.add_argument("--subscription",help="Input PubSub subscription of the form " '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."')
parser.add_argument("--output_table", help="BigQuery table of the form " '"project:dataset:table"')
parser.add_argument('--project',required=True, help='Specify Google Cloud project')
parser.add_argument('--region', required=True, help='Specify Google Cloud region')
parser.add_argument('--staging_location', required=True, help='Specify Cloud Storage bucket for staging')
parser.add_argument('--temp_location', required=True, help='Specify Cloud Storage bucket for temp')
parser.add_argument('--runner', required=True, help='Specify Apache Beam Runner')
parser.add_argument('--network', required=True, help='Specify network')
parser.add_argument('--subnetwork', required=True, help='Specify subnetwork')


opts, pipeline_args = parser.parse_known_args(argv)

options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = True
options.view_as(StandardOptions).streaming = True
options.view_as(GoogleCloudOptions).project = opts.project
options.view_as(GoogleCloudOptions).region = opts.region
options.view_as(GoogleCloudOptions).staging_location = opts.staging_location
options.view_as(GoogleCloudOptions).temp_location = opts.temp_location
options.view_as(GoogleCloudOptions).job_name = '{0}{1}'.format('stream-analytics-pipeline',time.time_ns())
options.view_as(StandardOptions).runner = opts.runner
options.view_as(WorkerOptions).network = opts.network
options.view_as(WorkerOptions).subnetwork = opts.subnetwork
options.view_as(WorkerOptions).use_public_ips = False


p = beam.Pipeline(options=options)

# Create a pipeline
pipeline = beam.Pipeline()
pipeline = beam.Pipeline(options=options)

# Read from a Cloud Pub/Sub topic and create a PCollection
input_pcoll = pipeline | 'Read message from PubSub' >> beam.io.ReadFromPubSub(subscription=opts.subscription)
string_message = input_pcoll | "UTF-8 bytes to string" >> beam.Map(lambda msg: msg.decode("utf-8"))
json_message = string_message | "Parse JSON messages" >> beam.Map(parse_json_message)
output_pcoll = json_message | 'Write to BigQuery table' >> WriteToBigQuery(table=opts.output_table, schema=SCHEMA)

# Run the pipeline
pipeline.run().wait_until_finish()
if __name__=="__main__":
logging.getLogger().setLevel(logging.INFO)
run()
 

Deploy our code to Google Cloud Dataflow:


export BQTABLE="your_big_query_table_name_in_fully_qualified_name"
export SUBSCRIPTION="your_pubsub_subscription"
export PROJECT="$(gcloud config get-value project)"
export BUCKET="gcs_bucket_name"
export REGION="asia-southeast1"
export NETWORK="your_network"
export SUBNETWORK="https://www.googleapis.com/compute/v1/projects/your_project_id/regions/asia-southeast1/subnetworks/your_subnet"
export TEMPLOCATION="gs://$BUCKET/tmp"
export STAGELOCATION="gs://$BUCKET/dev"
python -m streaming \
--output_table "$BQTABLE" \
--subscription "$SUBSCRIPTION" \
--runner DataflowRunner \
--project "$PROJECT" \
--region "$REGION" \
--subnetwork="$SUBNETWORK" \
--network="$NETWORK" \
--staging_location="$TEMPLOCATION" \
--temp_location="$STAGELOCATION"

After few minutes, your can check your job named stream_analytics in Google Cloud Console under Dataflow jobs section.

Summary


We have covered what is data pipelines in the context of data processing, building data pipeline in Apache Beam and various components of Apache Beam data pipeline such as pcollection, ptransform and pre-built core transforms. Also, we have covered streaming pipeline and defined a streaming pipeline using Apache Beam Python SDK and gone through the pipeline options, PubSub IO and BigQuery IO features. And finally, we have deployed our data pipeline on Google Cloud Dataflow runner. 


That’s all for this part, we will introduce new concept in next article of this series. Until then, have a wonderful day!”


References

Directed Acyclic Graph (DAG): https://en.wikipedia.org/wiki/Directed_acyclic_graph
Beam Concepts: https://beam.apache.org/documentation/
Stream Analytics in GCP - introduction: https://www.rathishkumar.com/2023/01/streaming-analytics-in-google-cloud.html
Stream Analytics in GCP - setting up the environment: https://www.rathishkumar.com/2023/01/streaming-analytics-in-gcp-setting-up-environment.html