public abstract class KafkaQueue extends Object implements IQueue
| Constructor and Description |
|---|
| KafkaQueue() |
| Modifier and Type | Method and Description |
|---|---|
| protected abstract IQueueMessage | deserialize(byte[] msgData): Deserializes a queue message. |
| void | destroy(): Destroy method. |
| int | ephemeralSize(): Gets the number of items in ephemeral storage. |
| void | finish(IQueueMessage msg): Called when processing of the message has finished, to clean up ephemeral storage. |
| protected com.github.ddth.kafka.KafkaClient | getKafkaClient() |
| Collection<IQueueMessage> | getOrphanMessages(long thresholdTimestampMs): Gets all orphan messages (messages that were left in ephemeral storage for a long time). |
| com.github.ddth.kafka.KafkaClient.ProducerType | getProducerType() |
| String | getTopicName() |
| String | getZkConnString(): ZooKeeper connection string (format host1:port1,host2:port2,host3:port3/path). |
| KafkaQueue | init(): Init method. |
| boolean | moveFromEphemeralToQueueStorage(IQueueMessage msg): Moves a message from ephemeral storage back to queue storage. |
| protected boolean | putToQueue(IQueueMessage msg): Puts a message to the Kafka queue. |
| boolean | queue(IQueueMessage msg): Queues a message. |
| int | queueSize(): Gets the number of items in the queue. |
| boolean | requeue(IQueueMessage msg): Re-queues a message. |
| boolean | requeueSilent(IQueueMessage msg): Silently re-queues a message. |
| protected abstract byte[] | serialize(IQueueMessage msg): Serializes a queue message to store in Kafka. |
| KafkaQueue | setKafkaClient(com.github.ddth.kafka.KafkaClient kafkaClient) |
| KafkaQueue | setProducerType(com.github.ddth.kafka.KafkaClient.ProducerType producerType) |
| KafkaQueue | setTopicName(String topicName) |
| KafkaQueue | setZkConnString(String zkConnString): Sets ZooKeeper connection string (format host1:port1,host2:port2,host3:port3/path). |
| IQueueMessage | take(): Takes a message out of the queue. |
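The summary above marks serialize(IQueueMessage) and deserialize(byte[]) as abstract, so a concrete subclass has to supply both. A minimal sketch of such a pair follows. It does not use the real ddth classes: SketchMessage and SketchCodec are hypothetical stand-ins, and the "id|content" UTF-8 wire format is an assumption chosen only for illustration.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for an IQueueMessage implementation (not the real ddth type). */
final class SketchMessage {
    final long id;
    final String content;

    SketchMessage(long id, String content) {
        this.id = id;
        this.content = content;
    }
}

/** Hypothetical codec mirroring the shape of serialize(...) and deserialize(...). */
final class SketchCodec {
    private SketchCodec() {
    }

    /** Encodes the message as the UTF-8 bytes of "id|content" (assumed wire format). */
    static byte[] serialize(SketchMessage msg) {
        if (msg == null) {
            return null;
        }
        return (msg.id + "|" + msg.content).getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes bytes produced by serialize(...); returns null for null input. */
    static SketchMessage deserialize(byte[] msgData) {
        if (msgData == null) {
            return null;
        }
        String text = new String(msgData, StandardCharsets.UTF_8);
        // Split on the first separator only, so the content may itself contain '|'.
        int sep = text.indexOf('|');
        if (sep < 0) {
            throw new IllegalArgumentException("Malformed message data");
        }
        long id = Long.parseLong(text.substring(0, sep));
        return new SketchMessage(id, text.substring(sep + 1));
    }
}
```

Whatever format a real subclass picks, the two methods should be exact inverses of each other, since messages put to Kafka by one are later read back by the other.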
public com.github.ddth.kafka.KafkaClient.ProducerType getProducerType()
public KafkaQueue setProducerType(com.github.ddth.kafka.KafkaClient.ProducerType producerType)
public String getZkConnString()
ZooKeeper connection string (format host1:port1,host2:port2,host3:port3/path).
public KafkaQueue setZkConnString(String zkConnString)
Sets ZooKeeper connection string (format host1:port1,host2:port2,host3:port3/path).
Parameters: zkConnString
public String getTopicName()
public KafkaQueue setTopicName(String topicName)
protected com.github.ddth.kafka.KafkaClient getKafkaClient()
public KafkaQueue setKafkaClient(com.github.ddth.kafka.KafkaClient kafkaClient)
public KafkaQueue init() throws Exception
Throws: Exception
public void destroy()
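Because each setter and init() return the KafkaQueue instance, configuration can be chained into one expression and later released with destroy(). The sketch below illustrates that pattern with a hypothetical stand-in class, SketchQueueConfig, so that it compiles without Kafka or the ddth libraries. The connection string and topic name passed in are caller-supplied example values, and the validation inside init() is an assumption, not documented behaviour.

```java
/** Hypothetical stand-in mirroring KafkaQueue's fluent setters; not the real class. */
class SketchQueueConfig {
    private String zkConnString;
    private String topicName;
    private boolean initialized;

    String getZkConnString() {
        return zkConnString;
    }

    SketchQueueConfig setZkConnString(String zkConnString) {
        this.zkConnString = zkConnString;
        return this; // returning this is what makes chaining possible
    }

    String getTopicName() {
        return topicName;
    }

    SketchQueueConfig setTopicName(String topicName) {
        this.topicName = topicName;
        return this;
    }

    boolean isInitialized() {
        return initialized;
    }

    /** Assumed behaviour: fail fast when a required setting is missing. */
    SketchQueueConfig init() throws Exception {
        if (zkConnString == null || topicName == null) {
            throw new IllegalStateException("zkConnString and topicName are required");
        }
        initialized = true;
        return this;
    }

    void destroy() {
        initialized = false;
    }
}

/** Hypothetical helper showing the chained set-then-init call sequence. */
class SketchQueueSetup {
    static SketchQueueConfig create(String zkConnString, String topicName) throws Exception {
        return new SketchQueueConfig()
                .setZkConnString(zkConnString)
                .setTopicName(topicName)
                .init();
    }
}
```

A caller would typically pair create(...) with a try/finally block that calls destroy(), so the queue is released even when processing fails.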
protected abstract byte[] serialize(IQueueMessage msg)
Parameters: msg
protected abstract IQueueMessage deserialize(byte[] msgData)
Parameters: msgData
protected boolean putToQueue(IQueueMessage msg)
Parameters: msg
public boolean queue(IQueueMessage msg)
Implementation flow:
public boolean requeue(IQueueMessage msg)
Implementation flow:
Note: ephemeral storage is optional; whether it is supported depends on the implementation.
public boolean requeueSilent(IQueueMessage msg)
Implementation flow:
Note: ephemeral storage is optional; whether it is supported depends on the implementation.
Specified by: requeueSilent in interface IQueue
public void finish(IQueueMessage msg)
Implementation flow:
Note: ephemeral storage is optional; whether it is supported depends on the implementation.
public IQueueMessage take()
Implementation flow:
Note: ephemeral storage is optional; whether it is supported depends on the implementation.
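The take() and finish(IQueueMessage) methods are meant to be used as a pair: a consumer takes a message, processes it, and then calls finish so that ephemeral storage is cleaned up. The sketch below shows that loop against a hypothetical in-memory stand-in, SketchEphemeralQueue, which uses plain strings as messages. That take() parks the message in ephemeral storage is an assumption inferred from the descriptions of finish and getOrphanMessages; an implementation without ephemeral storage would behave differently.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Predicate;

/** Hypothetical in-memory stand-in; messages are plain strings assumed to be unique. */
class SketchEphemeralQueue {
    private final Queue<String> queueStorage = new ArrayDeque<>();
    // Assumption: take() parks each message here until finish(...) removes it.
    private final Map<String, Long> ephemeralStorage = new HashMap<>();

    boolean queue(String msg) {
        return queueStorage.offer(msg);
    }

    /** Returns the next message, or null when the queue is empty. */
    String take() {
        String msg = queueStorage.poll();
        if (msg != null) {
            ephemeralStorage.put(msg, System.currentTimeMillis());
        }
        return msg;
    }

    void finish(String msg) {
        ephemeralStorage.remove(msg);
    }

    int queueSize() {
        return queueStorage.size();
    }

    int ephemeralSize() {
        return ephemeralStorage.size();
    }
}

class SketchConsumer {
    /**
     * Takes messages until the queue is empty. finish(...) is called only when
     * the handler reports success; failed messages stay in ephemeral storage.
     * Returns the number of successfully processed messages.
     */
    static int drain(SketchEphemeralQueue queue, Predicate<String> handler) {
        int processed = 0;
        String msg;
        while ((msg = queue.take()) != null) {
            if (handler.test(msg)) {
                queue.finish(msg);
                processed++;
            }
        }
        return processed;
    }
}
```

Leaving failed messages in ephemeral storage is one possible design; a consumer could equally call requeue(...) on failure, depending on the delivery guarantees it needs.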
public Collection<IQueueMessage> getOrphanMessages(long thresholdTimestampMs)
Specified by: getOrphanMessages in interface IQueue
Parameters: thresholdTimestampMs
public boolean moveFromEphemeralToQueueStorage(IQueueMessage msg)
Implementation flow:
Specified by: moveFromEphemeralToQueueStorage in interface IQueue
Returns: true if a move has been made, false otherwise (e.g. the message didn't exist in ephemeral storage)
public int queueSize()
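getOrphanMessages(long) and moveFromEphemeralToQueueStorage(IQueueMessage) can be combined into a recovery pass that returns stale messages to the queue. The sketch below runs such a pass against a hypothetical in-memory stand-in, SketchOrphanQueue, with strings as messages. Two points are assumptions rather than documented behaviour: that a message counts as an orphan when it was taken at or before thresholdTimestampMs, and that the implementation supports ephemeral storage at all.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical in-memory stand-in; messages are plain strings assumed to be unique. */
class SketchOrphanQueue {
    private final Queue<String> queueStorage = new ArrayDeque<>();
    // Maps each taken message to the timestamp (ms) at which it was taken.
    private final Map<String, Long> ephemeralStorage = new LinkedHashMap<>();

    boolean queue(String msg) {
        return queueStorage.offer(msg);
    }

    /** Takes a message; the take time is passed in to keep the sketch deterministic. */
    String take(long nowMs) {
        String msg = queueStorage.poll();
        if (msg != null) {
            ephemeralStorage.put(msg, nowMs);
        }
        return msg;
    }

    /** Assumed semantics: messages taken at or before thresholdTimestampMs. */
    Collection<String> getOrphanMessages(long thresholdTimestampMs) {
        Collection<String> orphans = new ArrayList<>();
        for (Map.Entry<String, Long> entry : ephemeralStorage.entrySet()) {
            if (entry.getValue() <= thresholdTimestampMs) {
                orphans.add(entry.getKey());
            }
        }
        return orphans;
    }

    /** Returns true if moved, false if the message was not in ephemeral storage. */
    boolean moveFromEphemeralToQueueStorage(String msg) {
        if (ephemeralStorage.remove(msg) == null) {
            return false;
        }
        return queueStorage.offer(msg);
    }

    int queueSize() {
        return queueStorage.size();
    }

    int ephemeralSize() {
        return ephemeralStorage.size();
    }
}

class SketchOrphanRecovery {
    /** Moves every orphan back to queue storage; returns how many were moved. */
    static int recover(SketchOrphanQueue queue, long thresholdTimestampMs) {
        int moved = 0;
        // getOrphanMessages returns a copy, so moving messages while iterating is safe.
        for (String msg : queue.getOrphanMessages(thresholdTimestampMs)) {
            if (queue.moveFromEphemeralToQueueStorage(msg)) {
                moved++;
            }
        }
        return moved;
    }
}
```

Comparing queueSize() and ephemeralSize() before and after such a pass is a simple way to confirm that messages were actually moved.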
public int ephemeralSize()
Note: ephemeral storage is optional; whether it is supported depends on the implementation.
Specified by: ephemeralSize in interface IQueue
Copyright © 2015 DDTH. All Rights Reserved.