How to stream data from Pub/Sub to BigQuery using Dataflow templates and Terraform
You’ve deployed a message queue in Pub/Sub, congrats! Now you want to analyze this data, so you’ve decided to move it into a data lake or warehouse. You’ve been using BigQuery and followed the Google docs to get it working via the UI. Now it’s time to turn it into infrastructure as code and deploy some Terraform.
In the tutorial below, we’ll walk through each part of the code needed for the deployment. The assumption is that you have experience using Terraform and GCP resources.
Useful Resources
- Google template example
- Terraform provider
- Accompanying GitHub repo
- Dataflow docs
The Project Structure
The sample project structure is fairly simple: a terraform folder containing the Terraform files, and a schemas folder holding the two schemas we use.
|____terraform
| |____project.tf
| |____dataflow.tf
| |____pubsub.tf
| |____storage.tf
| |____providers.tf
| |____variables.tf
| |____bq.tf
| |____schemas
| | |____ml_model_topic_feedback_schema.json
| | |____bq_feedback_schema.json
We're deploying five main components:
- The project
- The Pub/Sub topic/schema/subscription
- The temp storage bucket
- The BigQuery dataset/table
- The Dataflow job
Writing The Terraform
To start off, we need to create the project and enable the main APIs we will use.
Enabling APIs
locals {
  gcp_service_list = [
    "bigquery.googleapis.com",
    "bigquerystorage.googleapis.com",
    "iam.googleapis.com",
    "iamcredentials.googleapis.com",
    "pubsub.googleapis.com",
    "dataflow.googleapis.com",
  ]
}

resource "google_project" "env" {
  name            = var.project_name
  project_id      = var.project_id
  billing_account = var.billing_account
  folder_id       = var.folder_id
}

resource "google_project_service" "gcp_services" {
  count                      = length(local.gcp_service_list)
  project                    = google_project.env.project_id
  service                    = local.gcp_service_list[count.index]
  disable_dependent_services = true

  depends_on = [
    google_project.env
  ]
}
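Two files from the project structure, providers.tf and variables.tf, aren't shown in the walkthrough. A minimal sketch of what they could contain, assuming the provider version pin and the region default below (both are illustrative choices, not requirements):

# providers.tf - pins the Google provider; the version constraint is an assumption
terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 4.0"
    }
  }
}

provider "google" {
  region = var.region
}

# variables.tf - the variables referenced throughout this tutorial
variable "project_name" {
  type        = string
  description = "Human-readable project name"
}

variable "project_id" {
  type        = string
  description = "Globally unique project ID"
}

variable "billing_account" {
  type        = string
  description = "Billing account to attach to the project"
}

variable "folder_id" {
  type        = string
  description = "Folder the project is created under"
}

variable "region" {
  type        = string
  description = "Region for the Dataflow job"
  default     = "us-central1"
}

variable "data_flow_bucket" {
  type        = string
  description = "Name of the temp bucket for the Dataflow job"
}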
Creating the Pub/Sub Components
Next, we need to deploy the Pub/Sub components that will transport the data.
resource "google_pubsub_schema" "feedback_schema" {
name = "sample_topic"
type = "AVRO"
definition = file("./schemas/topic_feedback_schema.json")
depends_on = [google_project.env,google_project_service.gcp_services]
}
resource "google_pubsub_topic" "feedback_topic" {
name = "sample_topic"
depends_on = [google_pubsub_schema.feedback_schema]
schema_settings {
schema = "projects/${var.project_id}/schemas/${google_pubsub_schema.feedback_schema.name}"
encoding = "JSON"
}
}
resource "google_pubsub_subscription" "feedback_sub" {
name = "sample_topic"
topic = google_pubsub_topic.feedback_topic.name
depends_on = [ google_project.env, google_pubsub_schema.feedback_schema]
}
And here is the Avro schema in the schemas folder:
{
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "feedback_guid",
      "type": "string"
    },
    {
      "name": "req_guid",
      "type": "string"
    },
    {
      "name": "model_feedback",
      "type": "string"
    }
  ]
}
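Because the topic uses JSON encoding with this Avro schema, any message published to it must be a JSON object matching these fields. A hypothetical example message (all values are illustrative):

{
  "feedback_guid": "b7f2-example-guid",
  "req_guid": "a1c9-example-guid",
  "model_feedback": "{\"rating\": 5}"
}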
Deploying Temp Storage
For our Dataflow job, we need to specify a location where it can temporarily write results.
resource "google_storage_bucket" "bucket" {
name = var.data_flow_bucket
location = "US"
force_destroy = true
depends_on = [
google_project.env, google_project_service.gcp_services
]
}
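Since this bucket only holds temporary files, you may optionally want a lifecycle rule so stale objects are cleaned up automatically. A sketch of a block you could add inside the bucket resource above; the 7-day window is an arbitrary choice, not something the template requires:

# Optional: delete leftover temp objects after 7 days (arbitrary retention window)
lifecycle_rule {
  condition {
    age = 7
  }
  action {
    type = "Delete"
  }
}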
Deploying BigQuery Table
Afterwards, we need to deploy the BigQuery table that will store the messages.
resource "google_bigquery_dataset" "ml" {
dataset_id = "feedback"
friendly_name = "feedback"
description = "All data"
location = "US"
depends_on = [
google_project.env
]
deletion_protection=false
}
resource "google_bigquery_table" "feedback" {
dataset_id = google_bigquery_dataset.ml.dataset_id
table_id = "feedback"
depends_on = [
google_project_service.gcp_services, google_bigquery_dataset.ml
]
schema = file("./schemas/bq_feedback_schema.json")
deletion_protection=false
}
We specify the table schema in the schemas folder:
[
  {
    "name": "feedback_guid",
    "type": "STRING",
    "mode": "REQUIRED",
    "description": "The guid of the feedback request"
  },
  {
    "name": "req_guid",
    "type": "STRING",
    "mode": "REQUIRED",
    "description": "The guid of the initial request"
  },
  {
    "name": "model_feedback",
    "type": "STRING",
    "mode": "REQUIRED",
    "description": "The JSON encoded response of the model"
  }
]
Deploying The Dataflow Job
Finally, we will deploy the Dataflow job, which depends on both the source (Pub/Sub) and the sink (BigQuery). We will use a Google-provided template that performs simple transformation and validation for us.
locals {
  bq_table = "${var.project_id}:${google_bigquery_dataset.ml.dataset_id}.${google_bigquery_table.feedback.table_id}"
  sub      = google_pubsub_subscription.feedback_sub.id
}

resource "google_dataflow_job" "pubsub_stream" {
  name                    = "df-pubsub-bigquery"
  region                  = var.region
  template_gcs_path       = "gs://dataflow-templates/2022-01-24-00_RC00/PubSub_Subscription_to_BigQuery"
  temp_gcs_location       = "gs://${var.data_flow_bucket}/dataflow"
  enable_streaming_engine = true

  parameters = {
    inputSubscription = local.sub
    outputTableSpec   = local.bq_table
  }

  # serverless option
  additional_experiments = ["enable_prime"]

  depends_on = [
    google_project_service.gcp_services,
    google_bigquery_table.feedback,
    google_pubsub_schema.feedback_schema,
    google_pubsub_subscription.feedback_sub,
    google_pubsub_topic.feedback_topic,
  ]
}
A couple of things to point out about this code block:
- Our Dataflow template comes from a public GCP bucket, which can be found here.
- The additional_experiments entry enables Dataflow Prime, a v2 runtime with serverless billing.
- The template accepts many more parameters; this is just a simple example (see the sketch below).
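For instance, the PubSub_Subscription_to_BigQuery template also accepts an optional outputDeadletterTable parameter for messages that fail to reach the output table. A sketch of how the parameters map might look with it added (the _error_records suffix is illustrative):

# Extension of the parameters map above: route messages that fail
# validation or insertion to a dead-letter table instead of losing them.
parameters = {
  inputSubscription     = local.sub
  outputTableSpec       = local.bq_table
  outputDeadletterTable = "${local.bq_table}_error_records"
}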
Common Errors
- IAM permissions. Make sure the Dataflow job has permissions to access Pub/Sub and BigQuery. These permissions are usually assigned to the default worker service account, but double check (see the sketch after this list)!
- Error table created in BigQuery. If there’s an issue mapping column names between the message and the table schema, the template writes the failing messages to a separate error records table instead of your target table.
- Instance not created. If you add components, make sure your dependencies allow Terraform to plan your code in the right order. If you try to deploy the Dataflow job before Pub/Sub has been deployed, you will run into issues.
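If you do need to grant the roles yourself, here is a rough sketch. It assumes the job runs as the default Compute Engine service account; both the roles and the member are assumptions to adapt to your setup.

# Hypothetical IAM bindings for the Dataflow worker service account.
# This assumes the default Compute Engine service account; swap in your
# own service account if your workers run under a custom one.
resource "google_project_iam_member" "worker_pubsub" {
  project = google_project.env.project_id
  role    = "roles/pubsub.subscriber"
  member  = "serviceAccount:${google_project.env.number}-compute@developer.gserviceaccount.com"
}

resource "google_project_iam_member" "worker_bq" {
  project = google_project.env.project_id
  role    = "roles/bigquery.dataEditor"
  member  = "serviceAccount:${google_project.env.number}-compute@developer.gserviceaccount.com"
}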
Conclusion
Thanks for reading! Feel free to fork the GitHub repo with the complete project: https://github.com/Diophontine/pubsub-dataflow-bq-terraform