public final class LcKafkaConsumerBuilder<K,V> extends Object
| Modifier and Type | Method and Description |
|---|---|
<K1 extends K,V1 extends V> |
buildAsync() |
<K1 extends K,V1 extends V> |
buildAuto()
Build a consumer which commit offset automatically in fixed interval.
|
<K1 extends K,V1 extends V> |
buildPartialAsync() |
<K1 extends K,V1 extends V> |
buildPartialSync() |
<K1 extends K,V1 extends V> |
buildSync() |
LcKafkaConsumerBuilder<K,V> |
gracefulShutdownMs(Duration duration) |
LcKafkaConsumerBuilder<K,V> |
gracefulShutdownMs(long gracefulShutdownMs) |
LcKafkaConsumerBuilder<K,V> |
maxPendingAsyncCommits(int maxPendingAsyncCommits)
When using async consumer to commit offset asynchronously, this argument can force consumer to do a synchronous
commit after there's already
maxPendingAsyncCommits async commits on the fly without response from broker. |
LcKafkaConsumerBuilder<K,V> |
messageHandler(MessageHandler<K,V> messageHandler)
Change the
MessageHandler to handle the consumed msg from kafka. |
static <K,V> LcKafkaConsumerBuilder<K,V> |
newBuilder(Map<String,Object> kafkaConfigs,
MessageHandler<K,V> messageHandler)
Create a
LcKafkaConsumerBuilder used to build LcKafkaConsumer. |
static <K,V> LcKafkaConsumerBuilder<K,V> |
newBuilder(Map<String,Object> kafkaConfigs,
MessageHandler<K,V> messageHandler,
org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer,
org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Create a
LcKafkaConsumerBuilder used to build LcKafkaConsumer. |
LcKafkaConsumerBuilder<K,V> |
pollTimeout(Duration pollTimeout)
The pollTimeout is the maximum time spent waiting in polling data from kafka broker if data is not available in
the buffer.
|
LcKafkaConsumerBuilder<K,V> |
pollTimeoutMs(long pollTimeoutMs)
The pollTimeout is the maximum time spent waiting in polling data from kafka broker if data is not available in
the buffer.
|
LcKafkaConsumerBuilder<K,V> |
workerPool(ExecutorService workerPool,
boolean shutdownOnStop)
The thread pool used by consumer to handle the consumed messages from kafka.
|
public static <K,V> LcKafkaConsumerBuilder<K,V> newBuilder(Map<String,Object> kafkaConfigs, MessageHandler<K,V> messageHandler)
LcKafkaConsumerBuilder used to build LcKafkaConsumer.kafkaConfigs - the kafka consumer configs. Please refer
this document for
valid configurations.messageHandler - a MessageHandler to handle the consumed msg from kafkaLcKafkaConsumerBuilderpublic static <K,V> LcKafkaConsumerBuilder<K,V> newBuilder(Map<String,Object> kafkaConfigs, MessageHandler<K,V> messageHandler, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
LcKafkaConsumerBuilder used to build LcKafkaConsumer.kafkaConfigs - the kafka consumer configs. Please refer
this document for
valid configurations.messageHandler - a MessageHandler to handle the consumed msg from kafkakeyDeserializer - The deserializer for key that implements DeserializervalueDeserializer - The deserializer for value that implements DeserializerLcKafkaConsumerBuilderpublic LcKafkaConsumerBuilder<K,V> pollTimeoutMs(long pollTimeoutMs)
If 0, poll operation will return immediately with any records that are available currently in the buffer, else returns empty.
Must not be negative.
pollTimeoutMs - the poll timeout in millisecondspublic LcKafkaConsumerBuilder<K,V> pollTimeout(Duration pollTimeout)
If 0, poll operation will return immediately with any records that are available currently in the buffer, else returns empty.
Must not be negative.
pollTimeout - the poll timeout durationpublic LcKafkaConsumerBuilder<K,V> gracefulShutdownMs(long gracefulShutdownMs)
public LcKafkaConsumerBuilder<K,V> gracefulShutdownMs(Duration duration)
public LcKafkaConsumerBuilder<K,V> maxPendingAsyncCommits(int maxPendingAsyncCommits)
maxPendingAsyncCommits async commits on the fly without response from broker.maxPendingAsyncCommits - do a synchronous commit when pending async commits beyond this limitpublic LcKafkaConsumerBuilder<K,V> messageHandler(MessageHandler<K,V> messageHandler)
MessageHandler to handle the consumed msg from kafka.messageHandler - the handler to handle consumed msgpublic LcKafkaConsumerBuilder<K,V> workerPool(ExecutorService workerPool, boolean shutdownOnStop)
workerPool - a thread pool to handle consumed messagesshutdownOnStop - true to shutdown the input worker pool when this consumer closedpublic <K1 extends K,V1 extends V> LcKafkaConsumer<K1,V1> buildAuto()
while (true) {
final ConsumerRecords records = consumer.poll(pollTimeout);
for (ConsumerRecord record : records) {
handler.handleMessage(record.topic(), record.value());
}
}
Please note that this consumer requires these kafka configs must be set, otherwise
IllegalArgumentException will be thrown:
max.poll.interval.msmax.poll.recordsauto.commit.interval.ms
Though all of these configs have default values in kafka, we still require every user to set them specifically.
Because these configs is vital for using this consumer safely. You should tune them to ensure the polling thread
in this consumer can at least poll once within max.poll.interval.ms during handling consumed messages
to prevent itself from session timeout.
Note that if you set enable.auto.commit to false, this consumer will set it to true by itself.
public <K1 extends K,V1 extends V> LcKafkaConsumer<K1,V1> buildSync()
public <K1 extends K,V1 extends V> LcKafkaConsumer<K1,V1> buildPartialSync()
public <K1 extends K,V1 extends V> LcKafkaConsumer<K1,V1> buildAsync()
public <K1 extends K,V1 extends V> LcKafkaConsumer<K1,V1> buildPartialAsync()
Copyright © 2019 LeanCloud. All rights reserved.