public class TriggerExample extends Object
This example divides the data into windows to be processed, and demonstrates using various kinds of triggers to control when the results for each window are emitted.
This example uses a portion of real traffic data from San Diego freeways. It contains readings from sensor stations set up along each freeway. Each sensor reading includes a calculation of the 'total flow' across all lanes in that freeway direction.
Concepts:
1. The default triggering behavior
2. Late data with the default trigger
3. How to get speculative estimates
4. Combining late data and speculative estimates
Before running this example, it will be useful to familiarize yourself with Dataflow triggers and understand the concept of 'late data'. See: https://cloud.google.com/dataflow/model/triggers and https://cloud.google.com/dataflow/model/windowing#Advanced
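As a rough illustration of the windowing and 'late data' concepts referenced above, the following sketch models fixed windows with plain java.time types. It is a simplified model and not the Dataflow SDK API: the class name FixedWindowSketch, its method names, and the 30-minute window size used in main are assumptions made for illustration only.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified model of fixed windows and late data. Hypothetical helper, not part of the SDK. */
final class FixedWindowSketch {

  /** Returns the start of the fixed window (aligned to the epoch) that contains eventTime. */
  static Instant windowStart(Instant eventTime, Duration size) {
    long sizeMs = size.toMillis();
    long ts = eventTime.toEpochMilli();
    return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
  }

  /** In this model, an element is late if the watermark has reached the end of its window. */
  static boolean isLate(Instant eventTime, Instant watermark, Duration size) {
    Instant windowEnd = windowStart(eventTime, size).plus(size);
    return !watermark.isBefore(windowEnd);
  }

  /** In this model, an element is dropped if it is late by at least allowedLateness. */
  static boolean isDropped(
      Instant eventTime, Instant watermark, Duration size, Duration allowedLateness) {
    Instant windowEnd = windowStart(eventTime, size).plus(size);
    return !watermark.isBefore(windowEnd.plus(allowedLateness));
  }

  public static void main(String[] args) {
    Duration size = Duration.ofMinutes(30); // assumed window size, for illustration
    Instant event = Instant.parse("2015-01-01T10:12:00Z");
    Instant watermark = Instant.parse("2015-01-01T10:45:00Z");

    System.out.println(windowStart(event, size));
    System.out.println(isLate(event, watermark, size));
    // With no allowed lateness the late element is dropped; with one day it is kept.
    System.out.println(isDropped(event, watermark, size, Duration.ZERO));
    System.out.println(isDropped(event, watermark, size, Duration.ofDays(1)));
  }
}
```

The one-day allowance in main mirrors the idea of keeping a window open for late arrivals instead of discarding them once the watermark passes the end of the window.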
The example pipeline reads data from a Pub/Sub topic. By default, running the example will
also run an auxiliary pipeline to inject data from the default --input file to the
--pubsubTopic. The auxiliary pipeline puts a timestamp on the injected data so that the
example pipeline can operate on event time (rather than arrival time). The auxiliary
pipeline also randomly simulates late data, by setting the timestamps of some of the data
elements to be in the past. You may override the default --input with the file of your
choosing, or set --input="" to disable the automatic Pub/Sub injection and use a
separate tool to publish to the given topic.
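The timestamping behavior described above can be sketched in plain Java as follows. This is only an illustration of the idea, not the auxiliary pipeline's actual code: the class name LateDataInjectorSketch, the 10% late fraction, and the 100-minute maximum delay are all hypothetical values chosen for the sketch.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Random;

/** Hypothetical sketch of assigning event timestamps with randomly simulated late data. */
final class LateDataInjectorSketch {
  // Assumed values for illustration; the real injector may use different ones.
  static final double LATE_FRACTION = 0.1;
  static final Duration MAX_DELAY = Duration.ofMinutes(100);

  /**
   * Returns the event timestamp for a record injected at time now. Most records
   * get the current time; a random fraction get a timestamp in the past.
   */
  static Instant eventTimestamp(Instant now, Random random) {
    if (random.nextDouble() < LATE_FRACTION) {
      // Delay is between 1 minute and MAX_DELAY, inclusive.
      long delayMinutes = 1 + random.nextInt((int) MAX_DELAY.toMinutes());
      return now.minus(Duration.ofMinutes(delayMinutes));
    }
    return now;
  }

  public static void main(String[] args) {
    Random random = new Random();
    Instant now = Instant.now();
    for (int i = 0; i < 5; i++) {
      System.out.println(eventTimestamp(now, random));
    }
  }
}
```

Because the timestamp travels with the record, a downstream pipeline can window by event time even when a record arrives long after the moment it describes.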
The example is configured to use the default Pub/Sub topic and the default BigQuery table
from the example common package (there are no defaults for a general Dataflow pipeline).
You can override them by using the --pubsubTopic, --bigQueryDataset, and
--bigQueryTable options. If the Pub/Sub topic or the BigQuery table does not exist,
the example will try to create them.
The pipeline outputs its results to a BigQuery table.
Here are some queries you can use to see interesting results:
Replace <enter_table_name> in the queries below with the name of the BigQuery table.
Replace <enter_window_interval> in the queries below with the window interval.
To see the results of the default trigger,
Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait until
the window duration has elapsed and the first pane of non-late data has been emitted to see more
interesting results.
SELECT * FROM <enter_table_name> WHERE trigger_type = "default" ORDER BY window DESC
To see the late data, i.e. the data dropped by the default trigger,
SELECT * FROM <enter_table_name> WHERE trigger_type = "withAllowedLateness" and
(timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time
To see the difference between accumulation mode and discarding mode,
SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
(trigger_type = "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY
window DESC, processing_time
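The difference between the two modes can be illustrated without the SDK. In the sketch below, each inner list holds the values that arrived for one window between two trigger firings. In discarding mode a pane reports only the elements that arrived since the previous firing, while in accumulating mode it reports everything seen so far. The class name AccumulationModeSketch and the sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of pane contents under accumulating vs. discarding mode. */
final class AccumulationModeSketch {

  /**
   * Returns the sum emitted by each trigger firing for a single window.
   * firings.get(i) holds the values that arrived since the previous firing.
   */
  static List<Integer> paneSums(List<List<Integer>> firings, boolean accumulating) {
    List<Integer> panes = new ArrayList<>();
    int running = 0;
    for (List<Integer> arrived : firings) {
      int sum = 0;
      for (int value : arrived) {
        sum += value;
      }
      running += sum;
      // Accumulating panes report the running total; discarding panes report only the delta.
      panes.add(accumulating ? running : sum);
    }
    return panes;
  }

  public static void main(String[] args) {
    List<List<Integer>> firings =
        Arrays.asList(Arrays.asList(10, 20), Arrays.asList(5), Arrays.asList(7, 3));
    System.out.println(paneSums(firings, false)); // prints [30, 5, 10]
    System.out.println(paneSums(firings, true));  // prints [30, 35, 45]
  }
}
```

With discarding mode, a consumer must add the panes together to get the window total; with accumulating mode, the latest pane already supersedes the earlier ones.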
To see speculative results every minute,
SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" and freeway = "5"
ORDER BY window DESC, processing_time
To see speculative results every five minutes after the end of the window,
SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY"
and freeway = "5" ORDER BY window DESC, processing_time
To see the first and the last pane for a freeway in a window for all the trigger types,
SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window
To reduce the number of results for each query we can add additional where clauses.
For example, to see the results of the default trigger,
SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND freeway = "5" AND
window = "<enter_window_interval>"
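The placeholder substitution described above can also be done programmatically. The following is a minimal sketch using plain string replacement: the class name QueryTemplateSketch, the table name my_dataset.my_table, and the window value WINDOW_VALUE are hypothetical stand-ins, and the real window interval should be copied from the table's window column.

```java
/** Hypothetical helper that fills in the placeholders of the filtered default-trigger query. */
final class QueryTemplateSketch {

  static final String DEFAULT_TRIGGER_QUERY =
      "SELECT * FROM <enter_table_name> WHERE trigger_type = \"default\" AND freeway = \"5\" AND "
          + "window = \"<enter_window_interval>\"";

  /** Replaces both placeholders in the template with the given values. */
  static String fill(String template, String tableName, String windowInterval) {
    return template
        .replace("<enter_table_name>", tableName)
        .replace("<enter_window_interval>", windowInterval);
  }

  public static void main(String[] args) {
    // Both arguments are placeholder values for illustration only.
    System.out.println(fill(DEFAULT_TRIGGER_QUERY, "my_dataset.my_table", "WINDOW_VALUE"));
  }
}
```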
The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C) and then exit.
| Modifier and Type | Class and Description |
|---|---|
| static class | TriggerExample.InsertDelays: Add current time to each record. |
| static interface | TriggerExample.TrafficFlowOptions: Inherits standard configuration options. |
| Modifier and Type | Field and Description |
|---|---|
| static Duration | FIVE_MINUTES |
| static Duration | ONE_DAY |
| static Duration | ONE_MINUTE |
| static int | WINDOW_DURATION |
| Constructor and Description |
|---|
| TriggerExample() |
public static final int WINDOW_DURATION
public static final Duration ONE_MINUTE
public static final Duration FIVE_MINUTES
public static final Duration ONE_DAY