Delete entities of Datastore in bulk with Dataflow implemented in Python

Hiroki Fujino
Published in Level Up Coding
5 min read · Feb 28, 2020

At times, we may need to delete entities of Google Cloud Datastore in bulk. For example, when customers stop using our service, we may have to delete their data from our Datastore. According to this GCP document, Google Cloud Dataflow is well suited for bulk deletion of entities. Dataflow lets us develop data pipelines based on the Apache Beam SDK, which supports both Java and Python.

In this article, I’m going to introduce a simple way to delete entities in bulk with the Apache Beam Python SDK on Dataflow, and then show how to run the pipeline with runtime parameters.

Development Environment

The Python packages below are used in the sample code of this article.

REQUIRED_PACKAGES = [
'apache-beam[gcp]==2.17.0',
'googledatastore==7.1.0'
]
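
For reference, a REQUIRED_PACKAGES list like this is typically consumed by a setup.py that is shipped to the Dataflow workers via the --setup_file pipeline option; a minimal sketch is below (the package name and version are placeholders of mine, not from the original project).

# setup.py -- minimal sketch; name and version are placeholders.
import setuptools

REQUIRED_PACKAGES = [
    'apache-beam[gcp]==2.17.0',
    'googledatastore==7.1.0',
]

setuptools.setup(
    name='delete-datastore-pipeline',
    version='0.0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
)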

Bulk delete entities with Beam

In this article, I use the modules implemented in the datastore.v1new package to delete entities. In this sample code, the User entities whose name property equals “foo” are deleted. The pipeline executes the following steps:

  1. Read the entities based on the query
  2. Convert the entity into one’s key
  3. Delete the entities based on the keys

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new.types import Query
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore, DeleteFromDatastore

with beam.Pipeline(options=options) as p:
    filters = [("name", "=", "foo")]
    query = Query(kind="User", project=project_id, filters=filters)
    (p
     | "Read" >> ReadFromDatastore(query=query)
     | "ConvertToKey" >> beam.Map(lambda entity: entity.key)
     | "Delete" >> DeleteFromDatastore(project=project_id))

Filter with Key

The above code deletes entities filtered by a property rather than by key. If a developer wants to filter the entities by key, the Key class in the google.cloud.datastore package achieves that.

from google.cloud.datastore.key import Key
filters = [("__key__", "=", Key("User", element, project=self.project_id))]

Create template

Although a developer can run the pipeline locally, a Dataflow template makes it more flexible to stage and run. You can check the link for details.

The template workflow separates the development step from the staging and execution steps.

Once the template is created, the pipeline can be executed with the template by using the Google Cloud Console, REST API, or gcloud command-line tool.

To create the template, use the following command.

python pipeline.py --runner DataflowRunner --project [PROJECT] --temp_location gs://[BUCKET]/[PATH]/temp --template_location gs://[BUCKET]/[PATH]/delete_datastore

Execute the template

To run this pipeline from the template, use the following command. As explained above, the developer can also run the pipeline from the Google Cloud Console or the REST API.

gcloud dataflow jobs run delete_datastore --project [PROJECT] --gcs-location gs://[BUCKET]/[PATH]/delete_datastore --region [REGION]

Once the job is created, it will look like this in the Dataflow UI:

[Figure: The pipeline to delete entities]

It’s very simple, isn’t it? Dataflow provides a very simple way to delete entities in bulk, and the deletion runs scalably on Dataflow workers.

The entire code is available at the link below:

Execution with runtime parameters

At times we need to run the pipeline with runtime parameters, and a Dataflow template can accept them. You can check the link for details.

With runtime primitive parameters

In the Python SDK, runtime parameters are declared with the add_value_provider_argument method.

from apache_beam.options.pipeline_options import PipelineOptions

class DeleteOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--name')
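
As a side note, add_value_provider_argument accepts the same keyword arguments as argparse’s add_argument, so a type, default, and help text can be attached; a small sketch is below (the class name, default value, and help text are my own placeholders, not from the original code).

from apache_beam.options.pipeline_options import PipelineOptions

class DeleteOptionsWithDefaults(PipelineOptions):  # hypothetical variant, for illustration only
    @classmethod
    def _add_argparse_args(cls, parser):
        # Same keywords as argparse.add_argument; the values below are placeholders.
        parser.add_value_provider_argument(
            '--name',
            type=str,
            default='foo',
            help='Value of the "name" property to filter User entities by.')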

To run this pipeline with the template and runtime parameters, use the following command.

gcloud dataflow jobs run delete_datastore --project [PROJECT] --gcs-location gs://[BUCKET]/[PATH]/delete_datastore --region [REGION] --parameters name=foo

However, as written in the document above, you cannot get runtime parameters before the pipeline is executed.

You can use isAccessible() to check if the value of a ValueProvider is available. If you call get() before pipeline execution, Apache Beam returns an error:
Value only available at runtime, but accessed from a non-runtime context.

def run(argv=None):
    options = DeleteOptions(flags=argv)
    with beam.Pipeline(options=options) as p:
        # This error happens: Value only available at runtime, but accessed from a non-runtime context.
        filters = [("name", "=", options.name.get())]

So, get() must only be called within the runtime context; pass the ValueProvider itself into the query instead, like the code below. Now it works.

with beam.Pipeline(options=options) as p:
    (p
     | "Read" >> ReadFromDatastore(query=Query(kind="User", project=project_id, filters=[("name", "=", options.name)]))
     | "ConvertToKey" >> beam.Map(lambda entity: entity.key)
     | "Delete" >> DeleteFromDatastore(project=project_id))

The entire code is available at the link below:

With runtime array parameters

Let’s consider an advanced case in which the pipeline accepts an array runtime parameter. Unfortunately, PipelineOptions cannot accept an array value, so this article introduces another way to accept array runtime parameters. In the code below, the pipeline accepts an array value with the following steps:

  1. The pipeline accepts a comma-separated string with add_value_provider_argument.

from apache_beam.options.pipeline_options import PipelineOptions

class DeleteOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Ex: names=foo,bar
        parser.add_value_provider_argument('--names')

2. In the pipeline context, this string is split on commas into a string array. In the expand method, the PBegin object becomes a PCollection by applying Impulse(), and then the comma-separated string is split within the runtime context.

from apache_beam.transforms import PTransform

class SplitNames(PTransform):
    def __init__(self, names_opt):
        self.names_opt = names_opt

    def expand(self, pbegin):
        import apache_beam as beam
        from apache_beam.transforms import Impulse
        return pbegin | Impulse() | beam.FlatMap(lambda _: self.names_opt.get().split(","))

3. Based on the string array, the pipeline deletes the entities.

import apache_beam as beam
from apache_beam import DoFn
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore, DeleteFromDatastore

def run(argv=None):
    options = DeleteOptions(flags=argv)
    with beam.Pipeline(options=options) as p:
        (p
         | "SplitNames" >> SplitNames(options.names)
         | "Query" >> beam.ParDo(CreateQuery(project_id))
         | "Read" >> beam.ParDo(ReadFromDatastore._QueryFn())
         | "ConvertToKey" >> beam.Map(lambda entity: entity.key)
         | "Delete" >> DeleteFromDatastore(project=project_id))

class CreateQuery(DoFn):
    def __init__(self, project_id):
        self.project_id = project_id

    def process(self, element):
        from apache_beam.io.gcp.datastore.v1new.types import Query
        filters = [("name", "=", element)]
        return [Query(kind="User", project=self.project_id, filters=filters)]

To run this pipeline with a runtime parameter given as a comma-separated string, use the following command. The ^:^ prefix switches gcloud’s list delimiter from a comma to a colon, so the commas inside the names value are passed through as part of the parameter.

gcloud dataflow jobs run delete_datastore --project [PROJECT] --gcs-location gs://[BUCKET]/[PATH]/delete_datastore --region [REGION] --parameters=^:^names=foo,bar

Once the job is created, it will look like this in the Dataflow UI:

[Figure: The pipeline to delete entities with runtime array parameters]

The entire code is available at the link below:

I hope this article helps your work.

Thank you for reading!
