public abstract class KafkaQueue extends Object implements IQueue
Kafka implementation of IQueue.
| Constructor and Description |
|---|
| KafkaQueue() |
| Modifier and Type | Method and Description |
|---|---|
| protected abstract IQueueMessage | deserialize(byte[] msgData): Deserializes a queue message. |
| void | destroy(): Destroy method. |
| int | ephemeralSize(): Gets the number of items in ephemeral storage. |
| void | finish(IQueueMessage msg): Called when processing of the message has finished, to clean up ephemeral storage. |
| int | getBufferSize(): Maximum number of items KafkaQueue should buffer. |
| String | getConsumerGroupId(): Kafka group-id used to consume messages. |
| protected com.github.ddth.kafka.KafkaClient | getKafkaClient() |
| long | getOffsetCommitPeriodMs(): Consumer's topic offset will be committed to server every offsetCommitPeriodMs milliseconds. |
| Collection<IQueueMessage> | getOrphanMessages(long thresholdTimestampMs): Gets all orphan messages (messages that were left in ephemeral storage for a long time). |
| com.github.ddth.kafka.KafkaClient.ProducerType | getProducerType() |
| String | getTopicName(): Name of the Kafka topic that stores queue messages. |
| String | getZkConnString(): Zookeeper connection string (format host1:port1,host2:port2,host3:port3/path). |
| KafkaQueue | init(): Init method. |
| boolean | isLeaderAutoRebalance(): Should we allow leaders to rebalance? Default value is false. |
| boolean | moveFromEphemeralToQueueStorage(IQueueMessage msg): Moves a message from ephemeral back to queue storage. |
| protected boolean | putToQueue(IQueueMessage msg): Puts a message to Kafka queue. |
| boolean | queue(IQueueMessage msg): Queues a message. |
| int | queueSize(): Gets the number of items in the queue. |
| boolean | requeue(IQueueMessage msg): Re-queues a message. |
| boolean | requeueSilent(IQueueMessage msg): Silently re-queues a message. |
| protected abstract byte[] | serialize(IQueueMessage msg): Serializes a queue message to store in Kafka. |
| KafkaQueue | setBufferSize(int bufferSize) |
| KafkaQueue | setConsumerGroupId(String consumerGroupId) |
| KafkaQueue | setKafkaClient(com.github.ddth.kafka.KafkaClient kafkaClient): An external KafkaClient can be used. |
| KafkaQueue | setLeaderAutoRebalance(boolean leaderAutoRebalance) |
| KafkaQueue | setOffsetCommitPeriodMs(long offsetCommitPeriodMs) |
| KafkaQueue | setProducerType(com.github.ddth.kafka.KafkaClient.ProducerType producerType) |
| KafkaQueue | setTopicName(String topicName) |
| KafkaQueue | setZkConnString(String zkConnString): Sets Zookeeper connection string (format host1:port1,host2:port2,host3:port3/path). |
| IQueueMessage | take(): Takes a message out of queue. |
| protected IQueueMessage | takeFromQueue(): Takes a message from Kafka queue. |
public com.github.ddth.kafka.KafkaClient.ProducerType getProducerType()
public KafkaQueue setProducerType(com.github.ddth.kafka.KafkaClient.ProducerType producerType)
public String getZkConnString()
Zookeeper connection string (format host1:port1,host2:port2,host3:port3/path).
public KafkaQueue setZkConnString(String zkConnString)
Sets Zookeeper connection string (format host1:port1,host2:port2,host3:port3/path).
Parameters: zkConnString
public String getTopicName()
public KafkaQueue setTopicName(String topicName)
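The connection string format given for getZkConnString() (host1:port1,host2:port2,host3:port3/path) can be checked before it is handed to the queue. The following is a minimal sketch of such a check; ZkConnStringSketch is a hypothetical helper written for this illustration and is not part of KafkaQueue or KafkaClient. It assumes that everything from the first "/" onwards is the optional path.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper (not part of KafkaQueue): splits a Zookeeper connection string. */
final class ZkConnStringSketch {
    final List<String> hostPorts; // for example ["host1:2181", "host2:2181"]
    final String chrootPath;      // for example "/kafka", or "" when no path is present

    private ZkConnStringSketch(List<String> hostPorts, String chrootPath) {
        this.hostPorts = Collections.unmodifiableList(hostPorts);
        this.chrootPath = chrootPath;
    }

    static ZkConnStringSketch parse(String zkConnString) {
        if (zkConnString == null || zkConnString.trim().isEmpty()) {
            throw new IllegalArgumentException("Empty Zookeeper connection string");
        }
        String trimmed = zkConnString.trim();
        // Assumption: everything from the first '/' onwards is the optional path.
        int slash = trimmed.indexOf('/');
        String hostsPart = slash < 0 ? trimmed : trimmed.substring(0, slash);
        String path = slash < 0 ? "" : trimmed.substring(slash);

        List<String> hostPorts = new ArrayList<>();
        for (String token : hostsPart.split(",")) {
            String hostPort = token.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0 || colon == hostPort.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + hostPort + "'");
            }
            // Integer.parseInt throws NumberFormatException for a non-numeric port.
            Integer.parseInt(hostPort.substring(colon + 1));
            hostPorts.add(hostPort);
        }
        return new ZkConnStringSketch(hostPorts, path);
    }
}
```

A caller could run ZkConnStringSketch.parse(...) on a configured value and only then pass the original string to setZkConnString(String).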
public boolean isLeaderAutoRebalance()
Should we allow leaders to rebalance? Default value is false.
public KafkaQueue setLeaderAutoRebalance(boolean leaderAutoRebalance)
public String getConsumerGroupId()
public KafkaQueue setConsumerGroupId(String consumerGroupId)
public int getBufferSize()
Maximum number of items KafkaQueue should buffer. Default value is 1. A higher value can improve performance when consuming messages, but messages could be lost if the client crashes.
public KafkaQueue setBufferSize(int bufferSize)
public long getOffsetCommitPeriodMs()
Consumer's topic offset will be committed to server every offsetCommitPeriodMs milliseconds.
public KafkaQueue setOffsetCommitPeriodMs(long offsetCommitPeriodMs)
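The buffer-size trade-off described for getBufferSize() can be pictured with a small simulation. This is only a simplified model, not how KafkaQueue is implemented: it assumes that messages pulled into the buffer already count as consumed (their offset is committed), so whatever is still buffered when the client crashes is lost. BufferLossSketch and its parameters are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical simulation of a prefetching consumer; not KafkaQueue's actual logic. */
final class BufferLossSketch {
    /**
     * Returns how many buffered, unprocessed messages remain when the client
     * "crashes" after processing crashAfter messages.
     */
    static int lostOnCrash(int totalMessages, int bufferSize, int crashAfter) {
        Deque<Integer> buffer = new ArrayDeque<>();
        int nextOffset = 0; // next offset to fetch; in this model it is also the committed offset
        int processed = 0;
        while (processed < crashAfter) {
            if (buffer.isEmpty()) {
                // Refill the buffer with up to bufferSize messages.
                while (buffer.size() < bufferSize && nextOffset < totalMessages) {
                    buffer.add(nextOffset++);
                }
                if (buffer.isEmpty()) {
                    break; // nothing left to consume
                }
            }
            buffer.poll(); // process one message
            processed++;
        }
        return buffer.size(); // buffered but never processed
    }
}
```

In this model, a buffer of 10 that crashes after 5 processed messages leaves 5 messages behind, while a buffer of 1 leaves none; that is the intuition behind keeping the default small when message loss matters more than throughput.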
protected com.github.ddth.kafka.KafkaClient getKafkaClient()
public KafkaQueue setKafkaClient(com.github.ddth.kafka.KafkaClient kafkaClient)
An external KafkaClient can be used. If not set, KafkaQueue will automatically create a KafkaClient for its own use.
Parameters: kafkaClient
public KafkaQueue init() throws Exception
Throws: Exception
public void destroy()
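Because the setters and init() return the queue itself, configuration can be chained and followed by a guarded destroy(). The sketch below shows that call pattern on a stand-in class: QueueLifecycleSketch mirrors only a few of the signatures listed on this page and never contacts Kafka. Its rule that init() rejects a missing topic name is an assumption made for the illustration, as are the sample values.

```java
/** Stand-in that mirrors only some setter, init() and destroy() signatures; it never contacts Kafka. */
final class QueueLifecycleSketch {
    private String zkConnString;
    private String topicName;
    private String consumerGroupId;
    private int bufferSize = 1; // documented default for KafkaQueue
    private boolean ready;

    QueueLifecycleSketch setZkConnString(String zkConnString) { this.zkConnString = zkConnString; return this; }
    QueueLifecycleSketch setTopicName(String topicName) { this.topicName = topicName; return this; }
    QueueLifecycleSketch setConsumerGroupId(String consumerGroupId) { this.consumerGroupId = consumerGroupId; return this; }
    QueueLifecycleSketch setBufferSize(int bufferSize) { this.bufferSize = bufferSize; return this; }

    /** Assumption for this sketch only: init() rejects a missing topic name. */
    QueueLifecycleSketch init() throws Exception {
        if (topicName == null || topicName.isEmpty()) {
            throw new IllegalStateException("topicName is required");
        }
        ready = true;
        return this;
    }

    void destroy() { ready = false; }

    boolean isReady() { return ready; }

    String describe() {
        return topicName + "@" + zkConnString + " group=" + consumerGroupId + " buffer=" + bufferSize;
    }
}

final class QueueLifecycleDemo {
    /** Configures, initializes and always destroys the stand-in queue. */
    static String run() throws Exception {
        QueueLifecycleSketch queue = new QueueLifecycleSketch()
                .setZkConnString("host1:2181,host2:2181/kafka")
                .setTopicName("demo-topic")
                .setConsumerGroupId("demo-group")
                .setBufferSize(1);
        queue.init();
        try {
            return queue.describe();
        } finally {
            queue.destroy(); // release resources even if the work above fails
        }
    }
}
```

With a concrete KafkaQueue subclass the same shape would apply: chain the setters, call init(), and call destroy() in a finally block.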
protected abstract byte[] serialize(IQueueMessage msg) throws QueueException
Parameters: msg
Throws: QueueException
protected abstract IQueueMessage deserialize(byte[] msgData) throws QueueException
Parameters: msgData
Throws: QueueException
protected IQueueMessage takeFromQueue()
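Subclasses must supply serialize and deserialize as a matching pair, so that a message written to Kafka can be read back unchanged. The sketch below illustrates one possible pairing using UTF-8 text. TextMessageSketch stands in for IQueueMessage, CodecHooksSketch stands in for the two abstract hooks, and IllegalArgumentException stands in for QueueException; all three are assumptions made so the example compiles without the library.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical message type standing in for IQueueMessage. */
final class TextMessageSketch {
    final String id;
    final String body;

    TextMessageSketch(String id, String body) {
        this.id = id;
        this.body = body;
    }
}

/** Stand-in mirroring the two abstract hooks of KafkaQueue. */
abstract class CodecHooksSketch {
    protected abstract byte[] serialize(TextMessageSketch msg);

    protected abstract TextMessageSketch deserialize(byte[] msgData);
}

/** One possible encoding: "id\nbody" as UTF-8 bytes. */
final class Utf8CodecSketch extends CodecHooksSketch {
    @Override
    protected byte[] serialize(TextMessageSketch msg) {
        if (msg.id.indexOf('\n') >= 0) {
            throw new IllegalArgumentException("id must not contain a newline");
        }
        return (msg.id + "\n" + msg.body).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    protected TextMessageSketch deserialize(byte[] msgData) {
        String text = new String(msgData, StandardCharsets.UTF_8);
        int separator = text.indexOf('\n'); // first newline splits id from body
        if (separator < 0) {
            throw new IllegalArgumentException("Malformed message data");
        }
        return new TextMessageSketch(text.substring(0, separator), text.substring(separator + 1));
    }

    /** Convenience for checking that the two hooks are inverses of each other. */
    TextMessageSketch roundTrip(TextMessageSketch msg) {
        return deserialize(serialize(msg));
    }
}
```

Whatever encoding a real subclass picks, a round-trip check like roundTrip is a cheap way to confirm the two hooks agree.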
protected boolean putToQueue(IQueueMessage msg)
Parameters: msg
public boolean queue(IQueueMessage msg)
Implementation flow:
public boolean requeue(IQueueMessage msg)
Implementation flow:
Note: ephemeral storage is optional; whether it is provided depends on the implementation.
public boolean requeueSilent(IQueueMessage msg)
Implementation flow:
Note: ephemeral storage is optional; whether it is provided depends on the implementation.
Specified by: requeueSilent in interface IQueue
public void finish(IQueueMessage msg)
Implementation flow:
Note: ephemeral storage is optional; whether it is provided depends on the implementation.
public IQueueMessage take()
Implementation flow:
Note: ephemeral storage is optional; whether it is provided depends on the implementation.
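The relationship between take(), finish(IQueueMessage) and requeue(IQueueMessage) can be sketched as a consumer loop. The in-memory class below is a stand-in written for this illustration, with plain strings in place of IQueueMessage. It assumes that take() parks the message in ephemeral storage, that finish() removes it from there, and that requeue() moves it back to the queue; the detailed implementation flow of the real methods is not reproduced on this page.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical in-memory stand-in for a queue with ephemeral storage. */
final class InMemoryQueueSketch {
    private final Deque<String> queueStorage = new ArrayDeque<>();
    // message -> time it was taken; assumes message strings are unique
    private final Map<String, Long> ephemeralStorage = new LinkedHashMap<>();

    boolean queue(String msg) { return queueStorage.offer(msg); }

    /** Assumption: a taken message is parked in ephemeral storage until finished. */
    String take() {
        String msg = queueStorage.poll();
        if (msg != null) {
            ephemeralStorage.put(msg, System.currentTimeMillis());
        }
        return msg;
    }

    void finish(String msg) { ephemeralStorage.remove(msg); }

    /** Assumption: re-queueing removes the message from ephemeral storage and appends it to the queue. */
    boolean requeue(String msg) {
        ephemeralStorage.remove(msg);
        return queueStorage.offer(msg);
    }

    int queueSize() { return queueStorage.size(); }

    int ephemeralSize() { return ephemeralStorage.size(); }
}

final class ConsumerLoopSketch {
    /** Takes at most maxTakes messages; finishes successes and re-queues failures. */
    static int processUpTo(InMemoryQueueSketch queue, Predicate<String> handler, int maxTakes) {
        int finished = 0;
        for (int i = 0; i < maxTakes; i++) {
            String msg = queue.take();
            if (msg == null) {
                break; // queue is empty
            }
            if (handler.test(msg)) {
                queue.finish(msg);
                finished++;
            } else {
                queue.requeue(msg);
            }
        }
        return finished;
    }
}
```

The maxTakes bound keeps a permanently failing message from spinning the loop forever; a real consumer would more likely cap retries per message.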
public Collection<IQueueMessage> getOrphanMessages(long thresholdTimestampMs)
Specified by: getOrphanMessages in interface IQueue
Parameters: thresholdTimestampMs
public boolean moveFromEphemeralToQueueStorage(IQueueMessage msg)
Implementation flow:
Specified by: moveFromEphemeralToQueueStorage in interface IQueue
Returns: true if a move has been made, false otherwise (e.g. the message didn't exist in ephemeral storage)
public int queueSize()
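getOrphanMessages(long) and moveFromEphemeralToQueueStorage(IQueueMessage) can be combined into a recovery sweep that puts abandoned messages back on the queue. The sketch below shows the idea on an in-memory stand-in with strings in place of IQueueMessage. It assumes thresholdTimestampMs is an absolute cut-off timestamp (messages taken before it count as orphans); this page does not say how the value is interpreted, and an implementation may treat it as an age in milliseconds instead. The explicit nowMs argument of take is also an assumption, added to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in used to illustrate an orphan recovery sweep. */
final class OrphanRecoverySketch {
    private final Deque<String> queueStorage = new ArrayDeque<>();
    // message -> time it was taken; assumes message strings are unique
    private final Map<String, Long> ephemeralStorage = new LinkedHashMap<>();

    void queue(String msg) { queueStorage.add(msg); }

    /** Takes a message and records nowMs as the time it entered ephemeral storage. */
    String take(long nowMs) {
        String msg = queueStorage.poll();
        if (msg != null) {
            ephemeralStorage.put(msg, nowMs);
        }
        return msg;
    }

    /** Assumption: messages taken before thresholdTimestampMs count as orphans. */
    Collection<String> getOrphanMessages(long thresholdTimestampMs) {
        List<String> orphans = new ArrayList<>();
        for (Map.Entry<String, Long> entry : ephemeralStorage.entrySet()) {
            if (entry.getValue() < thresholdTimestampMs) {
                orphans.add(entry.getKey());
            }
        }
        return orphans;
    }

    /** Returns true if a move was made, false if the message was not in ephemeral storage. */
    boolean moveFromEphemeralToQueueStorage(String msg) {
        if (ephemeralStorage.remove(msg) == null) {
            return false;
        }
        queueStorage.add(msg);
        return true;
    }

    /** Moves every orphan back to the queue and reports how many were moved. */
    int recoverOrphans(long thresholdTimestampMs) {
        int moved = 0;
        for (String msg : getOrphanMessages(thresholdTimestampMs)) {
            if (moveFromEphemeralToQueueStorage(msg)) {
                moved++;
            }
        }
        return moved;
    }

    int queueSize() { return queueStorage.size(); }

    int ephemeralSize() { return ephemeralStorage.size(); }
}
```

Checking the boolean result matters in a sweep like this, because another worker may have finished or moved the message between the two calls.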
public int ephemeralSize()
Note: ephemeral storage is optional; whether it is provided depends on the implementation.
Specified by: ephemeralSize in interface IQueue
Copyright © 2015 DDTH. All Rights Reserved.