Delete entities of Datastore in bulk with Dataflow implemented in Python
At times, we may need to delete Google Cloud Datastore entities in bulk. For example, when customers stop using our service, we may have to delete their data from Datastore. For this task, Google Cloud Dataflow is well suited, according to this GCP document. Dataflow lets us develop data pipelines based on the Apache Beam SDK, which supports both Java and Python.
In this article, I’m going to introduce a simple way to delete entities in bulk with the Apache Beam Python SDK on Dataflow, and then show how to run the pipeline with runtime parameters.
Development Environment
The Python packages below are used in the sample code of this article.
REQUIRED_PACKAGES = [
'apache-beam[gcp]==2.17.0',
'googledatastore==7.1.0'
]
Bulk delete entities with Beam
In this article, I use the modules implemented in the datastore.v1new package to delete entities. In this sample code, the User entities whose name property equals “foo” are deleted. The pipeline runs through the following steps:
- Read the entities based on the query
- Convert the entity into one’s key
- Delete the entities based on the keys
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new.types import Query
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore, DeleteFromDatastore

with beam.Pipeline(options=options) as p:
    filters = [("name", "=", "foo")]
    query = Query(kind="User", project=project_id, filters=filters)
    (p
     | "Read" >> ReadFromDatastore(query=query)
     | "ConvertToKey" >> beam.Map(lambda entity: entity.key)
     | "Delete" >> DeleteFromDatastore(project=project_id))
Filter with Key
The above code deletes entities filtered by a property, not by key. If a developer wants to filter the entities by key, the Key class in the google.cloud.datastore package achieves that.
from google.cloud.datastore.key import Key
filters = [("__key__", "=", Key("User", element, project=self.project_id))]
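As a rough pure-Python sketch of what such a key filter looks like, the stand-in Key class below is hypothetical and exists only to illustrate the shape of the filter tuple; the real class comes from google.cloud.datastore, and the kind, identifier, and project values are placeholders:

```python
# Minimal stand-in for google.cloud.datastore.key.Key, used here only to
# show the shape of a key-equality filter; it is NOT the real class.
class Key:
    def __init__(self, *path, project=None):
        self.path = path          # e.g. ("User", "some-id")
        self.project = project

def make_key_filter(kind, identifier, project):
    # A key filter compares the reserved "__key__" property against a Key.
    return [("__key__", "=", Key(kind, identifier, project=project))]

filters = make_key_filter("User", "some-id", "my-project")
prop, op, key = filters[0]
```

The reserved property name "__key__" is what tells Datastore to compare against the entity key rather than an ordinary property.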
Create template
Although a developer can run the pipeline locally, a Dataflow template makes execution more flexible. You can check the link.
The template workflow separates the development step from the staging and execution steps.
Once the template is created, the pipeline can be executed with the template by using the Google Cloud Console, REST API, or gcloud command-line tool.
To create the template, use the following command.
python pipeline.py --runner DataflowRunner --project [PROJECT] --temp_location gs://[BUCKET]/[PATH]/temp --template_location gs://[BUCKET]/[PATH]/delete_datastore
Execute the template
To run this pipeline from the template, use the following command. As explained before, the developer can also run this pipeline through the Google Cloud Console or the REST API.
gcloud dataflow jobs run delete_datastore --project [PROJECT] --gcs-location gs://[BUCKET]/[PATH]/delete_datastore --region [REGION]
Once the job is created, it looks like this in the Dataflow UI.
It’s very simple, isn’t it? Dataflow makes bulk deletion of entities straightforward, and the deletion runs scalably on Dataflow.
The entire code is at the link below:
Execution with runtime parameters
At times, we need to run the pipeline with runtime parameters. A Dataflow template can accept runtime parameters. You can check the link.
With runtime primitive parameters
In the Python SDK, the add_value_provider_argument method is used to define runtime parameters.
from apache_beam.options.pipeline_options import PipelineOptions

class DeleteOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--name')
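To see what add_value_provider_argument adds over a plain argument, it helps to recall the argparse baseline: a regular argument simply stores the parsed string (or None when absent), resolved entirely at parse time. The sketch below uses only the standard library; Beam’s method instead wraps the value in a ValueProvider so resolution can be deferred to runtime.

```python
import argparse

# Baseline: a plain argparse argument is resolved at parse time.
parser = argparse.ArgumentParser()
parser.add_argument('--name')

args_with = parser.parse_args(['--name', 'foo'])   # value available immediately
args_without = parser.parse_args([])               # simply None when omitted
```

With add_value_provider_argument, a missing flag still yields an object on the options, and its actual value is only filled in when the template is launched.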
To run this pipeline with the template and runtime parameters, use the following command.
gcloud dataflow jobs run delete_datastore --project [PROJECT] --gcs-location gs://[BUCKET]/[PATH]/delete_datastore --region [REGION] --parameters name=foo
But, as the documentation above explains, you cannot get runtime parameters before the pipeline is executed.
You can use isAccessible() to check whether the value of a ValueProvider is available. If you call get() before pipeline execution, Apache Beam returns an error: “Value only available at runtime, but accessed from a non-runtime context.”
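The behavior described in that quote can be illustrated with a minimal sketch of the ValueProvider pattern. The class below is a simplified stand-in written for this article, not Beam’s actual implementation:

```python
# Simplified stand-in for Beam's RuntimeValueProvider; illustrative only.
class RuntimeValueProviderSketch:
    def __init__(self):
        self._value = None
        self._runtime = False   # becomes True once the pipeline is running

    def is_accessible(self):
        return self._runtime

    def get(self):
        if not self._runtime:
            raise RuntimeError(
                "Value only available at runtime, "
                "but accessed from a non-runtime context")
        return self._value

    def _set_runtime_value(self, value):
        # What the runner conceptually does when the template is launched.
        self._runtime = True
        self._value = value

vp = RuntimeValueProviderSketch()
accessible_before = vp.is_accessible()   # False at construction time
try:
    vp.get()                             # raises: not yet at runtime
    failed = False
except RuntimeError:
    failed = True

vp._set_runtime_value("foo")
value_at_runtime = vp.get()              # now succeeds
```

The key point is that the object exists at pipeline-construction time, but its value does not, which is exactly why get() must be deferred.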
def run(argv=None):
    options = DeleteOptions(flags=argv)

    with beam.Pipeline(options=options) as p:
        # This error happens: Value only available at runtime, but accessed from a non-runtime context.
        filters = [("name", "=", options.name.get())]
So, the get() call has to happen within the runtime context, as in the code below. Now it works.
with beam.Pipeline(options=options) as p:
    (p
     | "Read" >> ReadFromDatastore(query=Query(kind="User", project=project_id, filters=[("name", "=", options.name)]))
     | "ConvertToKey" >> beam.Map(lambda entity: entity.key)
     | "Delete" >> DeleteFromDatastore(project=project_id))
The entire code is at the link below:
With runtime array parameters
Let’s consider a more advanced case, in which the pipeline accepts an array runtime parameter. Unfortunately, PipelineOptions cannot accept an array value, so this article introduces another way to accept array runtime parameters. In the code below, the pipeline accepts an array value through the following steps:
1. The pipeline accepts a comma-separated string with add_value_provider_argument.
from apache_beam.options.pipeline_options import PipelineOptions

class DeleteOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Ex: names=foo,bar
        parser.add_value_provider_argument('--names')
2. In the pipeline context, this string is split on commas into a string array. In the expand method, the PBegin object becomes a PCollection by applying Impulse(), and then the comma-separated string is split within the runtime context.
from apache_beam.transforms import PTransform

class SplitNames(PTransform):
    def __init__(self, names_opt):
        self.names_opt = names_opt

    def expand(self, pbegin):
        import apache_beam as beam
        from apache_beam.transforms import Impulse
        return pbegin | Impulse() | beam.FlatMap(lambda _: self.names_opt.get().split(","))
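The runtime split step itself is ordinary Python string handling. A quick sketch of what the FlatMap lambda produces once get() succeeds, assuming the template was launched with names=foo,bar:

```python
# What self.names_opt.get().split(",") yields at runtime; the value
# "foo,bar" is a hypothetical runtime parameter, not from a real launch.
runtime_value = "foo,bar"
names = runtime_value.split(",")

# One edge case worth knowing: splitting an empty string yields [""]
# rather than [], so an empty parameter would still emit one (empty)
# element downstream.
empty = "".split(",")
```

Guarding against the empty-string case before the split may be worthwhile if the parameter can legitimately be omitted.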
3. Based on the string array, the pipeline deletes the entities.
from apache_beam import DoFn

def run(argv=None):
    options = DeleteOptions(flags=argv)

    with beam.Pipeline(options=options) as p:
        (p
         | "SplitNames" >> SplitNames(options.names)
         | "Query" >> beam.ParDo(CreateQuery(project_id))
         | "Read" >> beam.ParDo(ReadFromDatastore._QueryFn())
         | "ConvertToKey" >> beam.Map(lambda entity: entity.key)
         | "Delete" >> DeleteFromDatastore(project=project_id))

class CreateQuery(DoFn):
    def __init__(self, project_id):
        self.project_id = project_id

    def process(self, element):
        from apache_beam.io.gcp.datastore.v1new.types import Query
        filters = [("name", "=", element)]
        return [Query(kind="User", project=self.project_id, filters=filters)]
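The fan-out in step 3 can be traced in plain Python: each name emitted by the split becomes one property filter, so two names yield two independent queries. The make_filters helper below is hypothetical and simply mirrors the filter built inside CreateQuery.process:

```python
def make_filters(element):
    # Mirrors the filter list built inside CreateQuery.process for one name.
    return [("name", "=", element)]

names = ["foo", "bar"]          # what SplitNames would emit at runtime
all_filters = [make_filters(n) for n in names]
```

Each filter list would then back one Query, read and deleted independently by the downstream steps.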
To run this pipeline with a runtime parameter that is a comma-separated string, use the following command.
gcloud dataflow jobs run delete_datastore --project [PROJECT] --gcs-location gs://[BUCKET]/[PATH]/delete_datastore --region [REGION] --parameters=^:^names=foo,bar
Once the job is created, it looks like this in the Dataflow UI.
The entire code is at the link below:
I hope this article helps your work.
Thank you for reading!