GCP Cloud Pub/Sub Replay: Seeking to timestamp & Seeking to snapshots

Google Cloud Pub/Sub Replay (Pixabay)

Let's assume you have a data pipeline deployed on Google Cloud Platform: a publisher client publishes events to a Cloud Pub/Sub topic, and a data processing application reads them from a Cloud Pub/Sub subscription, processes them, and writes them to a BigQuery table.

Consider a scenario where, due to a bug in the publisher client, the published events are corrupted. The data processing subscriber cannot process them and crashes, and you observe the unacknowledged message count and event backlog size metrics increasing. After a day, you have fixed the publisher code and redeployed the publisher client. But the subscriber client is still crashing and the metrics continue to go up, because the corrupted events were already published to the Cloud Pub/Sub subscription and are waiting for the subscriber client to acknowledge them. So how do we handle those unacknowledged messages?

Before discussing the solution to the above problem, let's consider another scenario: you have made some changes to the data processing logic and redeployed the subscriber code. The metrics look okay, with no increase in unacknowledged message count or backlog size, but the data is not showing up in the BigQuery table. What is happening here is that, due to a bug in the subscriber code, messages are being acknowledged by the data processing application without actually being processed and written to the BigQuery table, which results in data loss. So how do we retrieve those lost messages?

In the former scenario, there are unacknowledged corrupted events that we don't want to process. We need to skip them by fast-forwarding (purging) until we reach the point in time where we started receiving valid events (events published after the bug was fixed in the publisher client). In the latter scenario, events were wrongly acknowledged (though never processed by the subscriber code because of the bug), so we need to rewind: go back to the time when the subscriber code was redeployed, un-acknowledge those events, and replay them.

Cloud Pub/Sub Seek:

In a regular setup, you cannot retrieve acknowledged messages from Cloud Pub/Sub. However, as we have seen above, sometimes we need to retrieve previously acknowledged messages to reprocess them (bulk un-acknowledge: change the message status to unacknowledged all at once) or fast-forward to purge unacknowledged messages (bulk acknowledge: change the message status to acknowledged all at once).

GCP Cloud Pub/Sub provides the Seek API to seek a subscription to a specific timestamp or to a snapshot, so that you can replay or skip messages in the subscription.
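To make the two directions concrete, here is a toy model of what a seek to a timestamp does to message state. It is only a sketch and not the Pub/Sub API: the function name seek_model and the "<id> <publish-epoch>" input format are made up for illustration, and treating a message published exactly at the seek time as unacknowledged is an assumption.

```shell
# Toy model of seek-to-timestamp semantics (NOT the Pub/Sub API).
# Reads lines of "<message-id> <publish-epoch-seconds>" on stdin and prints
# the state each retained message would have after seeking to epoch $1:
#   published before the seek time -> acked   (skipped / fast-forwarded)
#   published at or after it       -> unacked (delivered or redelivered)
# The "at or after" boundary is an assumption of this sketch.
seek_model() {
  awk -v seek_time="$1" '{ print $1, ($2 < seek_time ? "acked" : "unacked") }'
}

# Three messages published at t=100, 200 and 300; seek to t=200.
printf 'm1 100\nm2 200\nm3 300\n' | seek_model 200
```

Seeking to the current time marks the whole backlog as acknowledged (the first scenario), while seeking to an earlier time marks everything published since then as unacknowledged (the second scenario).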

Seek to a timestamp:

To seek to a timestamp in the past and un-acknowledge already acknowledged messages in bulk, you need to retain the acknowledged messages in the Cloud Pub/Sub topic or subscription. Let's discuss Cloud Pub/Sub message retention below:

Cloud Pub/Sub topic message retention:

When topic retention is enabled, the Cloud Pub/Sub topic is responsible for storing acknowledged messages, and the topic has full control over the message retention duration, independent of the subscription retention settings. You can enable message retention on the topic as follows:

Create a Cloud Pub/Sub topic with message retention duration of 2 weeks:

gcloud pubsub topics create seek-demo-topic --message-retention-duration=14d

To enable message retention on an existing topic:

gcloud pubsub topics update seek-demo-topic --message-retention-duration=1d

To disable message retention on a topic:

gcloud pubsub topics update seek-demo-topic --clear-message-retention-duration

Cloud Pub/Sub subscription message retention:

By default, Pub/Sub discards a message from a subscription as soon as the message is acknowledged, and retains unacknowledged messages in the subscription for 7 days. To replay acknowledged messages from a Pub/Sub subscription, you need to retain the acknowledged messages at the subscription. When topic retention is set to more than 7 days, acknowledged messages are retained for more than 7 days (the maximum of the topic and subscription retention periods).

To create a subscription with acknowledged message retention:

gcloud pubsub subscriptions create seek-demo-subscription --topic=seek-demo-topic --retain-acked-messages --message-retention-duration=6d

To update an existing subscription's acknowledged message retention:

gcloud pubsub subscriptions update seek-demo-subscription --message-retention-duration=1d
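As a rough sketch of the rule stated above (acknowledged messages are kept for the larger of the topic and subscription retention periods), the following helper computes that window for the values used when the demo topic and subscription were created. The function names are hypothetical, and the duration parser handles only the simple s/m/h/d suffixes used in this article, not every format gcloud accepts.

```shell
# Convert a simple duration such as "14d", "6h", "10m" or "30s" to seconds.
# Hypothetical helper; supports only a whole number plus one unit letter.
to_seconds() {
  n=${1%?}          # everything except the last character, e.g. "14"
  unit=${1#"$n"}    # the last character, e.g. "d"
  case $n in
    '' | *[!0-9]*) echo "unsupported duration: $1" >&2; return 1 ;;
  esac
  case $unit in
    s) echo "$n" ;;
    m) echo $((n * 60)) ;;
    h) echo $((n * 3600)) ;;
    d) echo $((n * 86400)) ;;
    *) echo "unsupported duration: $1" >&2; return 1 ;;
  esac
}

# Print the replay window in seconds: the larger of the topic retention ($1)
# and the subscription retention ($2).
replay_window_seconds() {
  topic=$(to_seconds "$1") || return 1
  sub=$(to_seconds "$2") || return 1
  if [ "$topic" -gt "$sub" ]; then echo "$topic"; else echo "$sub"; fi
}

replay_window_seconds 14d 6d   # prints 1209600 (14 days)
```

With topic retention at 14d and subscription retention at 6d, the topic setting wins, so acknowledged messages stay replayable for 14 days.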

Acknowledged message retention: topic vs subscription:

Cloud Pub/Sub provides message retention options at both the topic and the subscription level. Choosing the right option is important, as it impacts cost and maintenance. Let's discuss the important considerations in choosing the right retention option for you.

Topic retention: 

Configured at the topic level and maintained by the topic owner in the topic project. You pay once for all attached subscriptions (including subscriptions in other projects). Messages are retained and available for replay even if no subscriptions were attached to the topic at the time the messages were published.

Subscription retention:

Configured individually at the subscription level, in each subscription project, by the respective subscription owners. You pay separately for subscription retention storage in the subscription project. Comparatively higher maintenance than topic retention, but it gives more flexibility to retain only the required subset of the messages.

Seeking to a timestamp:

In our first scenario, there are unacknowledged corrupted events that crash the subscriber code when processed, and we discussed that purging those events (fast-forwarding to the valid events) would solve the issue. Seeking to a timestamp helps to achieve this.

For example, let's say your publisher client publishes around 100 events per second, and some changes were made to the publisher client on 7th March 2023 at 05:00 PM, after which every event published to Cloud Pub/Sub crashes the subscriber code. You identified the issue, implemented a fix, and redeployed the publisher code by 8th March 2023 at 09:00 AM. But the subscriber is still crashing because of the unacknowledged events, that is, the events published between 2023-03-07T17:00:00 and 2023-03-08T09:00:00.

We have the flexibility to skip those events, so we decide to purge them, that is, change their status from unacknowledged to acknowledged, so that our subscriber receives only valid messages.

At 2023-03-08T09:00:00 you deploy the new publisher client code and seek to this time (the current datetime) to purge all unacknowledged events. That is, any backlog from before 2023-03-08T09:00:00 is purged (the events' status is changed to acknowledged). The command for seeking to the current timestamp (2023-03-08T09:00:00) is given below:

export TS_FORMAT=%Y-%m-%dT%H:%M:%SZ
gcloud pubsub subscriptions seek seek-demo-subscription --time=$(date -u +$TS_FORMAT)

You can also specify the required timestamp explicitly, as follows:

gcloud pubsub subscriptions seek seek-demo-subscription --time=2023-03-08T09:00:00.000000Z

The subscriber will now receive only events published after 2023-03-08T09:00:00. That is, the subscriber's backlog is reset to the current point in time. Please remember that seeking back to a past timestamp to replay acknowledged messages works only if message retention is enabled on the topic or the subscription.
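Because a seek changes the state of many messages at once, it can help to sanity-check the timestamp and review the exact command before running it. The wrapper below is a hypothetical sketch (seek_to_time_cmd is not a gcloud feature): it only checks that the value has the UTC shape used in the commands above, with optional fractional seconds, and prints the command instead of executing it.

```shell
# Print (do not run) the seek command for subscription $1 and UTC timestamp $2.
# Hypothetical helper; this is a loose shape check, not full RFC 3339 validation.
seek_to_time_cmd() {
  sub=$1
  ts=$2
  case $ts in
    # Whole seconds, e.g. 2023-03-08T09:00:00Z
    [0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]:[0-9][0-9]:[0-9][0-9]Z) ;;
    # Fractional seconds, e.g. 2023-03-08T09:00:00.000000Z
    [0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]:[0-9][0-9]:[0-9][0-9].[0-9]*Z) ;;
    *)
      echo "expected a UTC timestamp like 2023-03-08T09:00:00Z, got: $ts" >&2
      return 1
      ;;
  esac
  printf 'gcloud pubsub subscriptions seek %s --time=%s\n' "$sub" "$ts"
}

seek_to_time_cmd seek-demo-subscription 2023-03-08T09:00:00Z
```

Once the printed command looks right, it can be copied and run as is. A local time such as "09:00 AM" would be rejected here, which is a reminder to convert it to UTC first.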

Seeking to a snapshot:

In our second scenario, events are already acknowledged by the subscriber but were not successfully processed due to a bug in the newly deployed code. We also discussed the possible solution of rewinding and replaying the acknowledged events for reprocessing, that is, changing the events' status from acknowledged to unacknowledged so that Cloud Pub/Sub delivers those messages again.

We have seen above how to rewind or fast-forward by seeking to a timestamp. In this section, we will see how to replay events by seeking to a snapshot.

Let's say you made some changes to the data processing logic and redeployed the subscriber code on 7th March 2023 at 05:00 PM. You then observe that, due to a bug in the code, events are acknowledged but not processed and written to the BigQuery table. Therefore, you want to fix the subscriber code and seek back to 2023-03-07T17:00:00, that is, un-acknowledge all the events acknowledged since 2023-03-07T17:00:00.

To seek to a snapshot, you first need to create one. As we are making changes to the subscriber code, create the snapshot just before the deployment (i.e. at 2023-03-07T17:00:00), as below:

gcloud pubsub snapshots create seek-demo-snapshot --subscription=seek-demo-subscription

You can list the created snapshots and their expiration times by running the command below:

gcloud pubsub snapshots list

Cloud Pub/Sub List

Once a snapshot is created, it retains the backlog (all unacknowledged events in the source subscription at the time of snapshot creation) and any events published to the topic thereafter.

Now that we have a snapshot created at the time of the subscriber code deployment, if the subscriber code acknowledges events erroneously, we can seek back to that point in time using the snapshot, as below:

gcloud pubsub subscriptions seek seek-demo-subscription --snapshot=seek-demo-snapshot

Now you will be able to replay the acknowledged events from the snapshot. Seek operations are eventually consistent, so you may not see those events immediately after the seek operation; as per the documentation, it may take a minute to deliver those events.

You can also seek to the snapshot from any other subscription attached to the same topic, or from a newly created subscription on the same topic. This is helpful if you would like to process those events separately. The steps below show seeking to the snapshot from a newly created subscription:

Create a new subscription:

gcloud pubsub subscriptions create seek-demo-sub2 --topic=seek-demo-topic --ack-deadline=10

Seek to the existing snapshot from the newly created subscription:

gcloud pubsub subscriptions seek seek-demo-sub2 --snapshot=seek-demo-snapshot

Snapshots expire and are deleted after 7 days, or sooner if the oldest unacknowledged event in the snapshot reaches 7 days of age first. For example, if the oldest unacknowledged event is 5 days old, then the snapshot will expire in 2 days.
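The arithmetic in that example can be written down as a small helper. This is a sketch of the rule as described above, assuming whole days and a fixed 7-day limit; snapshot_lifetime_days is a made-up name, not a gcloud command.

```shell
# Print how many days a new snapshot would live, given the age in whole days
# of the oldest unacknowledged event in the subscription ($1).
# Hypothetical helper modelling "7 days minus the age of the oldest backlog event".
snapshot_lifetime_days() {
  max_days=7
  age_days=$1
  remaining=$((max_days - age_days))
  # A backlog older than the limit leaves no lifetime at all.
  if [ "$remaining" -lt 0 ]; then
    remaining=0
  fi
  echo "$remaining"
}

snapshot_lifetime_days 5   # prints 2
```

In practice this means a snapshot taken from a subscription with an old backlog is short-lived, so it is worth checking the expiration time shown by the snapshots list command right after creating it.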

Seeking events with Filters:

Cloud Pub/Sub supports replaying events from a subscription with filters. When you use filters, Cloud Pub/Sub redelivers only the events that match the filter condition. Please note that a snapshot includes all events, not just the events that match the filter condition; therefore the storage cost applies to all events, not just the filtered subset.

Let's publish a few more events to our demo topic and demonstrate seeking with filters:

gcloud pubsub topics publish seek-demo-topic --message '{"Country": "India", "Capital": "New Delhi"}' --attribute=Country="India"

gcloud pubsub topics publish seek-demo-topic --message '{"Country": "China", "Capital": "Beijing"}' --attribute=Country="China"

gcloud pubsub topics publish seek-demo-topic --message '{"Country": "Russia", "Capital": "Moscow"}' --attribute=Country="Russia"

Our demo snapshot, created earlier, includes these messages as well. Let's create subscriptions with message filters and seek the filtered events from the snapshot.

Create a new subscription with a message filter:

gcloud pubsub subscriptions create seek-demo-india --topic=seek-demo-topic --ack-deadline=10 --message-filter='attributes.Country="India"'

Create one more subscription with a different message filter:

gcloud pubsub subscriptions create seek-demo-russia --topic=seek-demo-topic --ack-deadline=10 --message-filter='attributes.Country="Russia"'

Seeking to snapshot from seek-demo-india subscription:

gcloud pubsub subscriptions seek seek-demo-india --snapshot=seek-demo-snapshot

Seeking to snapshot from seek-demo-russia subscription:

gcloud pubsub subscriptions seek seek-demo-russia --snapshot=seek-demo-snapshot

Pull messages from seek-demo-india:

gcloud pubsub subscriptions pull seek-demo-india


Seeking to a snapshot with message filter

Pull messages from seek-demo-russia:

gcloud pubsub subscriptions pull seek-demo-russia


Seeking to a snapshot with message filter

Note: as of today (Mar 2023), Cloud Pub/Sub allows you to filter events by attributes, not by message payload. Therefore, use event attributes to filter the events.
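The following toy model, which is not the Pub/Sub filter engine, illustrates why the attribute matters: matching looks only at the Country attribute and never inspects the payload. The function name and the tab-separated "<Country attribute><TAB><payload>" input format are assumptions made for this sketch.

```shell
# Toy model of attribute-based filtering (NOT the Pub/Sub filter engine).
# Reads lines of "<Country attribute><TAB><payload>" on stdin and prints the
# payloads whose attribute equals $1. The payload text is never examined.
filter_by_country() {
  awk -F '\t' -v want="$1" '$1 == want { print $2 }'
}

# The second event mentions India in its payload, but its attribute is China,
# so a filter on Country="India" does not select it.
printf 'India\tevent-1\nChina\tevent-2 mentions India\nRussia\tevent-3\n' |
  filter_by_country India   # prints event-1
```

If a value you need to filter on lives only in the payload, the publisher has to copy it into an attribute, as the publish commands above do with Country.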

Cloud Pub/Sub Replay - Use-cases:

We have discussed two scenarios above where Pub/Sub replay functionality is very helpful. You can use Cloud Pub/Sub replay when deploying new publisher or subscriber client code, recovering from unexpected subscriber client failures, testing subscriber code on real data, and in any other scenario where you need to either acknowledge messages in bulk (fast-forward) or un-acknowledge previously acknowledged messages (rewind). Also, because the seek functionality operates on messages in bulk, it can be a huge cost and time saver.


We have discussed a scenario where you need to bulk-acknowledge unsupported messages that are waiting for delivery, and a scenario where you need to redeliver messages that were erroneously acknowledged by the subscriber code so that they can be reprocessed. We have seen how both can be handled with the Cloud Pub/Sub Seek functionality, by fast-forwarding or rewinding to a point in time or to a snapshot. We have also covered the implementation details of enabling message retention, seeking to a timestamp, and seeking to a snapshot. Finally, we looked at seeking with filters and at the use cases of the Cloud Pub/Sub Seek functionality. I hope this article is helpful; please share your thoughts in the comment section. Thank you!
