Class UploadQueue<T,R>

java.lang.Object
com.cognite.client.queue.UploadQueue<T,R>
Type Parameters:
T - The CDF resource type to upload.
R - The output type of elements posted to Cognite Data Fusion.
All Implemented Interfaces:
AutoCloseable

public abstract class UploadQueue<T,R> extends Object implements AutoCloseable
The UploadQueue batches items together and uploads them to Cognite Data Fusion (CDF), both to minimize the load on the API and to improve throughput. The queue is uploaded to CDF on three conditions:
1) When the queue is 80% full.
2) At a set interval (every 10 seconds by default), once the interval trigger has been started.
3) When the upload() method is called.
The 80% fill trigger is always active. The upload runs on a background thread, so your client can keep putting items on the queue while the upload is in progress.
The upload interval trigger must be explicitly enabled by calling the start() method. This starts a background task that triggers a queue upload at a configurable interval. The interval trigger works in combination with the fill rate trigger; using both together is the recommended way to use the queue. When you are done using the queue, call stop() to clean up the background tasks and drain the queue.
You can also trigger an upload manually by calling upload(). This is a blocking call.
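The fill rate trigger described above can be modelled in a few lines. The following is a minimal sketch, not the SDK's implementation: the class FillRateModel, its fields, and the integer threshold of 80 percent are assumptions made for illustration, and the upload runs synchronously here, whereas the real queue uploads on a background thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the "upload at 80% full" rule; not the SDK's code.
final class FillRateModel {
    // Assumed from the "80% full" wording in the class description.
    static final int FILL_THRESHOLD_PERCENT = 80;

    private final BlockingQueue<String> queue;
    private final int capacity;
    // Stands in for CDF: every drained batch is recorded here.
    final List<List<String>> uploadedBatches = new ArrayList<>();

    FillRateModel(int capacity) {
        this.capacity = capacity;
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Adds one element and drains the queue once the fill threshold is reached.
    // add() cannot overflow in this model because the drain happens before the queue fills.
    void put(String element) {
        queue.add(element);
        if (queue.size() * 100 >= capacity * FILL_THRESHOLD_PERCENT) {
            upload();
        }
    }

    void putAll(String... elements) {
        for (String element : elements) {
            put(element);
        }
    }

    // Manual trigger: drains whatever is queued and returns it to the caller.
    List<String> upload() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        if (!batch.isEmpty()) {
            uploadedBatches.add(batch);
        }
        return batch;
    }

    public static void main(String[] args) {
        FillRateModel model = new FillRateModel(10);
        model.putAll("a", "b", "c", "d", "e", "f", "g");
        System.out.println(model.uploadedBatches.size()); // 7 of 10 slots used: no upload yet
        model.put("h");
        System.out.println(model.uploadedBatches.size()); // 8 of 10 slots used: one batch uploaded
    }
}
```

In this model a queue of 10 elements uploads a batch of 8, which is why the queue size should sit somewhat above the batch size you want.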
  • Field Details

    • LOG

      protected final org.slf4j.Logger LOG
    • MIN_MAX_UPLOAD_INTERVAL

      protected static final Duration MIN_MAX_UPLOAD_INTERVAL
    • DEFAULT_MAX_UPLOAD_INTERVAL

      protected static final Duration DEFAULT_MAX_UPLOAD_INTERVAL
    • MAX_MAX_UPLOAD_INTERVAL

      protected static final Duration MAX_MAX_UPLOAD_INTERVAL
    • DEFAULT_QUEUE_SIZE

      protected static final int DEFAULT_QUEUE_SIZE
    • QUEUE_FILL_RATE_THRESHOLD

      protected static final float QUEUE_FILL_RATE_THRESHOLD
    • executor

      protected final ScheduledThreadPoolExecutor executor
    • recurringTask

      protected ScheduledFuture<?> recurringTask
  • Constructor Details

    • UploadQueue

      public UploadQueue()
  • Method Details

    • of

      public static <T, R> UploadQueue<T,R> of(UpsertTarget<T,R> target)
      Builds an upload queue for batching and pushing items to the provided UpsertTarget.
      Type Parameters:
      T - The input type of elements put on the queue.
      R - The output type of elements posted to Cognite Data Fusion. These are the objects passed to the postUploadFunction.
      Parameters:
      target - The sink to push data items to.
      Returns:
      The UploadQueue
    • of

      public static <T, R> UploadQueue<T,R> of(UploadTarget<T,R> target)
      Builds an upload queue for batching and pushing items to the provided UploadTarget.
      Type Parameters:
      T - The input type of elements put on the queue.
      R - The output type of elements posted to Cognite Data Fusion. These are the objects passed to the postUploadFunction.
      Parameters:
      target - The sink to push data items to.
      Returns:
      The UploadQueue
    • withPostUploadFunction

      public UploadQueue<T,R> withPostUploadFunction(Consumer<List<R>> function)
      Add a post upload function. The post upload function will be called after the successful upload of a batch of data objects to Cognite Data Fusion. The function will be given the list of objects that were uploaded. The post upload function has the potential to block the upload thread, so you should ensure that it is lightweight. If you need to perform a costly operation, we recommend that you hand the costly operation over to a separate thread and let the post upload function return quickly.
      Parameters:
      function - The function to call for each uploaded batch of R.
      Returns:
      The UploadQueue with the function configured.
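One way to keep the post upload function lightweight is to have it only hand the batch to another thread. The sketch below assumes R is String; the class PostUploadHandOff, its worker pool, and slowProcessing are hypothetical names used for illustration. Only the Consumer<List<R>> shape comes from the method signature above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: the post upload function returns quickly,
// and the costly work runs on a separate worker thread.
final class PostUploadHandOff {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // Records what the slow step has handled; thread-safe for the demo.
    final List<String> processed = new CopyOnWriteArrayList<>();

    // A function with the shape expected by withPostUploadFunction when R is String.
    Consumer<List<String>> postUploadFunction() {
        return batch -> {
            // Copy the batch so later changes to the caller's list cannot affect the worker.
            List<String> snapshot = new ArrayList<>(batch);
            worker.submit(() -> slowProcessing(snapshot));
        };
    }

    // Placeholder for a costly operation such as writing an audit record.
    private void slowProcessing(List<String> batch) {
        processed.addAll(batch);
    }

    // Stops accepting work and waits up to five seconds for queued work to finish.
    boolean shutdown() {
        worker.shutdown();
        try {
            return worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        PostUploadHandOff handOff = new PostUploadHandOff();
        Consumer<List<String>> function = handOff.postUploadFunction();
        function.accept(java.util.Arrays.asList("event-1", "event-2"));
        handOff.shutdown();
        System.out.println(handOff.processed);
    }
}
```

The function would then be passed as queue.withPostUploadFunction(handOff.postUploadFunction()). Remember to shut the worker pool down when the queue is stopped.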
    • withExceptionHandlerFunction

      public UploadQueue<T,R> withExceptionHandlerFunction(Consumer<Exception> function)
      Add an exception handler function. The exception handler function will be called if an exception occurs while uploading objects to Cognite Data Fusion. We highly recommend that you add an exception handler function. Without one, you risk an upload failing silently.
      Parameters:
      function - The function to call in case of an exception during upload.
      Returns:
      The UploadQueue with the function configured.
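An exception handler can be as simple as a Consumer<Exception> that logs the failure and keeps a count for monitoring. The class UploadFailureRecorder below is a hypothetical example. It uses java.util.logging so that it has no dependencies, whereas the SDK itself logs through SLF4J.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical exception handler: logs each failed upload and remembers the latest one.
final class UploadFailureRecorder implements Consumer<Exception> {
    private static final Logger LOG = Logger.getLogger(UploadFailureRecorder.class.getName());

    private final AtomicInteger failureCount = new AtomicInteger();
    private final AtomicReference<Exception> lastFailure = new AtomicReference<>();

    @Override
    public void accept(Exception e) {
        // Atomic types are used because the handler may be invoked from an upload thread.
        failureCount.incrementAndGet();
        lastFailure.set(e);
        LOG.log(Level.SEVERE, "Upload to CDF failed", e);
    }

    int failureCount() {
        return failureCount.get();
    }

    Exception lastFailure() {
        return lastFailure.get();
    }

    public static void main(String[] args) {
        UploadFailureRecorder recorder = new UploadFailureRecorder();
        recorder.accept(new IllegalStateException("simulated upload failure"));
        System.out.println(recorder.failureCount());
    }
}
```

An instance can be passed directly to withExceptionHandlerFunction, and the application can poll failureCount() to decide whether to alert or retry.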
    • withQueueSize

      public UploadQueue<T,R> withQueueSize(int queueSize)
      Sets the queue size. The queue size is the maximum number of elements that the queue can hold before starting to block on put operations. The queue will automatically be uploaded when it is 80% full, so you should set the queue size to slightly larger than your desired max batch size. The default queue size is 10k.
      Parameters:
      queueSize - The target queue size.
      Returns:
      The UploadQueue with the queue size configured.
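Because the upload triggers at 80% fill, a desired batch size maps to a queue size of roughly batch size divided by 0.8. The helper below is a hypothetical sketch of that arithmetic; it assumes the trigger fires as soon as the number of queued elements reaches 80% of the queue size, which the documentation does not state precisely.

```java
// Hypothetical helper, not part of the SDK.
final class QueueSizing {
    // Smallest queue size whose 80% mark is at or above the desired batch size.
    // Integer form of ceil(batchSize / 0.8), which equals ceil(batchSize * 5 / 4).
    static int queueSizeFor(int desiredBatchSize) {
        if (desiredBatchSize <= 0) {
            throw new IllegalArgumentException("desiredBatchSize must be positive");
        }
        return Math.toIntExact((desiredBatchSize * 5L + 3) / 4);
    }

    public static void main(String[] args) {
        System.out.println(queueSizeFor(8_000)); // prints 10000
        System.out.println(queueSizeFor(1_000)); // prints 1250
    }
}
```

Under this assumption, batches of about 8,000 elements correspond to a queue size of 10,000, and the result can be passed to withQueueSize.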
    • withMaxUploadInterval

      public UploadQueue<T,R> withMaxUploadInterval(Duration interval)
      Sets the max upload interval. When you activate the queue background thread via start(), the queue will be uploaded to Cognite Data Fusion at least every upload interval (in addition to the upload triggered at 80% queue fill rate). The default max upload interval is 10 seconds.
      Parameters:
      interval - The target max upload interval.
      Returns:
      The UploadQueue with the upload interval configured.
    • put

      public void put(T element) throws InterruptedException
      Adds an element to the upload queue, waiting if necessary for space to become available. Under normal operating conditions, this method adds the element to the queue and returns immediately to the caller. The upload queue automatically uploads its elements to Cognite Data Fusion when the queue is 80% full. The upload happens on a background thread and does not block the caller. However, if you keep adding elements faster than the queue can drain, the call blocks until space becomes available. This is a backpressure mechanism that prevents CDF from becoming overloaded.
      Parameters:
      element - The data element to add to the queue
      Throws:
      InterruptedException - if interrupted while waiting
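The backpressure behavior can be illustrated with a bounded java.util.concurrent.BlockingQueue. This is a sketch under the assumption that the upload queue behaves like such a bounded queue; the documentation does not say which queue type the SDK uses internally, and BackpressureDemo is a hypothetical class. It uses a timed offer() instead of put() so that the demonstration gives up rather than blocking forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demonstration of backpressure on a bounded queue.
final class BackpressureDemo {
    // Fills a queue to capacity, optionally frees one slot, then tries to add one more element.
    // Returns true if the extra element was accepted within the timeout.
    static boolean offerWhenFull(int capacity, long timeoutMillis, boolean drainOneFirst) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add(i); // does not block: the queue still has room
        }
        if (drainOneFirst) {
            queue.poll(); // stands in for an upload freeing space
        }
        try {
            // A plain put() would wait here for as long as the queue stays full.
            return queue.offer(capacity, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWhenFull(3, 50, false)); // prints false: no space became available
        System.out.println(offerWhenFull(3, 50, true));  // prints true: one slot was freed first
    }
}
```

A producer that must not stall indefinitely should therefore call put() from a thread that is allowed to wait, and handle InterruptedException by restoring the interrupt flag as shown.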
    • start

      public boolean start()
      Starts the upload thread, which performs an upload every maxUploadInterval. The default upload interval is 10 seconds. If the upload thread has already been started (for example, by an earlier call to start()), this method does nothing and returns false.
      Returns:
      true if the upload thread started successfully, false if the upload thread has already been started.
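The start-once contract can be sketched with a ScheduledThreadPoolExecutor and a ScheduledFuture, matching the executor and recurringTask fields listed above. How the SDK actually uses those fields is an assumption. IntervalTrigger is a hypothetical class, and its counter stands in for the real upload.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-use interval trigger; not the SDK's implementation.
final class IntervalTrigger {
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private ScheduledFuture<?> recurringTask;
    // Counts how many times the recurring task has fired.
    final AtomicInteger uploads = new AtomicInteger();

    // Returns true on the first call, false if the trigger was already started.
    synchronized boolean start(long intervalMillis) {
        if (recurringTask != null) {
            return false;
        }
        recurringTask = executor.scheduleAtFixedRate(
                uploads::incrementAndGet, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return true;
    }

    // Returns false if the trigger was never started; otherwise cancels the task
    // and shuts the executor down. recurringTask stays set, so the trigger cannot be restarted.
    synchronized boolean stop() {
        if (recurringTask == null) {
            return false;
        }
        recurringTask.cancel(false);
        executor.shutdown();
        return true;
    }

    public static void main(String[] args) {
        IntervalTrigger trigger = new IntervalTrigger();
        System.out.println(trigger.start(1_000)); // prints true
        System.out.println(trigger.start(1_000)); // prints false: already started
        System.out.println(trigger.stop());       // prints true
    }
}
```

Shutting the executor down in stop() matters in this sketch: the pool's worker thread is not a daemon thread, so leaving it running would keep the JVM alive.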
    • close

      public void close() throws Exception
      A mirror of the stop() method to support auto close in a try-with-resources statement.
      Specified by:
      close in interface AutoCloseable
      Throws:
      Exception
    • stop

      public boolean stop() throws InterruptedException
      Stops the upload thread if it is running, waits for all current uploads to finish and ensures the upload queue is empty by calling upload() one last time after shutting down the thread.
      Returns:
      true if the upload thread stopped successfully, false if the upload thread was not started in the first place.
      Throws:
      InterruptedException - if interrupted while processing.
    • awaitUploads

      public boolean awaitUploads(long timeout, TimeUnit unit) throws InterruptedException
      Blocks until all current data in the queue has been uploaded, or the timeout occurs, or the current thread is interrupted, whichever happens first.
      Parameters:
      timeout - the maximum time to wait.
      unit - the unit of the timeout argument.
      Returns:
      true if all uploads have completed, false if the timeout elapsed before they completed.
      Throws:
      InterruptedException - if interrupted while waiting
    • upload

      public List<R> upload() throws Exception
      Uploads the current elements in the queue. This method will block the calling thread until the upload operation completes. It does not call the postUploadFunction or exceptionHandlerFunction. The uploaded elements are returned directly from this method.
      Returns:
      The uploaded objects.
      Throws:
      Exception - in case of an error during the upload.