Building Reliability in On-prem Data Upload Jobs Through Log Monitoring
Monitoring, GCP, 2024
By default, Pub/Sub does not log message data access events; it records administrative actions and exposes performance metrics. To monitor the actual message flow (for example, to confirm that messages are being published and processed), you need a logging mechanism that writes to Cloud Logging whenever a message is published or processed. We will use a Cloud Function that is triggered when a Pub/Sub message is received and that writes the message to the log. The same Cloud Function can later be extended to customize logging for multiple scenarios.
We typically handle job failure messages through exception handling. However, an on-prem upload job can only report success or failure if the job was actually initiated.
We therefore need to register whether a job was initiated. In this scenario we want confirmation that the on-prem job started successfully. If we do not receive a job-initiated message, the job scheduler failed and we need to fix the problem.
Objectives
- Cloud Function to log the message using Cloud Logging
- Deploy Cloud Function to be triggered by Pub/Sub when a message is received
- Use Cloud Monitoring to create a Metric and Policy to monitor for a message arrival
- A Cloud Alert to send an email notification when a message does not arrive within a timeframe
- Workflow Process to create an incident and close the incident when the condition is resolved.
1. Cloud Function to log the message using Cloud Logging
import base64
import google.cloud.logging
from cloudevents.http import CloudEvent
import functions_framework

# Triggered from a message on a Cloud Pub/Sub topic.
@functions_framework.cloud_event
def subscribe(cloud_event: CloudEvent) -> None:
    # Create a Cloud Logging client
    client = google.cloud.logging.Client()
    client.setup_logging()
    # Create a named logger
    logger = client.logger("cronjob_erp_salesorderitems_pubsub_logger")
    # Decode the base64-encoded Pub/Sub message payload
    message = base64.b64decode(cloud_event.data["message"]["data"]).decode()
    # Log the message
    logger.log_text(message)
The requirements.txt for this Cloud Function lists two libraries: google-cloud-logging, which the function uses at runtime, and pytest, which is only needed for testing.
google-cloud-logging
pytest==8.2.0
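Since pytest is already listed, the decoding step inside subscribe can be checked locally before deploying. The following is a minimal sketch, assuming the payload has the same shape the function reads (cloud_event.data["message"]["data"]); the helper names decode_pubsub_data and make_event_data are hypothetical and not part of the original function.

```python
import base64


def decode_pubsub_data(event_data: dict) -> str:
    """Return the UTF-8 text carried in a Pub/Sub CloudEvent payload."""
    return base64.b64decode(event_data["message"]["data"]).decode()


def make_event_data(text: str) -> dict:
    """Build a payload shaped like the one Pub/Sub delivers (hypothetical test helper)."""
    return {"message": {"data": base64.b64encode(text.encode()).decode()}}


def test_decode_round_trip() -> None:
    # The decoded text should match what the publisher sent.
    text = "Scheduled Job Initated; Ingest ERP Data - SalesOrderItems"
    assert decode_pubsub_data(make_event_data(text)) == text
```

Keeping the decoding in a small pure function like this means the test needs neither Cloud Logging credentials nor a deployed function.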
2. Deploy Cloud Function to be triggered by Pub/Sub
gcloud functions deploy python-onprem-logreader-pubsub-function \
    --gen2 \
    --runtime=python312 \
    --region=us-west1 \
    --source=. \
    --entry-point=subscribe \
    --trigger-topic=topic-play-kfnstudy-trigger-cloudFunction
Running a test on the Cloud Function by publishing a message to Pub/Sub
We run the client from the earlier demo, in which a cron job schedules a gcloud Python client that copies files to Google Cloud Storage and sends a notification to Pub/Sub that the client was triggered.
Message published from client:
INFO: 20240804165246:durable-pipe-431319-g7:Scheduled Job Initated; Ingest ERP Data - SalesOrderItems;20240804165246 Published message ID: 11920180892590542
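If later stages need the individual fields of this message, it can be split apart. The sketch below assumes the layout seen in the sample, that is `LEVEL: timestamp:project:event; job;timestamp` with timestamps in YYYYMMDDHHMMSS form; this layout is inferred from the single message above rather than documented, and the names JobMessage and parse_job_message are hypothetical.

```python
from datetime import datetime
from typing import NamedTuple


class JobMessage(NamedTuple):
    level: str
    sent_at: datetime
    project_id: str
    event: str
    job: str


def parse_job_message(text: str) -> JobMessage:
    """Split a message of the assumed form 'LEVEL: ts:project:event; job;ts'."""
    # The first three colons separate level, timestamp and project id.
    level, sent_at, project_id, rest = (p.strip() for p in text.split(":", 3))
    # The remainder is semicolon-separated: event, job name, repeated timestamp.
    event, job, _repeated_timestamp = (p.strip() for p in rest.split(";"))
    return JobMessage(
        level=level,
        sent_at=datetime.strptime(sent_at, "%Y%m%d%H%M%S"),
        project_id=project_id,
        event=event,
        job=job,
    )


sample = (
    "INFO: 20240804165246:durable-pipe-431319-g7:"
    "Scheduled Job Initated; Ingest ERP Data - SalesOrderItems;20240804165246"
)
parsed = parse_job_message(sample)
```

A message that does not follow this layout raises a ValueError from the tuple unpacking, which is a reasonable signal that the publisher format has changed.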
Pub/Sub Subscription
3. Cloud Monitoring to create a Metric and Policy
We can use Google Cloud Monitoring to confirm that the message was written to Cloud Logging.
Create a metric
resource.type="cloud_function"
resource.labels.function_name="python-onprem-logreader-pubsub-function"
resource.labels.project_id="durable-pipe-431319-g7"
resource.labels.region="us-west1"
logName="projects/durable-pipe-431319-g7/logs/cronjob_erp_salesorderitems_pubsub_logger"
textPayload:"Scheduled Job Initated; Ingest ERP Data - SalesOrderItems"
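When the same kind of metric is needed for several jobs, the filter can be generated rather than typed by hand. This is a minimal sketch that only assembles the filter text shown above; the function name build_log_filter and its parameters are hypothetical. Clauses on separate lines are combined with an implicit AND in the Logging query language.

```python
def build_log_filter(
    project_id: str,
    function_name: str,
    region: str,
    log_id: str,
    text: str,
) -> str:
    """Assemble a Cloud Logging filter matching one job's log entries."""
    clauses = [
        'resource.type="cloud_function"',
        f'resource.labels.function_name="{function_name}"',
        f'resource.labels.project_id="{project_id}"',
        f'resource.labels.region="{region}"',
        f'logName="projects/{project_id}/logs/{log_id}"',
        # The ':' operator is a substring ("has") match on the text payload.
        f'textPayload:"{text}"',
    ]
    return "\n".join(clauses)


log_filter = build_log_filter(
    project_id="durable-pipe-431319-g7",
    function_name="python-onprem-logreader-pubsub-function",
    region="us-west1",
    log_id="cronjob_erp_salesorderitems_pubsub_logger",
    text="Scheduled Job Initated; Ingest ERP Data - SalesOrderItems",
)
```

Note that the text must match the published message exactly, including the spelling "Initated" used by the client.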
Creating a policy on the metric
We also configure an alert that is triggered by metric absence: if no matching message is logged within the configured window, a notification is sent.
I defined a notification channel that sends an email.
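The policy in this walkthrough was created in the console, but the same settings can be written down as data. The sketch below builds a dictionary following my reading of the Cloud Monitoring AlertPolicy resource (a conditionAbsent condition with a filter, duration and aggregation). The metric name job_initiated_count, the channel ID and the choice of ALIGN_SUM with a 60-second alignment period are assumptions for illustration, not values taken from the original setup.

```python
import json


def build_absence_policy(
    metric_name: str, channel: str, window_seconds: int = 300
) -> dict:
    """Describe an alert that fires when a log-based metric reports no data."""
    metric_filter = (
        f'metric.type="logging.googleapis.com/user/{metric_name}" '
        'AND resource.type="cloud_function"'
    )
    return {
        "displayName": f"No {metric_name} data for {window_seconds // 60} min",
        "combiner": "OR",
        "conditions": [
            {
                "displayName": "Job-initiated message absent",
                "conditionAbsent": {
                    "filter": metric_filter,
                    # How long the metric must be silent before alerting.
                    "duration": f"{window_seconds}s",
                    "aggregations": [
                        {
                            "alignmentPeriod": "60s",
                            "perSeriesAligner": "ALIGN_SUM",  # assumed aligner
                        }
                    ],
                },
            }
        ],
        "notificationChannels": [channel],
    }


policy = build_absence_policy(
    metric_name="job_initiated_count",  # hypothetical metric name
    channel="projects/durable-pipe-431319-g7/notificationChannels/123",  # hypothetical ID
)
policy_json = json.dumps(policy, indent=2)
```

Keeping the policy as data makes it easy to review the 5-minute window and the notification channel alongside the rest of the job configuration.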
4. Alert and Notification
No message arrived within 5 minutes, so an alert was created.
An email was also triggered
5. Workflow Process to create an incident and close the incident
The incident can be acknowledged.
The incident can only be resolved when the condition is met. A Pub/Sub message is required to close the incident.
Publishing a message and closing the incident.
We are able to close the incident once we receive a message.
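The open-and-close behavior described in this section can be modeled in a few lines. This is a simplified sketch of the idea only, not how Cloud Monitoring is implemented (the real service evaluates on its own schedule and may open or close incidents with some delay); the class name AbsenceMonitor is hypothetical.

```python
from datetime import datetime, timedelta


class AbsenceMonitor:
    """Open an incident when no message arrives within a window; close it on arrival."""

    def __init__(self, window: timedelta, started_at: datetime) -> None:
        self.window = window
        self.last_seen = started_at
        self.incident_open = False

    def record_message(self, at: datetime) -> None:
        # A message satisfies the condition again, so any open incident closes.
        self.last_seen = at
        self.incident_open = False

    def check(self, now: datetime) -> bool:
        # Open an incident once the silence has lasted the full window.
        if now - self.last_seen >= self.window:
            self.incident_open = True
        return self.incident_open


start = datetime(2024, 8, 4, 16, 0, 0)
monitor = AbsenceMonitor(window=timedelta(minutes=5), started_at=start)
open_after_4_min = monitor.check(start + timedelta(minutes=4))
open_after_5_min = monitor.check(start + timedelta(minutes=5))
monitor.record_message(start + timedelta(minutes=6))
open_after_message = monitor.check(start + timedelta(minutes=7))
```

In this model acknowledging an incident would not change incident_open; only a new message does, which mirrors the behavior observed above.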