public class DatastoreIO
extends java.lang.Object
The DatastoreIO class provides an experimental API to read and write a
PCollection of Datastore Entity objects. Currently the class supports
read operations on both the DirectPipelineRunner and DataflowPipelineRunner,
and write operations on the DirectPipelineRunner. This API is subject to
change, and currently requires an authentication workaround described below.
Datastore is a fully managed NoSQL data storage service. An Entity is an object in Datastore, analogous to a row in a traditional database table. DatastoreIO supports reading from and writing to Datastore within the Dataflow SDK service.
To use DatastoreIO, users must set up the environment and use gcloud to obtain credentials for Datastore:
$ export CLOUDSDK_EXTRA_SCOPES=https://www.googleapis.com/auth/datastore
$ gcloud auth login
Note that the environment variable CLOUDSDK_EXTRA_SCOPES must be set to the same value when executing a Datastore pipeline, as the local auth cache is keyed by the requested scopes.
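Because a mismatched CLOUDSDK_EXTRA_SCOPES value only shows up later as an authentication failure, it can help to check the variable before launching a pipeline. The following is a minimal sketch of such a check, not part of DatastoreIO: the class DatastoreScopeCheck and the method scopesMatch are hypothetical names, and the sketch assumes the variable was set to exactly the Datastore scope shown above when gcloud auth login was run.

```java
/** Hypothetical pre-flight check; not part of the Dataflow SDK. */
final class DatastoreScopeCheck {
  // The scope value exported before running `gcloud auth login` above.
  static final String DATASTORE_SCOPE = "https://www.googleapis.com/auth/datastore";

  /** Returns true only if the current value is identical to the one used at login. */
  static boolean scopesMatch(String loginValue, String currentValue) {
    return loginValue != null && loginValue.equals(currentValue);
  }

  public static void main(String[] args) {
    // Assumption: the pipeline is started from the same shell environment.
    String current = System.getenv("CLOUDSDK_EXTRA_SCOPES");
    if (!scopesMatch(DATASTORE_SCOPE, current)) {
      System.err.println(
          "CLOUDSDK_EXTRA_SCOPES is '" + current + "' but '" + DATASTORE_SCOPE + "' was expected.");
      System.exit(1);
    }
    System.out.println("CLOUDSDK_EXTRA_SCOPES matches the value used at login.");
  }
}
```

The comparison is deliberately an exact string match, since the note above says the variable must be set to the same value rather than merely contain the Datastore scope.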
To read a PCollection from a query to Datastore, use
read() and its methods DatastoreIO.Source.withDataset(java.lang.String)
and DatastoreIO.Source.withQuery(com.google.api.services.datastore.DatastoreV1.Query)
to specify the dataset to read and the query to read from, and optionally
DatastoreIO.Source.withHost(java.lang.String) to specify the host of Datastore.
For example:
// Read a query from Datastore
PipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
PCollection<Entity> entities = p.apply(
    ReadSource.from(DatastoreIO.read()
        .withDataset(datasetId)
        .withQuery(query)
        .withHost(host)));
p.run();
or:
// Read a query from Datastore
PipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
PCollection<Entity> entities = p.apply(DatastoreIO.readFrom(datasetId, query));
p.run();
To write a PCollection to a datastore, use
DatastoreIO.Sink, with DatastoreIO.Sink.to(java.lang.String) to specify
the datastore to write to, and optionally DatastoreIO.Sink.named(java.lang.String) to specify
the name of the pipeline step. For example:
// A simple Write to Datastore with DirectPipelineRunner (writing is not
// yet implemented for other runners):
PCollection<Entity> entities = ...;
entities.apply(DatastoreIO.write().named("Write entities").to(datastore));
p.run();
| Modifier and Type | Class and Description |
|---|---|
| static class | DatastoreIO.DatastoreReader<br>A reader over the records from a query of the datastore. |
| static class | DatastoreIO.Sink<br>A PTransform that writes a PCollection<Entity> containing entities to a Datastore kind. |
| static class | DatastoreIO.Source<br>A source that reads the result rows of a Datastore query as Entity objects. |
| Modifier and Type | Field and Description |
|---|---|
| static int | DATASTORE_BATCH_UPDATE_LIMIT<br>Datastore has a limit of 500 mutations per batch operation, so we flush changes to Datastore every 500 entities. |
| static java.lang.String | DEFAULT_HOST |
| Constructor and Description |
|---|
| DatastoreIO() |
| Modifier and Type | Method and Description |
|---|---|
| static DatastoreIO.Source | read()<br>Returns an empty DatastoreIO.Source builder with the default host. |
| static ReadSource.Bound<com.google.api.services.datastore.DatastoreV1.Entity> | readFrom(java.lang.String datasetId, com.google.api.services.datastore.DatastoreV1.Query query)<br>Returns a PTransform which reads Datastore entities from the query against the given dataset. |
| static ReadSource.Bound<com.google.api.services.datastore.DatastoreV1.Entity> | readFrom(java.lang.String host, java.lang.String datasetId, com.google.api.services.datastore.DatastoreV1.Query query)<br>Returns a PTransform which reads Datastore entities from the query against the given dataset and host. |
| static DatastoreIO.Sink | write()<br>Returns a new DatastoreIO.Sink builder using the default host. |
| static DatastoreIO.Sink | writeTo(java.lang.String datasetId)<br>Returns a new DatastoreIO.Sink builder using the default host and given dataset. |
public static final java.lang.String DEFAULT_HOST
public static final int DATASTORE_BATCH_UPDATE_LIMIT
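The field summary states that changes are flushed to Datastore every 500 entities because of the 500-mutation limit per batch operation. A minimal sketch of that buffering pattern follows. It is not the DatastoreIO implementation: BatchFlushSketch, BatchingWriter, and batchSizesFor are hypothetical names, and the commit callback stands in for whatever call actually sends a batch to Datastore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of flushing writes in bounded batches. */
final class BatchFlushSketch {
  // Mirrors the documented limit of 500 mutations per batch operation.
  static final int BATCH_LIMIT = 500;

  /** Buffers items and hands each batch to a commit callback. */
  static final class BatchingWriter<T> {
    private final int limit;
    private final Consumer<List<T>> commit;
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int limit, Consumer<List<T>> commit) {
      this.limit = limit;
      this.commit = commit;
    }

    /** Adds one item and flushes as soon as the buffer reaches the limit. */
    void write(T item) {
      buffer.add(item);
      if (buffer.size() >= limit) {
        flush();
      }
    }

    /** Commits whatever is buffered; a no-op when the buffer is empty. */
    void flush() {
      if (buffer.isEmpty()) {
        return;
      }
      commit.accept(new ArrayList<>(buffer));
      buffer.clear();
    }
  }

  /** Writes {@code count} placeholder items and returns the size of each committed batch. */
  static List<Integer> batchSizesFor(int count) {
    List<Integer> sizes = new ArrayList<>();
    BatchingWriter<Integer> writer =
        new BatchingWriter<>(BATCH_LIMIT, batch -> sizes.add(batch.size()));
    for (int i = 0; i < count; i++) {
      writer.write(i);
    }
    writer.flush(); // Commit the final partial batch, if any.
    return sizes;
  }

  public static void main(String[] args) {
    System.out.println(batchSizesFor(1234)); // prints [500, 500, 234]
  }
}
```

The final explicit flush matters: without it, any entities left in a partial batch at the end of the input would never be committed.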
public static DatastoreIO.Source read()
Returns an empty DatastoreIO.Source builder with the default host.
You'll need to configure the dataset and query using DatastoreIO.Source.withDataset(java.lang.String)
and DatastoreIO.Source.withQuery(com.google.api.services.datastore.DatastoreV1.Query).
public static ReadSource.Bound<com.google.api.services.datastore.DatastoreV1.Entity> readFrom(java.lang.String datasetId, com.google.api.services.datastore.DatastoreV1.Query query)
Returns a PTransform which reads Datastore entities from the query
against the given dataset.
public static ReadSource.Bound<com.google.api.services.datastore.DatastoreV1.Entity> readFrom(java.lang.String host, java.lang.String datasetId, com.google.api.services.datastore.DatastoreV1.Query query)
Returns a PTransform which reads Datastore entities from the query
against the given dataset and host.
public static DatastoreIO.Sink write()
Returns a new DatastoreIO.Sink builder using the default host.
You need to further configure it using DatastoreIO.Sink.named(java.lang.String),
DatastoreIO.Sink.to(java.lang.String), and optionally DatastoreIO.Sink.withHost(java.lang.String).
public static DatastoreIO.Sink writeTo(java.lang.String datasetId)
Returns a new DatastoreIO.Sink builder using the default host and given dataset.
You need to further configure it using DatastoreIO.Sink.named(java.lang.String),
and optionally DatastoreIO.Sink.withHost(java.lang.String).