Rqueue is an asynchronous task executor(worker) built for spring framework based on the spring framework's messaging library backed by Redis.
- A message can be delayed for an arbitrary period of time or delivered immediately.
- Multiple messages can be consumed in parallel by different workers.
- Message delivery: It's guaranteed that a message is consumed at least once. (Message would be consumed by a worker more than once due to the failure in the underlying worker/restart-process etc, otherwise exactly one delivery)
Rqueue supports two types of tasks.
- Execute tasks as soon as possible
- Delayed tasks (task that would be scheduled at given delay, run task at 2:30PM)
Task execution can be configured in different ways
- By default a tasks would be retried for Integer.MAX_VALUE number of times
- If we do not need retry then we need to set retry count to zero
- After retrying/executing a task N (>=1) times if we can't execute the given task then the task can be discarded or push to dead-letter-queue
Add Dependency
- Get latest one from Maven central
- Gradle
implementation 'com.github.sonus21:rqueue-spring-boot-starter:1.3-RELEASE'- Maven
<dependency> <groupId>com.github.sonus21</groupId> <artifactId>rqueue-spring-boot-starter</artifactId> <version>1.3-RELEASE</version> <type>pom</type> </dependency>
-
Add Dependency Get latest one from Maven central
- Gradle
implementation 'com.github.sonus21:rqueue-spring:1.3-RELEASE'- Maven
<dependency> <groupId>com.github.sonus21</groupId> <artifactId>rqueue-spring</artifactId> <version>1.3-RELEASE</version> <type>pom</type> </dependency>
-
Add annotation
EnableRqueueon application config class -
Provide a RedisConnectionFactory bean
@EnableRqueue
public class Application{
@Bean
public RedisConnectionFactory redisConnectionFactory(){
// return a redis connection factory
}
}A method can be marked as worker/message listener using RqueueListener annotation
@Component
@Slf4j
public class MessageListener {
@RqueueListener(value = "simple-queue")
public void simpleMessage(String message) {
log.info("simple-queue: {}", message);
}
@RqueueListener(value = "delayed-queue", delayedQueue = "true")
public void delayedMessage(String message) {
log.info("delayedMessage: {}", message);
}
@RqueueListener(value = "delayed-queue-2", delayedQueue = "true",
numRetries="3", deadLetterQueue="failed-delayed-queue")
public void delayedMessageWithDlq(String message) {
log.info("delayedMessageWithDlq: {}", message);
}
@RqueueListener(value = "job-queue", delayedQueue = "true",
numRetries="3", deadLetterQueue="failed-job-queue")
public void onMessage(Job job) {
log.info("Job created: {}", job);
}
@RqueueListener(value = "notification-queue", delayedQueue = "true",
numRetries="3", deadLetterQueue="failed-notification-queue")
public void onMessage(Notification notification) {
log.info("Notification message: {}", notification);
}
}All messages can be send using RqueueMessageSender bean's methods. It has handful number of put methods, we can use one of them based on the use case.
public class MessageService {
@AutoWired private RqueueMessageSender rqueueMessageSender;
public void doSomething(){
rqueueMessageSender.put("simple-queue", "Rqueue is configured");
}
public void createJOB(Job job){
//do something
rqueueMessageSender.put("job-queue", job);
}
// send notification in 30 seconds
public void sendNotification(Notification notification){
//do something
rqueueMessageSender.put("notification-queue", notification, 30*1000L);
}
}- A task would be retried without any further configuration
- Method arguments are handled automatically, as in above example even task of
Jobtype can be executed by workers.
Apart from basic configuration, it can be customized heavily, like number of tasks it would be executing concurrently.
More and more configurations can be provided using SimpleRqueueListenerContainerFactory class.
class Application{
@Bean
public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory(){
// return SimpleRqueueListenerContainerFactory object
}
}Configure Task store
All tasks are stored in Redis database, either we can utilise the same Redis database for entire application or we can provide a separate one for task store. Nonetheless if we need different database then we can configure using setRedisConnectionFactory method.
It's highly recommended to provide master connection as it reads and writes to the Redis database
// Create redis connection factory of your choice either Redission or Lettuce or Jedis
// Get redis configuration
RedisConfiguration redisConfiguration = new RedisConfiguration();
// Set fields of redis configuration
// Create lettuce connection factory
LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisConfiguration);
factory.setRedisConnectionFactory(lettuceConnectionFactory);Redis connection failure and retry
Whenever a call to Redis failed then it would be retried in 5 seconds, to change that we can set back off to some different value.
// set backoff time to 100 milli second
factory.setBackOffTime( 100 );Task executor
Number of workers can be configured using setMaxNumWorkers method. For example to configure 10 workers we can do
SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
factory.setMaxNumWorkers(10);By default number of task executors are same as number of queues. A custom or shared task executor can be configured using factory's setTaskExecutor method, we need to provide an implementation of AsyncTaskExecutor
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix( "taskExecutor" );
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setMaxPoolSize(50);
threadPoolTaskExecutor.setQueueCapacity(0);
threadPoolTaskExecutor.afterPropertiesSet();
factory.setTaskExecutor(threadPoolTaskExecutor);Manual/Auto start of the container
Whenever container is refreshed then it can be started automatically or manfully. Default behaviour is to start automatically, to change this behaviour set auto-start to false.
factory.setAutoStartup(false);Message converters configuration
Generally any message can be converted to and from without any problems, though it can be customized by providing an implementation org.springframework.messaging.converter.MessageConverter, this message converter must implement both the methods of MessageConverter interface.
Implementation must make sure the return type of method toMessage is Message<String> while as in the case of fromMessage a object can be returned as well.
MessageConverter messageConverter = new SomeFancyMessageConverter();
List<MessageConverter> messageConverters = new ArrayList<>();
messageConverters.add(messageConverter);
factory.setMessageConverters(messageConverters);More than one message converter can be used as well, when more than one message converters are provided then they are used in the order, whichever returns non null value is used.
NOTE: Rqueue support micrometer library for monitoring.
It provides 4 types gauge of metrics.
1. queue.size : number of tasks to be run
2. dead.letter.queue.size : number of tasks in the dead letter queue
3. delayed.queue.size : number of tasks scheduled for later time, it's an approximate number, since some tasks might not have moved to be processed despite best efforts
4. processing.queue.size : number of tasks are being processed. It's also an approximate number due to retry and tasks acknowledgements.
Execution and failure counters can be enabled (by default this is disabled).
We need to set count.execution and count.failure fields of RqueueMetricsProperties
1. execution.count
2. failure.count
All these metrics are tagged
Spring Boot Application
- Add micrometer and the exporter dependencies
- Set tags if any using
rqueue.metrics.tags.<name> = <value> - Enable counting features using
rqueue.metrics.count.execution=true,rqueue.metrics.count.failure=true
Spring Application
- Add micrometer and the exporter dependencies provide MeterRegistry as bean
- Provide bean of RqueueMetricsProperties, in this bean set all the required fields.
Please report problems/bugs to issue tracker. You are most welcome for any pull requests for feature/issue.
The Rqueue is released under version 2.0 of the Apache License.
