On-Prem gcloud SDK - Cloud Storage & Pub/Sub

gcloudSDK, GCP, 2024


The objective of this demo is to use gcloud to simulate on-prem data engineering capabilities required in multiple scenarios. We will use the gcloud Python SDK to (1) upload data files to GCP Cloud Storage and (2) publish a status notification to GCP Pub/Sub indicating whether the ingest happened.

Cloud Security - IAM Service Account


We would need to create a service account, which the client will use to authenticate into GCP with its secret key.

For the service account to be able to access Cloud Storage, it needs privileges on the destination bucket. Below are the permissions required by the service account (a sketch of granting them from the SDK follows these lists).

  • Storage Object Creator
  • Storage Object Viewer

For the service account to be able to access Pub/Sub, it needs the privilege below.

  • Pub/Sub Publisher
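
The bucket-level Storage roles can be granted in the console, or from the SDK by an identity with IAM admin rights on the bucket. A minimal sketch, where 'admin-key.json' and the service account email are hypothetical placeholders:

from google.cloud import storage

# Sketch: grant the two bucket-level Storage roles to the service account.
# 'admin-key.json' and the service account email are hypothetical placeholders.
admin_client = storage.Client.from_service_account_json('admin-key.json')
bucket = admin_client.get_bucket('ingest-xxxxxx')

policy = bucket.get_iam_policy(requested_policy_version=3)
for role in ('roles/storage.objectCreator', 'roles/storage.objectViewer'):
    policy.bindings.append({
        'role': role,
        'members': {'serviceAccount:ingest-sa@durabxxxxxx-g7.iam.gserviceaccount.com'},
    })
bucket.set_iam_policy(policy)

The Pub/Sub Publisher role is granted on the topic itself; here that is done through the console.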


The key file, which includes the IAM service account details and the security key, will need to be downloaded and used by the client. The client can either load it explicitly in code (as in the functions below) or point the GOOGLE_APPLICATION_CREDENTIALS environment variable at it.


Create a Bucket which will be used for loading data

In the interest of time I am creating a basic bucket; I would usually randomize the bucket name for security reasons. Also ensure that the right security is applied on the bucket based on the recommendations for the Raw-Layer.
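
The bucket can also be created from the SDK rather than the console. A minimal sketch, run with an identity that has bucket-creation rights (the ingest service account above only holds object-level roles); the key path and location are assumed placeholders:

from google.cloud import storage

# Sketch: create the ingest bucket from the SDK instead of the console.
# 'admin-key.json' and the location are assumptions for illustration.
client = storage.Client.from_service_account_json('admin-key.json')
bucket = client.create_bucket('ingest-xxxxxx', location='us-central1')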


Create a Topic and Subscription

(screenshot: topic and subscription created in the console)
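
The topic and subscription can likewise be created from the SDK. A minimal sketch; the subscription ID 'sub-xxxxxxx' is an assumed placeholder alongside the redacted project and topic IDs used later:

from google.cloud import pubsub_v1

# Sketch: create the topic and a pull subscription from the SDK.
# 'sub-xxxxxxx' is a hypothetical subscription ID.
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path('durabxxxxxx-g7', 'topic-xxxxxxxn')
subscription_path = subscriber.subscription_path('durabxxxxxx-g7', 'sub-xxxxxxx')

publisher.create_topic(request={'name': topic_path})
subscriber.create_subscription(request={'name': subscription_path, 'topic': topic_path})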

gcloud Python SDK

Cloud Storage. The function below uploads a local file to the specified bucket, authenticating with the downloaded key file.

from google.cloud import storage

json_credentials_path = '/Users/xxxxxxxxxxxxx.json'

def upload_to_gcs(local_file_path, bucket_name, destination_blob_name, json_credentials_path):
    # Authenticate with the downloaded service account key file
    client = storage.Client.from_service_account_json(json_credentials_path)
    bucket = client.get_bucket(bucket_name)
    # Upload the local file to the destination blob in the bucket
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(local_file_path)

from google.cloud import pubsub_v1

bucket_name = 'ingest-xxxxxx'
project_id = 'durabxxxxxx-g7'
topic_id = 'topic-xxxxxxxn'

def publish_message(project_id, topic_id, message):
    # Initialize the Publisher client (it picks up the service account
    # credentials from the GOOGLE_APPLICATION_CREDENTIALS environment variable)
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    # Encode the message to bytes
    message_bytes = message.encode('utf-8')
    # Publish the message and block until it completes,
    # returning the server-assigned message ID
    future = publisher.publish(topic_path, data=message_bytes)
    return future.result()

Run

Data is loaded into the bucket. We can see the parquet files uploaded based on the filenames passed to the function defined above.
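
Wiring the two functions together, a hypothetical driver (the parquet file name is illustrative) looks like this:

# Hypothetical driver: upload a parquet file, then publish an ingest
# status notification. The file name is illustrative.
upload_to_gcs('sales.parquet', bucket_name, 'sales.parquet', json_credentials_path)
publish_message(project_id, topic_id, 'ingest complete: sales.parquet')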

(screenshot: parquet files visible in the bucket)

The subscription has also received the notifications, as we can see in the screenshot below.
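
Delivery can also be verified from the SDK by pulling from the subscription. A minimal sketch, reusing the placeholder subscription ID assumed earlier:

from google.cloud import pubsub_v1

# Sketch: pull a few messages to confirm the notifications arrived,
# then acknowledge them. 'sub-xxxxxxx' is the assumed placeholder.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('durabxxxxxx-g7', 'sub-xxxxxxx')

response = subscriber.pull(request={'subscription': subscription_path, 'max_messages': 5})
for received in response.received_messages:
    print(received.message.data.decode('utf-8'))

subscriber.acknowledge(request={
    'subscription': subscription_path,
    'ack_ids': [received.ack_id for received in response.received_messages],
})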

(screenshot: notifications received by the subscription)