Building Reliability in On-prem Data Upload Jobs Through Log Monitoring

Monitoring, GCP, 2024

image

Pub/Sub does not natively log message data access events. Instead, it logs administrative actions and performance metrics. For monitoring the actual message flow (such as ensuring that messages are being published and processed), you’ll need to implement a logging mechanism that writes to Cloud Logging whenever a message is published or processed. We will use Cloud Funcion which is triggered by the Pub/Sub message is received and it will be used to log messages. We can expand this Cloud Function to customize messaging for multiple scenarios.

We typically know how to handle job failures messages by using exception handling. However, we can expect on-prem uplaod job success or failure only if the job was initited.

We would need to be able to register whether or not a job was initiated. In this scenario we would want to know that the on-prem job was initiated successfully. If we do not receive a job initaited message, it means that the Job scheduler failed and we would need to fix the problem.

Objectives

  1. Cloud Function to log the message using Cloud Logging
  2. Deploy Cloud Function to be triggered by Pub/Sub when the a message is received
  3. Use Cloud Monitoring to create a Metric and Policy to monitor for a message arrival.
  4. A Cloud Alert would need to send a email notificaiton when a message does not arrive within a timeframe.
  5. Workflow Process to create an incident and close the incident when the condition is resolved.

1. Cloud Function to log the message using Cloud Logging

import base64
import google.cloud.logging
from cloudevents.http import CloudEvent
import functions_framework

# Triggered from a message on a Cloud Pub/Sub topic.
@functions_framework.cloud_event
def subscribe(cloud_event: CloudEvent) -> None:
    # Create a Cloud Logging Client
    client = google.cloud.logging.Client()
    client.setup_logging()
    # Create a Logger
    logger = client.logger("cronjob_erp_salesorderitems_pubsub_logger")
    # Create a Log Message
    message = base64.b64decode(cloud_event.data["message"]["data"]).decode()
    # Log the message
    logger.log_text(message)

This Cloud Function requires 2 libraries which we would need to configure in a requirements.txt

google-cloud-logging
pytest==8.2.0

2. Deploy Cloud Function to be triggered by Pub/Sub

gcloud functions deploy python-onprem-logreader-pubsub-function \
--gen2 \
--runtime=python312 \
--region=us-west1 \
--source=. \
--entry-point=subscribe \
--trigger-topic=topic-play-kfnstudy-trigger-cloudFunction

Running a test on Cloud Function by publishing a message to pub-sub

We are running a client from the demo where we used a cron job to schedule a gcloud python client to copy files to Google storage and sent a notification to Pub/Sub that the client was triggered.

Message published from client:

    INFO: 20240804165246:durable-pipe-431319-g7:Scheduled Job Initated; Ingest ERP Data - SalesOrderItems;20240804165246 Published message ID: 11920180892590542

Pub/Sub Subscription

image

3. Cloud Monitoring to create a Metric and Policy

We can confirm that the message was logged by Cloud Logging by Google Cloud Monitoring

image

Create a metric

resource.type="cloud_function"
resource.labels.function_name="python-onprem-logreader-pubsub-function"
resource.labels.project_id="durable-pipe-431319-g7"
resource.labels.region="us-west1"
logName="projects/durable-pipe-431319-g7/logs/cronjob_erp_salesorderitems_pubsub_logger"
textPayload:"Scheduled Job Initated; Ingest ERP Data - SalesOrderItems"

image

image

Creating a policy on the metric

image

We also configure a alert to be triggered based on a metric absence. If there is no alert we will trigger a notification

image

I defined a notification channnel based on sending an email.

image

4. Alert and Notification

There was no alert in 5 mins and an alert was crated

image

An email was also triggered

image

5. Workflow Process to create an incident and close the incident

image

The incident can be acknowledged.

image

The incident only be resolved when the condition is met. A pubsub message is required to close the message.

image

Publishing a message and closing the incindent.

image

image

We are able to close the incident once we receive a message.