public class LessLockingUniversalMySQLQueue extends JdbcQueue

Similar to UniversalJdbcQueue, but uses a less-locking algorithm that is specific to MySQL and requires only a single db table for both queue and ephemeral storage.

Queue db table schema:

- queue_id: bigint, auto increment, see IQueueMessage.qId()
- ephemeral_id: bigint
- msg_org_timestamp: datetime, see IQueueMessage.qOriginalTimestamp()
- msg_timestamp: datetime, see IQueueMessage.qTimestamp()
- msg_num_requeues: int, see IQueueMessage.qNumRequeues()
- msg_content: blob, message's content

| Modifier and Type | Field and Description |
|---|---|
| static String | COL_CONTENT |
| static String | COL_EPHEMERAL_ID |
| static String | COL_NUM_REQUEUES |
| static String | COL_ORG_TIMESTAMP |
| static String | COL_QUEUE_ID |
| static String | COL_TIMESTAMP |

Inherited fields: DEFAULT_MAX_RETRIES

| Constructor and Description |
|---|
| LessLockingUniversalMySQLQueue() |

| Modifier and Type | Method and Description |
|---|---|
| protected void | _finishWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries): Performs "finish" action, retry if deadlock. |
| protected boolean | _moveFromEphemeralToQueueStorageWithRetries(IQueueMessage _msg, Connection conn, int numRetries, int maxRetries): Moves a message from ephemeral back to queue storage, retry if deadlock. |
| protected boolean | _queueWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries): Queues a message, retry if deadlock. |
| protected boolean | _requeueSilentWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries): Re-queues a message silently, retry if deadlock. |
| protected boolean | _requeueWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries): Re-queues a message, retry if deadlock. |
| protected UniversalQueueMessage | _takeWithRetries(Connection conn, int numRetries, int maxRetries): Takes a message from queue, retry if deadlock. |
| boolean | getFifo() |
| protected Collection<IQueueMessage> | getOrphanFromEphemeralStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, long thresholdTimestampMs): Gets all orphan messages (messages that were left in ephemeral storage for a long time). |
| String | getTableNameEphemeral() |
| LessLockingUniversalMySQLQueue | init() |
| boolean | isFifo() |
| LessLockingUniversalMySQLQueue | markFifo(boolean fifo) |
| protected boolean | putToEphemeralStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, IQueueMessage _msg): Puts a message to the ephemeral storage. |
| protected boolean | putToQueueStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, IQueueMessage _msg): Puts a message to tail of the queue storage. |
| protected UniversalQueueMessage | readFromEphemeralStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, IQueueMessage msg): Reads a message from the ephemeral storage. |
| protected UniversalQueueMessage | readFromQueueStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate): Reads a message from head of queue storage. |
| protected boolean | removeFromEphemeralStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, IQueueMessage _msg): Removes a message from the ephemeral storage. |
| protected boolean | removeFromQueueStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, IQueueMessage msg): Removes a message from the queue storage. |
| LessLockingUniversalMySQLQueue | setFifo(boolean fifo) |
| UniversalQueueMessage | take(): Takes a message out of queue. |
Inherited methods:

- _getOrphanMessagesWithRetries, destroy, ephemeralSize, finish, getMaxRetries, getOrphanMessages, getTableName, getTransactionIsolationLevel, moveFromEphemeralToQueueStorage, queue, queueSize, requeue, requeueSilent, setMaxRetries, setTableName, setTableNameEphemeral, setTransactionIsolationLevel
- commitTransaction, connection, connection, execute, execute, execute, executeSelect, executeSelect, executeSelect, executeSelect, executeSelect, executeSelect, getDataSource, jdbcTemplate, jdbcTemplate, returnConnection, rollbackTransaction, setDataSource, startTransaction
- addProfiling, clearProfiling, getCache, getCacheFactory, getFromCache, getFromCache, getProfiling, isCacheEnabled, isCacheItemsExpireAfterWrite, putToCache, putToCache, putToCache, removeFromCache, setCacheFactory, setCacheItemsExpireAfterWrite, startProfiling

public static final String COL_QUEUE_ID
public static final String COL_EPHEMERAL_ID
public static final String COL_ORG_TIMESTAMP
public static final String COL_TIMESTAMP
public static final String COL_NUM_REQUEUES
public static final String COL_CONTENT
public LessLockingUniversalMySQLQueue setFifo(boolean fifo)
public LessLockingUniversalMySQLQueue markFifo(boolean fifo)
public boolean isFifo()
public boolean getFifo()
public String getTableNameEphemeral()
Overrides: getTableNameEphemeral in class JdbcQueue

public LessLockingUniversalMySQLQueue init()
Overrides: init in class JdbcQueue

protected UniversalQueueMessage readFromQueueStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate)
Overrides: readFromQueueStorage in class JdbcQueue

protected UniversalQueueMessage readFromEphemeralStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, IQueueMessage msg)
Overrides: readFromEphemeralStorage in class JdbcQueue

protected Collection<IQueueMessage> getOrphanFromEphemeralStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, long thresholdTimestampMs)
Overrides: getOrphanFromEphemeralStorage in class JdbcQueue
Parameters: thresholdTimestampMs - get all orphan messages that were queued before this timestamp

protected boolean putToQueueStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, IQueueMessage _msg)
Overrides: putToQueueStorage in class JdbcQueue

protected boolean putToEphemeralStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, IQueueMessage _msg)
Overrides: putToEphemeralStorage in class JdbcQueue

protected boolean removeFromQueueStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, IQueueMessage msg)
Overrides: removeFromQueueStorage in class JdbcQueue

protected boolean removeFromEphemeralStorage(org.springframework.jdbc.core.JdbcTemplate jdbcTemplate, IQueueMessage _msg)
Overrides: removeFromEphemeralStorage in class JdbcQueue

protected boolean _queueWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries) throws SQLException
Note: http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
InnoDB uses automatic row-level locking. You can get deadlocks even in the case of transactions that just insert or delete a single row. That is because these operations are not really "atomic"; they automatically set locks on the (possibly several) index records of the row inserted or deleted.
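
The retry-if-deadlock behaviour described for the `...WithRetries` methods can be sketched roughly as follows. This is a minimal, standalone illustration and not the library's code: `DeadlockRetrySketch`, `SqlAction`, `isDeadlock` and `withRetries` are hypothetical names, and detecting a deadlock through MySQL vendor error code 1213 or SQLSTATE 40001 is an assumption about how such a check could be written.

```java
import java.sql.SQLException;

/** Hypothetical helper (not part of the library): retries an action when MySQL reports a deadlock. */
final class DeadlockRetrySketch {
    /** MySQL vendor error code for ER_LOCK_DEADLOCK. */
    static final int MYSQL_ER_LOCK_DEADLOCK = 1213;

    /** A database action that may fail with SQLException. */
    interface SqlAction<T> {
        T run() throws SQLException;
    }

    /** Assumption: a deadlock is recognised by vendor code 1213 or SQLSTATE 40001. */
    static boolean isDeadlock(SQLException e) {
        return e.getErrorCode() == MYSQL_ER_LOCK_DEADLOCK || "40001".equals(e.getSQLState());
    }

    /** Runs the action; on deadlock, retries up to maxRetries more times, then rethrows. */
    static <T> T withRetries(SqlAction<T> action, int maxRetries) throws SQLException {
        int numRetries = 0;
        while (true) {
            try {
                return action.run();
            } catch (SQLException e) {
                if (!isDeadlock(e) || numRetries >= maxRetries) {
                    throw e; // not a deadlock, or retry budget exhausted
                }
                numRetries++;
                // A real implementation would roll back the transaction here before retrying.
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        int[] attempts = {0};
        // Simulated action: deadlocks on the first two attempts, succeeds on the third.
        String result = withRetries(() -> {
            if (++attempts[0] < 3) {
                throw new SQLException("Deadlock found when trying to get lock",
                        "40001", MYSQL_ER_LOCK_DEADLOCK);
            }
            return "queued";
        }, 5);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

The sketch uses a loop with a counter; the `numRetries` and `maxRetries` parameters in the signatures on this page suggest the same bookkeeping is passed between calls, though the exact mechanism is not stated here.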
Overrides: _queueWithRetries in class JdbcQueue
Throws: SQLException

protected boolean _requeueWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries) throws SQLException
Note: http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
InnoDB uses automatic row-level locking. You can get deadlocks even in the case of transactions that just insert or delete a single row. That is because these operations are not really "atomic"; they automatically set locks on the (possibly several) index records of the row inserted or deleted.
Overrides: _requeueWithRetries in class JdbcQueue
Throws: SQLException

protected boolean _requeueSilentWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries) throws SQLException
Note: http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
InnoDB uses automatic row-level locking. You can get deadlocks even in the case of transactions that just insert or delete a single row. That is because these operations are not really "atomic"; they automatically set locks on the (possibly several) index records of the row inserted or deleted.
Overrides: _requeueSilentWithRetries in class JdbcQueue
Throws: SQLException

protected void _finishWithRetries(Connection conn, IQueueMessage _msg, int numRetries, int maxRetries) throws SQLException
Note: http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
InnoDB uses automatic row-level locking. You can get deadlocks even in the case of transactions that just insert or delete a single row. That is because these operations are not really "atomic"; they automatically set locks on the (possibly several) index records of the row inserted or deleted.
Overrides: _finishWithRetries in class JdbcQueue
Throws: SQLException

protected UniversalQueueMessage _takeWithRetries(Connection conn, int numRetries, int maxRetries) throws SQLException
Note: http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
InnoDB uses automatic row-level locking. You can get deadlocks even in the case of transactions that just insert or delete a single row. That is because these operations are not really "atomic"; they automatically set locks on the (possibly several) index records of the row inserted or deleted.
Overrides: _takeWithRetries in class JdbcQueue
Throws: SQLException

protected boolean _moveFromEphemeralToQueueStorageWithRetries(IQueueMessage _msg, Connection conn, int numRetries, int maxRetries) throws SQLException
Note: http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
InnoDB uses automatic row-level locking. You can get deadlocks even in the case of transactions that just insert or delete a single row. That is because these operations are not really "atomic"; they automatically set locks on the (possibly several) index records of the row inserted or deleted.
Overrides: _moveFromEphemeralToQueueStorageWithRetries in class JdbcQueue
Throws: SQLException

public UniversalQueueMessage take()
Implementation flow:
Note: ephemeral storage is optional; whether it is used depends on the implementation.
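
As a rough illustration of how a single table can back both queue and ephemeral storage, the sketch below models the table as an in-memory list. It rests on assumptions that this page does not confirm: that `ephemeral_id = 0` marks a row still in queue storage, and that taking a message claims the row by stamping it with a non-zero ephemeral id. All class and method names are hypothetical, and no MySQL access is involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of one table serving as both queue and ephemeral storage. */
final class SingleTableQueueSketch {
    /** One table row. Assumption: ephemeralId == 0 means "still in queue storage". */
    static final class Row {
        final long queueId;
        final long timestampMs;
        final String content;
        long ephemeralId;

        Row(long queueId, long timestampMs, String content) {
            this.queueId = queueId;
            this.timestampMs = timestampMs;
            this.content = content;
        }
    }

    private final List<Row> table = new ArrayList<>();
    private long nextQueueId = 1;     // stands in for the auto-increment queue_id
    private long nextEphemeralId = 1; // source of non-zero ephemeral ids

    /** Appends a row to the tail of the queue. */
    synchronized void queue(String content, long nowMs) {
        table.add(new Row(nextQueueId++, nowMs, content));
    }

    /** Claims the oldest unclaimed row by stamping it with an ephemeral id; null if none. */
    synchronized Row take() {
        for (Row r : table) { // rows are stored in queue_id order, so the first match is the head
            if (r.ephemeralId == 0) {
                r.ephemeralId = nextEphemeralId++;
                return r;
            }
        }
        return null;
    }

    /** Deletes the row once the consumer has finished with the message. */
    synchronized void finish(Row msg) {
        table.removeIf(r -> r.queueId == msg.queueId);
    }

    /** Returns a claimed row to queue storage by clearing its ephemeral id. */
    synchronized boolean moveFromEphemeralToQueue(Row msg) {
        for (Row r : table) {
            if (r.queueId == msg.queueId && r.ephemeralId != 0) {
                r.ephemeralId = 0;
                return true;
            }
        }
        return false;
    }

    /** Claimed rows that were queued before the given timestamp. */
    synchronized List<Row> orphans(long thresholdTimestampMs) {
        List<Row> result = new ArrayList<>();
        for (Row r : table) {
            if (r.ephemeralId != 0 && r.timestampMs < thresholdTimestampMs) {
                result.add(r);
            }
        }
        return result;
    }

    synchronized long queueSize() {
        return table.stream().filter(r -> r.ephemeralId == 0).count();
    }

    synchronized long ephemeralSize() {
        return table.stream().filter(r -> r.ephemeralId != 0).count();
    }
}
```

In this model, a take followed by a finish touches only one row of one table, which is one plausible reading of "less-locking"; the actual SQL statements issued by the class are not shown on this page.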
Copyright © 2015 DDTH. All Rights Reserved.