How to build an event-driven application on Google Cloud using Cloud Functions

Cloud Functions: A simple approach to building an event-driven system

Dries De Rydt - 21/2/2022

This blog post outlines the approach we took when we built a video analysis tool using Google Cloud Functions. Some of the analysis is done by our own code, while other functions delegate to Google's AI services. We'll go over some useful technologies and practices, as well as a few code samples.

If you want more information about the case itself, check out Deevid's non-technical blog post where he outlines how we started the case and what the goals were.

What is a cloud function?

The idea behind a cloud function is a specialised service with exactly one purpose. A function can scale to zero to save on costs, and can scale horizontally as the load increases. Each function can be written in one of the supported runtimes, such as Node.js, Go or Python, and can be maintained by different developers. Because each function is deployed on its own, updating one does not require redeploying the others, and each one scales independently.

Cloud functions can be triggered in several ways, most often through a Pub/Sub topic, an HTTP request or a file upload. This post focuses on processing files when they are uploaded to Google Cloud Storage buckets, using the built-in event system of Cloud Storage.

Technologies used

Serverless - Framework for managing cloud functions

Google Cloud Functions - Google's cloud function provider

Google Cloud Storage - Google's object store, capable of triggering events to Google cloud functions

OpenPose - Body pose estimation module

Google AI APIs - We used various AI APIs that Google provides, more on these later!

The architecture

As you can see below, the full architecture looks intimidating, but ends up being simple and maintainable.

The architecture, showing which cloud functions are triggered by which events
In purple: Cloud Functions. In bold you can see which trigger results in which function being activated. The frontend is a dashboard that shows the analysis.

In the application itself, the user uploads a video, which triggers all functions related to video analysis. One of the functions creates an audio file using ffmpeg and stores it in a separate bucket.

Storing this file activates the audio trigger, which starts every cloud function related to the audio (pitch analysis, volume analysis, ...). One of these cloud functions creates a text file by calling Google's Speech-to-Text service. The text file is stored in a separate bucket, and this action triggers further analysis.
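The chain of triggers described above can be written down as a small lookup table, which helps when reasoning about what a single upload sets in motion. The sketch below is only an illustration: the bucket names and function names are hypothetical placeholders, not the ones used in the project.

```python
from collections import deque

# Hypothetical mapping: bucket -> functions triggered by an upload to it.
TRIGGERS = {
    "video-bucket": ["extract_audio", "pose_analysis"],
    "audio-bucket": ["pitch_analysis", "volume_analysis", "transcribe"],
    "transcript-bucket": ["text_analysis"],
}

# Hypothetical mapping: function -> bucket it writes a new file to (if any).
OUTPUTS = {
    "extract_audio": "audio-bucket",
    "transcribe": "transcript-bucket",
}


def functions_triggered_by(bucket):
    """Return every function that eventually runs after an upload to `bucket`."""
    triggered = []
    seen = set()
    queue = deque([bucket])
    while queue:
        current = queue.popleft()
        if current in seen:
            continue  # guard against accidental trigger loops
        seen.add(current)
        for function in TRIGGERS.get(current, []):
            triggered.append(function)
            if function in OUTPUTS:
                # This function stores a file, which fires the next trigger.
                queue.append(OUTPUTS[function])
    return triggered


print(functions_triggered_by("video-bucket"))
```

Keeping a table like this next to the code gives a quick answer to the question "what happens when a file lands in this bucket?".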

Each Cloud function stores its results in a central database. This database is polled by the API to show the user the results (or potential errors) when processing is complete.
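To make the polling idea concrete, here is a minimal sketch of how per-function results could be combined into one status for the dashboard. It assumes an in-memory dictionary as a stand-in for the central database, and the function names in `EXPECTED` are hypothetical; the real schema is not shown in this post.

```python
# Hypothetical set of analyses that must report before a video counts as done.
EXPECTED = {"pitch_analysis", "volume_analysis", "transcribe"}

# In-memory stand-in for the central database: {video_id: {function: record}}.
RESULTS = {}


def record_result(video_id, function, result=None, error=None):
    """Called by a cloud function when it finishes, successfully or not."""
    RESULTS.setdefault(video_id, {})[function] = {"result": result, "error": error}


def processing_status(video_id):
    """Called by the API while polling: summarise what is known so far."""
    records = RESULTS.get(video_id, {})
    errors = {name: r["error"] for name, r in records.items() if r["error"]}
    if errors:
        return {"state": "error", "errors": errors}
    if EXPECTED.issubset(records):
        results = {name: r["result"] for name, r in records.items()}
        return {"state": "complete", "results": results}
    return {"state": "processing", "pending": sorted(EXPECTED.difference(records))}


record_result("video-1", "pitch_analysis", result="ok")
print(processing_status("video-1"))
```

Because every function writes its own record, the API never needs to know in which order the functions finished.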

The application itself consists of two App Engine apps and a VM with a GPU that is spun up on demand. This means that if the app is not in use, the only hosting costs are the bucket storage and the database.

Building the cloud functions

Triggering from an upload

Here's an example of a cloud function that is triggered by a video upload and extracts the audio from that file.

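import os

import ffmpeg
from google.cloud import storage

client = storage.Client()
AUDIO_BUCKET = 'blog-audio-bucket'


def extract_wav_from_video(data, context):
    # Download the uploaded video named in the storage event to /tmp.
    dl_bucket = client.get_bucket(data['bucket'])
    dl_blob = dl_bucket.get_blob(data['name'])
    # Use only the last path segment so names containing "/" still fit in /tmp.
    local_video = f'/tmp/temp_{os.path.basename(data["name"])}'
    dl_blob.download_to_filename(local_video)
    try:
        # Convert to 16-bit mono WAV at 16 kHz, skipping the first second.
        (
            ffmpeg.input(local_video, ss=1)
            .output('/tmp/temp.wav', format='wav', bits_per_raw_sample=16, ac=1, ar=16000)
            .overwrite_output()
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        print(e.stderr.decode())
        # Return instead of calling exit(), which would kill the function instance.
        return
    # Upload the WAV file under the same base name to the audio bucket.
    ul_bucket = client.get_bucket(AUDIO_BUCKET)
    filename = data['name'].split('.')[0]
    newblob = ul_bucket.blob(f'{filename}.wav')
    newblob.upload_from_filename('/tmp/temp.wav')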

All you need to do is add the following lines to your requirements file:

ffmpeg-python
google-cloud-storage

As you can see, the incoming event contains the filename and the bucket. You can then download the file with Google's storage API and process it as you like. This is the core of most of our cloud functions.
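Since almost every function starts by reading the same two fields, it can help to pull that step into a small helper. The following is a sketch under assumptions: the helper name and the example bucket name are made up for illustration, and `contentType` is the same event field that the transcription function further down relies on.

```python
import posixpath


def parse_storage_event(data):
    """Derive the paths a function needs from a storage event payload."""
    bucket = data["bucket"]
    name = data["name"]
    # Object names may contain "/" (pseudo-folders); keep only the last part
    # so the local copy always lands directly in /tmp.
    local_path = posixpath.join("/tmp", posixpath.basename(name))
    # Strip only the final extension, keeping any other dots in the name.
    base_name, _extension = posixpath.splitext(name)
    return {
        "bucket": bucket,
        "name": name,
        "local_path": local_path,
        "base_name": base_name,
        "is_video": data.get("contentType", "").startswith("video/"),
    }


# Hypothetical event, shaped like the payload a storage trigger passes in.
event = {
    "bucket": "blog-video-bucket",
    "name": "clips/intro.final.mp4",
    "contentType": "video/mp4",
}
print(parse_storage_event(event))
```

Note that `posixpath.splitext` only removes the last extension, whereas `split('.')[0]` cuts the name at the first dot. Which behaviour you want depends on how your files are named.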

Authenticating with a Google API

Each cloud function has a service account linked to it (by default, this is the App Engine default service account). You can use this service account, or assign a custom one to make sure your cloud functions have the access rights they need to communicate with storage or another service.

You can also give your cloud function access to the Google APIs of your project. Here is a sample of the Speech to Text API call, triggered by the upload of a new audio file:

const speech = require('@google-cloud/speech');
const {Storage} = require('@google-cloud/storage');

const gcs = new Storage();

exports.transcribe = async ({bucket: fileBucket, name: fileName, contentType}, context) => {
  // Exit if this is triggered on a file that is not audio.
  if (!contentType || !contentType.startsWith('audio/')) {
    return null;
  }
  // Creates a client for API access
  const client = new speech.SpeechClient();
  const gcsUri = `gs://${fileBucket}/${fileName}`;
  const config = {
    sampleRateHertz: 16000,
    // BCP-47 language code, passed as a single string
    languageCode: 'en-US',
  };
  const audio = {uri: gcsUri};
  const request = {config, audio};

  // Detects speech in the audio file. This creates a recognition job that you
  // can wait for now, or get its result later.
  const [operation] = await client.longRunningRecognize(request);
  // Get a Promise representation of the final result of the job
  const [response] = await operation.promise();
  const transcription = response.results
    .map(result => result.alternatives[0].transcript)
    .join('\n');

  // Uploading the transcript. We create a new filename based on the original filename
  const targetFileName = fileName.replace(/\.[^/.]+$/, '') + '.txt';
  const targetBucket = gcs.bucket('blog-transcripts');
  const file = targetBucket.file(targetFileName);

  // TODO: Store result to the Database. Removed this for the Blog

  // Save the transcript in the bucket
  return file.save(transcription, {resumable: false}).catch(console.error);
};

As you can see, the bucket and file name are passed in through the event, and the Google API accepts a Cloud Storage URI (gs://...). This means the file does not even need to be downloaded; we simply delegate the transcription to the Speech-to-Text API. The result is then stored in the transcripts bucket, triggering additional analysis.
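For teams that prefer to keep all functions in Python, the same idea could look roughly like the sketch below. It is not the code we deployed: it assumes the `google-cloud-speech` client library (version 2 or later), 16-bit WAV input as produced by the audio extraction function, and an arbitrary timeout. The helper names are made up for illustration.

```python
import posixpath


def gcs_uri(bucket, name):
    """Build the gs:// URI that the Speech-to-Text API reads from directly."""
    return f"gs://{bucket}/{name}"


def transcript_name(audio_name):
    """Swap the final extension of the audio object name for .txt."""
    base, _extension = posixpath.splitext(audio_name)
    return base + ".txt"


def transcribe(data, context):
    # Exit early if this is triggered on a file that is not audio.
    if not data.get("contentType", "").startswith("audio/"):
        return None

    # Imported lazily so the helpers above work without the client library.
    from google.cloud import speech

    client = speech.SpeechClient()
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=16000,
        language_code="en-US",
    )
    # Pass the URI only; the audio file itself is never downloaded here.
    audio = speech.RecognitionAudio(uri=gcs_uri(data["bucket"], data["name"]))
    operation = client.long_running_recognize(config=config, audio=audio)
    # Assumed timeout in seconds; tune it to your function's own time limit.
    response = operation.result(timeout=300)
    return "\n".join(r.alternatives[0].transcript for r in response.results)


print(gcs_uri("blog-audio-bucket", "talk.wav"))
print(transcript_name("talk.wav"))
```

Storing the returned text in the transcripts bucket would then work the same way as the upload step in the audio extraction function.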

Deploying cloud functions

Cloud functions are very easy to deploy, and they've become my go-to when it comes to quickly getting something available online.

On Google Cloud you can even develop in your browser. We've used this in the past to build a simple Slack notification system, for example. For a mature development setup, we recommend the Serverless Framework. It lets you define everything locally and makes it easy to deploy your functions, define your environments, and so on.

Alternatively, you can set a Google Cloud Source Repositories repository as the source for your cloud function. This gives you continuous deployment out of the box.

Using GPUs

For some AI tasks, the use of at least one GPU is necessary to ensure reasonable inference times. Since one of the advantages of cloud functions is keeping costs low and paying for exactly what you use, it would be a shame to keep a GPU instance running at all times.

Since cloud functions do not support GPU access, we use events to spin up Compute instances when needed.

Here is an example of a Node.js-based cloud function that starts a Compute Engine instance on demand:

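// Uses the zone()/vm() helpers of the older @google-cloud/compute client
// (releases before 3.0).
var Compute = require('@google-cloud/compute');
var compute = new Compute();

exports.startInstance = function startInstance(req, res) {
  var zone = compute.zone('europe-west1-b');
  var vm = zone.vm('openpose');
  vm.start(function(err, operation, apiResponse) {
    // Report a failure instead of always claiming success.
    if (err) {
      console.error('failed to start instance', err);
      res.status(500).send('VM failed to start');
      return;
    }
    console.log('instance start requested successfully');
    res.status(200).send('VM has Started!');
  });
};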

Then, if the compute instance has nothing left to process, it shuts itself back down.
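The shutdown side can be as simple as a worker loop that counts how often it finds nothing to do. The sketch below shows one way to structure it, not the code running on our VM: `fetch_job`, `process` and `shutdown` are hypothetical callables you would supply yourself, for example a function that reads the next pending video and one that powers off the machine.

```python
import time


def run_until_idle(fetch_job, process, shutdown, idle_limit=3, poll_seconds=0.0):
    """Process jobs until `idle_limit` consecutive polls find nothing, then shut down.

    fetch_job() returns the next job or None. All three callables are
    hypothetical stand-ins supplied by the caller.
    """
    idle_polls = 0
    processed = 0
    while idle_polls < idle_limit:
        job = fetch_job()
        if job is None:
            idle_polls += 1
            time.sleep(poll_seconds)
            continue
        idle_polls = 0  # work arrived, so reset the idle counter
        process(job)
        processed += 1
    shutdown()
    return processed


# Usage with in-memory stand-ins instead of a real queue and a real VM.
pending = ["video-1.mp4", "video-2.mp4"]
done = []
events = []
count = run_until_idle(
    fetch_job=lambda: pending.pop(0) if pending else None,
    process=done.append,
    shutdown=lambda: events.append("shutdown"),
)
print(count, done, events)
```

Requiring several empty polls in a row avoids shutting down in the short gap between two uploads.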

Conclusion

Google Cloud Functions are a very powerful tool for building event-driven systems.

The challenges of building an application with cloud functions are the same as with most microservice architectures. First of all, it can be hard to see the impact of a trigger. For example, what happens exactly when you upload a new file? You need a good overview of your triggers and processes to avoid a large amount of code duplication.
Additionally, you'll need solid error logging to keep track of the data in your pipeline. If something goes wrong, it needs to be tracked.
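One lightweight way to get consistent error logging is to wrap every function in the same decorator. The sketch below is an assumption about how this could be done, not a description of our setup. The decorator name and the example function are made up, and it relies only on Python's standard `logging` module.

```python
import functools
import json
import logging

logger = logging.getLogger("pipeline")


def log_pipeline_errors(func):
    """Wrap a storage-triggered function so failures are logged with their file."""

    @functools.wraps(func)
    def wrapper(data, context):
        try:
            return func(data, context)
        except Exception:
            # One JSON line per failure keeps the bucket and file name searchable.
            logger.exception(json.dumps({
                "function": func.__name__,
                "bucket": data.get("bucket"),
                "name": data.get("name"),
            }))
            raise  # re-raise so the platform also records the failure

    return wrapper


@log_pipeline_errors
def analyse_volume(data, context):
    # Hypothetical function body that fails, to demonstrate the logging.
    raise ValueError("unsupported sample rate")


try:
    analyse_volume({"bucket": "blog-audio-bucket", "name": "talk.wav"}, None)
except ValueError:
    print("failure was logged and re-raised")
```

Because the log line always names the bucket and the file, you can trace a single upload through every function it touched.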

On the flip side, here are a few advantages we've encountered:

  • Permissive free tiers, pay only for what you use, scales to 0
  • Scalability: instances are added automatically as events arrive, and each event is delivered to the function at least once
  • Triggering functions from storage means they are shielded: they are as secure as your storage system
  • Every module is highly specialised
  • If a file needs to be analysed in multiple ways, the cloud functions all perform their actions in parallel
  • Every module can be written in the most appropriate language for its function (as long as it's supported by Cloud Functions, otherwise, check out Cloud Run)
  • You don't have to worry about integration, the event format from storage is a reliable constant

The most important conclusion is the same as for every system: you need to make sure the solution fits your case. For our case, processing files as they are uploaded, it was a perfect fit!
