Dataflow is Google's autoscaling, serverless way of processing both batch and streaming data: a fully managed runner for Apache Beam (Batch & Stream) pipelines on Google Cloud Platform (GCP).

This tutorial is about learning tools for reproducible data processing.
It gives a basic introduction to what Dataflow can do and shows how to operationalize your local data preparation into a scalable data processing workflow on Google Cloud.

What you will learn

In this tutorial you will learn to do the following:

You will need a Python environment to go through this tutorial. For example:

conda create -n env_beam python=2.7
conda activate env_beam

You will need the Python package manager pip, which can be installed with:

sudo apt install python-pip
pip install -U pip

Then install Dataflow and oauth2client, used to authenticate to your GCP account. They can be installed as Python packages:

pip install google-cloud-dataflow oauth2client==3.0.0

Finally, one more dependency needs to be pinned:

pip install --force six==1.10  # downgrade as 1.11 breaks apitools

You can now try Apache Beam interactively in a Python shell; its transforms can be applied directly to in-memory lists, which is handy for quick experiments:

>>> import apache_beam as beam


>>> [3, 8, 12] | beam.Map(lambda x : 3*x)
# [9, 24, 36]

>>> [("Jan",3), ("Jan",8), ("Feb",12)] | beam.GroupByKey()
[('Jan', [3, 8]), ('Feb', [12])]


>>> [("Jan",3), ("Jan",8), ("Feb",12)] | beam.GroupByKey() | beam.Map(lambda (mon,days) : (mon,len(days)))
[('Jan', 2), ('Feb', 1)]
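
These pipe examples work on plain in-memory lists, which is convenient for experimenting. In a real program the same transforms are chained inside a Pipeline object; here is a minimal local sketch using the default DirectRunner (the step names and the /tmp/months output path are just illustrative):

import apache_beam as beam

p = beam.Pipeline()   # uses the local DirectRunner by default
(p
   | 'Create' >> beam.Create([("Jan", 3), ("Jan", 8), ("Feb", 12)])
   | 'Group' >> beam.GroupByKey()
   | 'CountDays' >> beam.Map(lambda kv: (kv[0], len(kv[1])))
   | 'Format' >> beam.Map(str)                     # e.g. "('Jan', 2)"
   | 'Write' >> beam.io.WriteToText('/tmp/months')
)
p.run().wait_until_finish()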

Let's start with a simple sequence of tasks for which we will use Apache Beam. This section is essentially a summary of this Google codelab.

Here we will:

Pipeline running locally

Get the data files from GitHub:

wget https://github.com/GoogleCloudPlatform/training-data-analyst/archive/master.zip

Unzip the archive, then navigate to the folder training-data-analyst-master/courses/data_analysis/lab2/python/:

cd master/training-data-analyst-master/courses/data_analysis/lab2/python/

Let's take a quick look at the Java files we will be processing, with their line counts:

$ ls -1 ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java | xargs -L1 wc -l
58 ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/Grep.java
129 ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/IsPopular.java
212 ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/JavaProjectsThatNeedHelp.java
109 ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/StreamDemoConsumer.java


Here is Google's first example, grep.py:

#!/usr/bin/env python

"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import apache_beam as beam
import sys

def my_grep(line, term):
   if line.startswith(term):
      yield line

if __name__ == '__main__':
   p = beam.Pipeline(argv=sys.argv)
   input = '../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java'
   output_prefix = '/tmp/output'
   searchTerm = 'import'

   # find all lines that contain the searchTerm
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run().wait_until_finish()

What it does:

  1. read data: ReadFromText reads all the input files matching .../dataanalyst/javahelp/*.java
  2. process data: Grep every line of each file for a search term, here searchTerm = 'import'
  3. use output: WriteToText writes the results under a given prefix: output_prefix = '/tmp/output'
  4. finally run the pipeline to completion with wait_until_finish.

Notice the three named transforms in the pipeline: GetJava, Grep and write.

Execute this Beam pipeline simply by running this Python file: python grep.py

$ ls /tmp/output*
ls: cannot access '/tmp/output*': No such file or directory
$ python grep.py 
$ ls /tmp/output*
/tmp/output-00000-of-00001
$ 

Which produced:

$ cat /tmp/output-00000-of-00001 
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import com.google.api.services.bigquery.model.TableRow;
...
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.KV;
$
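
As an aside, since my_grep only keeps lines that start with the search term, the Grep step could just as well be written with beam.Filter; a sketch of the equivalent pipeline body:

(p
   | 'GetJava' >> beam.io.ReadFromText(input)
   | 'Grep' >> beam.Filter(lambda line: line.startswith(searchTerm))
   | 'write' >> beam.io.WriteToText(output_prefix)
)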

Pipeline running on the cloud

Create a storage bucket

Let's do that programmatically:
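
The bucket used below already exists in this project; if you need to create one first, gsutil mb does it (the bucket name and region here are the ones used elsewhere in this tutorial, adapt them to yours):

gsutil mb -l us-central1 gs://demo-bucket-patrick/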

$ gsutil ls
gs://demo-bucket-patrick/
gs://iris_models/
gs://pme-cx/
$ 
$ BUCKET_NAME="demo-bucket-patrick"
$ 
$ gsutil cp ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java gs://${BUCKET_NAME}/javahelp
Copying file://../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/Grep.java [Content-Type=text/x-java]...
Copying file://../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/IsPopular.java [Content-Type=text/x-java]...
Copying file://../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/JavaProjectsThatNeedHelp.java [Content-Type=text/x-java]...
Copying file://../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/StreamDemoConsumer.java [Content-Type=text/x-java]...
| [4 files][ 16.7 KiB/ 16.7 KiB]                                                
Operation completed over 4 objects/16.7 KiB.  
$

Here is the cloud version of the pipeline, grepc.py:

#!/usr/bin/env python

"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import apache_beam as beam

def my_grep(line, term):
   if line.startswith(term):
      yield line

PROJECT='pme-cx'
BUCKET='demo-bucket-patrick'

def run():
   argv = [
      '--project={0}'.format(PROJECT),
      '--job_name=examplejob2',
      '--save_main_session',
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--runner=DataflowRunner'
   ]

   p = beam.Pipeline(argv=argv)
   input = 'gs://{0}/javahelp/*.java'.format(BUCKET)
   output_prefix = 'gs://{0}/javahelp/output'.format(BUCKET)
   searchTerm = 'import'

   # find all lines that contain the searchTerm
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run()

if __name__ == '__main__':
   run()

Running it will give you:

$ python grepc.py 
/home/patrick/anaconda3/envs/ml/lib/python2.7/site-packages/apache_beam/io/gcp/gcsio.py:176: DeprecationWarning: object() takes no parameters
  super(GcsIO, cls).__new__(cls, storage_client))
Collecting apache-beam==2.5.0
  Using cached https://files.pythonhosted.org/packages/c6/96/56469c57cb043f36bfdd3786c463fbaeade1e8fcf0593ec7bc7f99e56d38/apache-beam-2.5.0.zip
  Saved /tmp/tmpqZhUbM/apache-beam-2.5.0.zip
Successfully downloaded apache-beam
Collecting apache-beam==2.5.0
  Using cached https://files.pythonhosted.org/packages/ff/10/a59ba412f71fb65412ec7a322de6331e19ec8e75ca45eba7a0708daae31a/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
  Saved /tmp/tmpqZhUbM/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
Successfully downloaded apache-beam

$ gsutil ls gs://demo-bucket-patrick/javahelp/ 
gs://demo-bucket-patrick/javahelp/Grep.java
gs://demo-bucket-patrick/javahelp/IsPopular.java
gs://demo-bucket-patrick/javahelp/JavaProjectsThatNeedHelp.java
gs://demo-bucket-patrick/javahelp/StreamDemoConsumer.java
gs://demo-bucket-patrick/javahelp/output-00000-of-00004
gs://demo-bucket-patrick/javahelp/output-00001-of-00004
gs://demo-bucket-patrick/javahelp/output-00002-of-00004
gs://demo-bucket-patrick/javahelp/output-00003-of-00004
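
As an aside, instead of building the argv list by hand, the same settings can be expressed with Beam's PipelineOptions classes; a rough sketch, reusing the PROJECT and BUCKET constants from grepc.py:

from apache_beam.options.pipeline_options import (
   PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions)

options = PipelineOptions()
gcp = options.view_as(GoogleCloudOptions)
gcp.project = PROJECT
gcp.job_name = 'examplejob2'
gcp.staging_location = 'gs://{0}/staging/'.format(BUCKET)
gcp.temp_location = 'gs://{0}/staging/'.format(BUCKET)
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(SetupOptions).save_main_session = True

p = beam.Pipeline(options=options)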

The next example creates training/test datasets for a Machine Learning model using Dataflow.

While Pandas is fine for experimenting, it is better to do the preprocessing in Beam/Dataflow when operationalizing your workflow. This will also help if you need to preprocess data on the fly, since Beam/Dataflow can also process streaming data.

This section is built from this great notebook by Valliappa Lakshmanan (Lak).

Local installation

In this section, you will:

Start by preparing your Python environment (here an existing conda environment named ml is reused):

conda activate ml
pip uninstall -y google-cloud-dataflow
conda install -y pytz==2018.4
pip install apache-beam[gcp]

Setup your GCP resources

Create a Cloud Storage bucket

PROJECT='pme-cx'
BUCKET='cloud-training-demos-ml_02'
REGION='us-central1'
gsutil mb -l ${REGION} gs://${BUCKET}

Save the SQL query to be used later, and try it out by running it against BigQuery from Python

The data is natality data (records of births in the US).

The goal is to:

# Create SQL query using natality data after the year 2000
query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  ABS(FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING)))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
"""
Install the required client libraries:

pip install datalab
pip install --upgrade google-cloud-bigquery[pandas]

Then run the query from an IPython session:

$ ipython

In [1]: import google.datalab.bigquery as bq
In [3]: 
   ...: # Create SQL query using natality data after the year 2000
   ...: query = """
   ...: SELECT
   ...:   weight_pounds,
   ...:   is_male,
   ...:   mother_age,
   ...:   plurality,
   ...:   gestation_weeks,
   ...:   ABS(FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING)))) AS hashmonth
   ...: FROM
   ...:   publicdata.samples.natality
   ...: WHERE year > 2000
   ...: """

In [4]: df = bq.Query(query + " LIMIT 100").execute().result().to_dataframe()
   ...: df.head()
   ...: 

Out[4]: 
   weight_pounds  is_male  mother_age  plurality  gestation_weeks            hashmonth
0       8.818490    False          17          1               42  1403073183891835564
1       8.141671    False          29          1               38  8904940584331855459
2       5.948072     True          38          1               38  7108882242435606404
3       8.838332     True          27          1               39  3408502330831153141
4       9.259415     True          28          1               38  1088037545023002395

In [5]:

Create ML dataset using Dataflow

Let's use Cloud Dataflow to read in the BigQuery data, do some preprocessing, and write it out as CSV files.

Instead of using Beam/Dataflow, I had three other options:

However, in this case, I want to do some preprocessing: modifying the data so that we can simulate what would be known if no ultrasound had been performed.

If I didn't need preprocessing, I could have used the web console. Also, I prefer to script it out rather than run queries on the user interface, so I am using Cloud Dataflow for the preprocessing.

Apache Beam script: DataFlow_Baby_weight.py.
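
The linked script is the full version (it also splits the data into train/eval sets and simulates the no-ultrasound case). As a rough, hedged sketch of its overall shape only, assuming query, PROJECT and BUCKET are defined as Python variables as earlier in this section (the output path and step names below are illustrative, not the actual script):

import apache_beam as beam

def to_csv(row):
   # row is a dict produced by the BigQuery source
   columns = ['weight_pounds', 'is_male', 'mother_age', 'plurality', 'gestation_weeks']
   yield ','.join([str(row[col]) for col in columns])

def run():
   argv = [
      '--project={0}'.format(PROJECT),
      '--job_name=preprocess-babyweight',
      '--save_main_session',
      '--staging_location=gs://{0}/babyweight/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/babyweight/tmp/'.format(BUCKET),
      '--runner=DataflowRunner'
   ]
   p = beam.Pipeline(argv=argv)
   (p
      | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
      | 'ToCSV' >> beam.FlatMap(to_csv)
      | 'WriteCSV' >> beam.io.WriteToText('gs://{0}/babyweight/preproc/train'.format(BUCKET))
   )
   p.run()

if __name__ == '__main__':
   run()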

Note that once you launch the DataFlow_Baby_weight.py script, the actual processing happens in the cloud. Go to the Dataflow section of the GCP web console and monitor the running job. It took about 20 minutes for me.

Finally, list the generated .csv files:

gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*
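
To actually copy them to your machine you can use gsutil cp (adjust the wildcard to the file names you see in the listing):

gsutil -m cp gs://${BUCKET}/babyweight/preproc/*-00000* .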

Preprocessing using tf.Transform and Dataflow: use the flexibility and scalability of Beam for your data processing, and TensorFlow Transform for its efficiency.

TensorFlow Transform (tf.Transform) is a hybrid between Apache Beam and TensorFlow: it runs your Beam data transformations on the TensorFlow stack and can take advantage of different hardware (CPUs, GPUs, TPUs).


Coming...


Coming...

In addition to worker resource usage (which is really cheap), a job might consume the following resources, each billed at its own pricing, including but not limited to:

How to delete the associated Cloud Storage bucket and its contents

Use gsutil:

BUCKET_NAME="my-cool-bucket"

# You can remove a Cloud Storage bucket by invoking the following command:
gsutil rb gs://${BUCKET_NAME}
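
Note that gsutil rb only works on an empty bucket; to remove a bucket together with everything in it, use the recursive remove instead:

# deletes all objects in the bucket, then the bucket itself
gsutil rm -r gs://${BUCKET_NAME}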

How to delete the associated data in BigQuery

According to Google Best practices, deleting BigQuery data is useful if you only work on the most recent data. It is also useful if you are experimenting with data and do not need to preserve it.

If your tables are partitioned by date, the dataset's default table expiration applies to the individual partitions. You can also control partition expiration using the time_partitioning_expiration flag in the CLI or the expirationMs configuration setting in the API. For example, the following command would expire partitions after 3 days:

bq mk --time_partitioning_type=DAY --time_partitioning_expiration=259200 [DATASET].[TABLE]
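
If you would rather drop the data outright instead of letting it expire, the bq CLI can delete a table or a whole dataset directly, for example:

# delete a single table (add -f to skip the confirmation prompt)
bq rm -t [DATASET].[TABLE]

# delete a dataset and everything in it
bq rm -r -d [DATASET]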

Here are a few references to read or watch: