@Experimental(value=SOURCE_SINK) public class DatastoreIO extends Object
PTransforms for reading and writing Google Cloud Datastore entities.
The DatastoreIO class provides an API to read and write a PCollection of Datastore Entity objects. This API currently requires an authentication workaround described below.
Datastore is a fully managed NoSQL data storage service. An Entity is an object in Datastore, analogous to a row in a traditional database table. DatastoreIO supports reading from and writing to Datastore within the Dataflow SDK service.
To use DatastoreIO, users must use gcloud to obtain credentials for Datastore:
$ gcloud auth login
Note that the environment variable CLOUDSDK_EXTRA_SCOPES must hold the same value when executing a Datastore pipeline as it did when gcloud auth login was run, because the local auth cache is keyed by the requested scopes.
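Because a mismatched CLOUDSDK_EXTRA_SCOPES value only surfaces as an authentication failure at run time, it can help to check the variable before launching a pipeline. The following is a minimal sketch of such a pre-flight check. The class ScopeCheckSketch and its method scopesMatch are hypothetical names introduced for illustration and are not part of the Dataflow SDK or gcloud; the expected value is assumed to be supplied by the caller.

```java
import java.util.Map;

/** Hypothetical pre-flight check; not part of the Dataflow SDK or gcloud. */
final class ScopeCheckSketch {
    static final String SCOPES_VAR = "CLOUDSDK_EXTRA_SCOPES";

    /** Returns true when the variable is set to exactly the expected scopes value. */
    static boolean scopesMatch(Map<String, String> env, String expected) {
        String actual = env.get(SCOPES_VAR);
        return actual != null && actual.equals(expected);
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: ScopeCheckSketch <expected-scopes>");
            return;
        }
        // args[0] is assumed to be the value that was set when `gcloud auth login` ran.
        if (scopesMatch(System.getenv(), args[0])) {
            System.out.println(SCOPES_VAR + " matches the expected value");
        } else {
            System.err.println(SCOPES_VAR + " is unset or differs from the expected value");
        }
    }
}
```

Passing the environment as a Map keeps the check easy to exercise without changing the real process environment.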
To read a PCollection from a query to Datastore, use read() and its methods DatastoreIO.Source.withDataset(java.lang.String) and DatastoreIO.Source.withQuery(com.google.api.services.datastore.DatastoreV1.Query) to specify the dataset to read and the query to read from, and optionally DatastoreIO.Source.withHost(java.lang.String) to specify the host of Datastore.
For example:
// Read a query from Datastore
PipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
PCollection<Entity> entities = p.apply(
    Read.from(DatastoreIO.read()
        .withDataset(datasetId)
        .withQuery(query)
        .withHost(host)));
p.run();
or:
// Read a query from Datastore
PipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
PCollection<Entity> entities = p.apply(DatastoreIO.readFrom(datasetId, query));
p.run();
To write a PCollection to Datastore, use writeTo(java.lang.String), specifying the dataset to write to:
PCollection<Entity> entities = ...;
entities.apply(DatastoreIO.writeTo(dataset));
p.run();
To optionally change the host that is used to write to the Datastore, use sink() to build a DatastoreIO Sink and write to it using the Write
transform:
PCollection<Entity> entities = ...;
entities.apply(Write.to(DatastoreIO.sink().withDataset(dataset).withHost(host)));
p.run();
Entities in the PCollection to be written must have complete keys. A complete key specifies the name or id of the entity, whereas an incomplete key does not. Entities will be committed as upsert (update or insert) mutations. Please read Entities, Properties, and Keys for more information about entity keys.
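The distinction between complete and incomplete keys can be sketched with a simplified model. The example below assumes a key is a path of elements, each with a kind and an optional name or numeric id, and treats the key as complete when its final element carries a name or an id. KeySketch and PathElement are hypothetical stand-ins written for illustration; they are not the DatastoreV1 classes.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for a Datastore key; not the DatastoreV1 classes. */
final class KeySketch {
    /** One step of a key path: a kind plus an optional name or numeric id. */
    static final class PathElement {
        final String kind;
        final String name; // null when no name is assigned
        final Long id;     // null when no id is assigned

        PathElement(String kind, String name, Long id) {
            this.kind = kind;
            this.name = name;
            this.id = id;
        }
    }

    /** A key is treated as complete when its final path element has a name or an id. */
    static boolean isComplete(List<PathElement> path) {
        if (path.isEmpty()) {
            return false;
        }
        PathElement last = path.get(path.size() - 1);
        return last.name != null || last.id != null;
    }

    public static void main(String[] args) {
        List<PathElement> complete = Arrays.asList(new PathElement("Person", "alice", null));
        List<PathElement> incomplete = Arrays.asList(new PathElement("Person", null, null));
        System.out.println(isComplete(complete));   // prints "true"
        System.out.println(isComplete(incomplete)); // prints "false"
    }
}
```

A check of this kind could be run over entities before they are written, so that incomplete keys are rejected early rather than at commit time.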
Permission requirements depend on the PipelineRunner that is used to execute the Dataflow job. Please refer to the documentation of the corresponding PipelineRunners for more details.
Please see Cloud Datastore Sign Up for security and permission related information specific to Datastore.
| Modifier and Type | Class and Description |
|---|---|
| static class | DatastoreIO.DatastoreReader: A Source.Reader over the records from a query of the datastore. |
| static class | DatastoreIO.Sink |
| static class | DatastoreIO.Source: A Source that reads the result rows of a Datastore query as Entity objects. |
| Modifier and Type | Field and Description |
|---|---|
| static int | DATASTORE_BATCH_UPDATE_LIMIT: Datastore has a limit of 500 mutations per batch operation, so we flush changes to Datastore every 500 entities. |
| static String | DEFAULT_HOST |
| Constructor and Description |
|---|
| DatastoreIO() |
| Modifier and Type | Method and Description |
|---|---|
| static DatastoreIO.Source | read(): Returns an empty DatastoreIO.Source builder with the default host. |
| static Read.Bounded<com.google.api.services.datastore.DatastoreV1.Entity> | readFrom(String datasetId, com.google.api.services.datastore.DatastoreV1.Query query): Returns a PTransform that reads Datastore entities from the query against the given dataset. |
| static Read.Bounded<com.google.api.services.datastore.DatastoreV1.Entity> | readFrom(String host, String datasetId, com.google.api.services.datastore.DatastoreV1.Query query): Returns a PTransform that reads Datastore entities from the query against the given dataset and host. |
| static DatastoreIO.Sink | sink(): Returns a new DatastoreIO.Sink builder using the default host. |
| static Write.Bound<com.google.api.services.datastore.DatastoreV1.Entity> | writeTo(String datasetId) |
public static final String DEFAULT_HOST
public static final int DATASTORE_BATCH_UPDATE_LIMIT
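The batching that DATASTORE_BATCH_UPDATE_LIMIT describes, flushing every 500 entities, can be sketched as splitting a list into consecutive chunks of at most 500 elements. This is an illustration of the idea under that assumption, not the SDK's implementation; BatchFlushSketch, BATCH_LIMIT, and toBatches are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative batching helper; not part of the Dataflow SDK. */
final class BatchFlushSketch {
    // Mirrors the documented limit of 500 mutations per batch operation.
    static final int BATCH_LIMIT = 500;

    /** Splits items into consecutive batches of at most limit elements. */
    static <T> List<List<T>> toBatches(List<T> items, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += limit) {
            int end = Math.min(start + limit, items.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> entities = new ArrayList<>();
        for (int i = 0; i < 1200; i++) {
            entities.add(i);
        }
        for (List<Integer> batch : toBatches(entities, BATCH_LIMIT)) {
            // A real sink would commit the batch here; this sketch only reports its size.
            System.out.println("flush " + batch.size() + " entities");
        }
    }
}
```

With 1200 placeholder entities, the sketch reports three flushes of 500, 500, and 200 entities.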
public static DatastoreIO.Source read()
Returns an empty DatastoreIO.Source builder with the default host. You'll need to configure the dataset and query using DatastoreIO.Source.withDataset(java.lang.String) and DatastoreIO.Source.withQuery(com.google.api.services.datastore.DatastoreV1.Query).
public static Read.Bounded<com.google.api.services.datastore.DatastoreV1.Entity> readFrom(String datasetId, com.google.api.services.datastore.DatastoreV1.Query query)
Returns a PTransform that reads Datastore entities from the query against the given dataset.
public static Read.Bounded<com.google.api.services.datastore.DatastoreV1.Entity> readFrom(String host, String datasetId, com.google.api.services.datastore.DatastoreV1.Query query)
Returns a PTransform that reads Datastore entities from the query against the given dataset and host.
public static DatastoreIO.Sink sink()
Returns a new DatastoreIO.Sink builder using the default host. You need to further configure it using DatastoreIO.Sink.withDataset(java.lang.String), and optionally DatastoreIO.Sink.withHost(java.lang.String), before using it in a Write transform.
For example: entities.apply(Write.to(DatastoreIO.sink().withDataset(dataset)));
public static Write.Bound<com.google.api.services.datastore.DatastoreV1.Entity> writeTo(String datasetId)