From 6a775f39cf69f0dff3a3f0bd7b1c580b23bcf56c Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Wed, 27 Oct 2021 20:10:31 +0530 Subject: [PATCH] Concurrency issue when task executor is provided. (#123) * Concurrency issue when task executor is provided. * remove exit * Removed unused property * increase timeout --- CHANGELOG.md | 13 +++- README.md | 8 +-- build.gradle | 2 +- .../rqueue/annotation/RqueueListener.java | 3 +- .../sonus21/rqueue/config/RqueueConfig.java | 6 -- .../SimpleRqueueListenerContainerFactory.java | 11 ++-- .../sonus21/rqueue/listener/QueueDetail.java | 6 +- .../RqueueMessageListenerContainer.java | 59 +++++++++++++------ .../rqueue/listener/RqueueMessagePoller.java | 2 +- .../sonus21/rqueue/utils/QueueThreadPool.java | 9 ++- .../spring-configuration-metadata.json | 16 +---- .../rqueue/test/tests/BasicListenerTest.java | 2 +- 12 files changed, 79 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 437c0613..7bf8cf33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # [Rqueue] New and Notable Changes +### [2.10.1] - 18-Oct-2021 + +* Fixes for concurrency when task executor is provided see issue #[122] + ### [2.10.0] - 10-Oct-2021 ### Fixes @@ -8,13 +12,14 @@ * Fixes message move message count (by default 1000 messages are moved) * Potential issue in rename collection * More than one (-) sign in the dashboard -* Fixes for server context path. Rqueue end points would be served relative to x-forwarded-prefix/server.servlet.context-path +* Fixes for server context path. Rqueue end points would be served relative to + x-forwarded-prefix/server.servlet.context-path ### Features * Display completed jobs in the dashboard * Option to choose number of days in the chart - ReactiveWebViewTest + ### [2.9.0] - 30-Jul-2021 ### Fixes @@ -273,3 +278,7 @@ Fixes: [2.9.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.9.0-RELEASE [2.10.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.10.0-RELEASE + +[2.10.1]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.10.1-RELEASE + +[122]: https://github.com/sonus21/rqueue/issues/122 diff --git a/README.md b/README.md index 26c42558..5fc316fe 100644 --- a/README.md +++ b/README.md @@ -71,14 +71,14 @@ Release Version: [Maven central](https://search.maven.org/search?q=g:com.github. * Add dependency * Gradle ```groovy - implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.10.0-RELEASE' + implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.10.1-RELEASE' ``` * Maven ```xml com.github.sonus21 rqueue-spring-boot-starter - 2.10.0-RELEASE + 2.10.1-RELEASE ``` @@ -91,14 +91,14 @@ Release Version: [Maven central](https://search.maven.org/search?q=g:com.github. * Add Dependency * Gradle ```groovy - implementation 'com.github.sonus21:rqueue-spring:2.10.0-RELEASE' + implementation 'com.github.sonus21:rqueue-spring:2.10.1-RELEASE' ``` * Maven ```xml com.github.sonus21 rqueue-spring - 2.10.0-RELEASE + 2.10.1-RELEASE ``` diff --git a/build.gradle b/build.gradle index 29727f72..2581eb99 100644 --- a/build.gradle +++ b/build.gradle @@ -70,7 +70,7 @@ ext { subprojects { group = 'com.github.sonus21' - version = '2.10.0-RELEASE' + version = '2.10.1-RELEASE' dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-messaging diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/annotation/RqueueListener.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/annotation/RqueueListener.java index 52369742..aea47178 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/annotation/RqueueListener.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/annotation/RqueueListener.java @@ -144,6 +144,7 @@ * have the same concurrency. * * @return concurrency for this worker. + * @see #priority() */ String concurrency() default "-1"; @@ -158,7 +159,7 @@ * *

Priority can be any number. There are two priority control modes. 1. Strict 2. Weighted, in * strict priority mode queue with higher priority is preferred over other queues. In case of - * weighted a round robin approach is used, and weight is followed. + * weighted a round-robin approach is used, and weight is followed. * * @return the priority for this listener. * @see #priorityGroup() diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java index 18ab6cdf..e152e161 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java @@ -113,12 +113,6 @@ public class RqueueConfig { @Value("${rqueue.retry.per.poll:1}") private int retryPerPoll; - @Value("${rqueue.add.default.queue.with.queue.level.priority:true}") - private boolean addDefaultQueueWithQueueLevelPriority; - - @Value("${rqueue.default.queue.with.queue.level.priority:-1}") - private int defaultQueueWithQueueLevelPriority; - @Value("${rqueue.net.proxy.host:}") private String proxyHost; diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java index 0ec48857..25e1caf2 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java @@ -211,16 +211,17 @@ public Integer getMaxNumWorkers() { * for every queue. * *

When you're using custom executor then you should set this number as (thread pool max size - - * number of queues) given executor is not shared. The maxNumWorkers tells how many workers you - * want to run in parallel for all listeners, for example if you have 3 listeners, and you have - * set this as 10 then all 3 listeners would be running maximum **combined 10 jobs** at any point - * of time. + * number of queues) given executor is not shared with other application component. The + * maxNumWorkers tells how many workers you want to run in parallel for all listeners those are + * not having configured concurrency. For example if you have 3 queues without concurrency, and + * you have set this as 10 then all 3 listeners would be running maximum **combined 10 jobs** at + * any point of time. Queues having concurrency will be running at the configured concurrency. * *

What would happen if I set this to very high value while using custom executor?
* 1. Task(s) would be rejected by the executor unless queue size is non-zero
* 2. When queue size is non-zero then it can create duplicate message problem, since the polled * message has not been processed yet. This will happen when {@link - * RqueueListener#visibilityTimeout()} is smaller than the time a task took to execute from the + * RqueueListener#visibilityTimeout()} is smaller than the time a task takes to execute from the * time of polling to final execution. * * @param maxNumWorkers Maximum number of workers. diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java index 4c2ad647..9435cffa 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java @@ -101,7 +101,7 @@ public QueueConfig toConfig() { List expandQueueDetail(boolean addDefault, int priority) { List queueDetails = new ArrayList<>(); for (Entry entry : getPriority().entrySet()) { - QueueDetail cloneQueueDetail = cloneQueueDetail(entry.getKey(), entry.getValue(), true, name); + QueueDetail cloneQueueDetail = cloneQueueDetail(entry.getKey(), entry.getValue(), name); queueDetails.add(cloneQueueDetail); } if (addDefault) { @@ -121,7 +121,7 @@ List expandQueueDetail(boolean addDefault, int priority) { } private QueueDetail cloneQueueDetail( - String priorityName, Integer priority, boolean systemGenerated, String priorityGroup) { + String priorityName, Integer priority, String priorityGroup) { if (priority == null || priorityName == null) { throw new IllegalStateException("priority name is null"); } @@ -140,7 +140,7 @@ private QueueDetail cloneQueueDetail( .completedQueueName(completedQueueName + suffix) .active(active) .batchSize(batchSize) - .systemGenerated(systemGenerated) + .systemGenerated(true) .priorityGroup(priorityGroup) .concurrency(concurrency) .priority(Collections.singletonMap(Constants.DEFAULT_PRIORITY_KEY, priority)) diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index 5d4a4e0d..6df226ed 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -255,11 +255,29 @@ private void initializeQueue() { defaultTaskExecutor = true; taskExecutor = createDefaultTaskExecutor(queueDetails); } else { - initializeThreadMap(queueDetails, taskExecutor, false, queueDetails.size()); + initializeThreadMapForNonDefaultExecutor(queueDetails); } initializeRunningQueueState(); } + private void initializeThreadMapForNonDefaultExecutor( + List registeredActiveQueueDetail) { + List queueDetails = + registeredActiveQueueDetail.stream() + .filter(e -> !e.isSystemGenerated()) + .collect(Collectors.toList()); + List withoutConcurrency = new ArrayList<>(); + for (QueueDetail queueDetail : queueDetails) { + if (queueDetail.getConcurrency().isValid()) { + addExecutorForConcurrencyBasedQueue(queueDetail, taskExecutor, false); + } else { + withoutConcurrency.add(queueDetail); + } + } + initializeThreadMap( + withoutConcurrency, taskExecutor, false, getWorkersCount(withoutConcurrency.size())); + } + private void initialize() { initializeQueue(); this.postProcessingHandler = @@ -296,9 +314,12 @@ private void initializeThreadMap( AsyncTaskExecutor taskExecutor, boolean defaultExecutor, int workersCount) { + if (queueDetails.isEmpty()) { + return; + } + QueueThreadPool pool = new QueueThreadPool(taskExecutor, defaultExecutor, workersCount); for (QueueDetail queueDetail : queueDetails) { - queueThreadMap.put( - queueDetail.getName(), new QueueThreadPool(taskExecutor, defaultExecutor, workersCount)); + queueThreadMap.put(queueDetail.getName(), pool); } } @@ -332,16 +353,19 @@ private AsyncTaskExecutor createNonConcurrencyBasedExecutor( return executor; } + private void addExecutorForConcurrencyBasedQueue( + QueueDetail queueDetail, AsyncTaskExecutor executor, boolean defaultTaskExecutor) { + int maxJobs = queueDetail.getConcurrency().getMax(); + QueueThreadPool threadPool = new QueueThreadPool(executor, defaultTaskExecutor, maxJobs); + queueThreadMap.put(queueDetail.getName(), threadPool); + } + private void createExecutor(QueueDetail queueDetail) { Concurrency concurrency = queueDetail.getConcurrency(); - int queueCapacity = 0; - int maxJobs = concurrency.getMax(); int corePoolSize = concurrency.getMin(); int maxPoolSize = concurrency.getMax(); - AsyncTaskExecutor executor = - createTaskExecutor(queueDetail, corePoolSize, maxPoolSize, queueCapacity); - QueueThreadPool threadPool = new QueueThreadPool(executor, true, maxJobs); - queueThreadMap.put(queueDetail.getName(), threadPool); + AsyncTaskExecutor executor = createTaskExecutor(queueDetail, corePoolSize, maxPoolSize); + addExecutorForConcurrencyBasedQueue(queueDetail, executor, true); } public AsyncTaskExecutor createDefaultTaskExecutor( @@ -362,15 +386,14 @@ public AsyncTaskExecutor createDefaultTaskExecutor( } private AsyncTaskExecutor createTaskExecutor( - QueueDetail queueDetail, int corePoolSize, int maxPoolSize, int queueCapacity) { + QueueDetail queueDetail, int corePoolSize, int maxPoolSize) { String name = ThreadUtils.getWorkerName(queueDetail.getName()); - return ThreadUtils.createTaskExecutor( - name, name + "-", corePoolSize, maxPoolSize, queueCapacity); + return ThreadUtils.createTaskExecutor(name, name + "-", corePoolSize, maxPoolSize, 0); } private List getQueueDetail(String queue, MappingInformation mappingInformation) { int numRetry = mappingInformation.getNumRetry(); - if (!mappingInformation.getDeadLetterQueueName().isEmpty() && numRetry == -1) { + if (!StringUtils.isEmpty(mappingInformation.getDeadLetterQueueName()) && numRetry == -1) { log.warn( "Dead letter queue {} is set but retry is not set", mappingInformation.getDeadLetterQueueName()); @@ -403,12 +426,13 @@ private List getQueueDetail(String queue, MappingInformation mappin .priority(priority) .priorityGroup(priorityGroup) .build(); + List queueDetails; if (queueDetail.getPriority().size() <= 1) { - return Collections.singletonList(queueDetail); + queueDetails = Collections.singletonList(queueDetail); + } else { + queueDetails = queueDetail.expandQueueDetail(true, -1); } - return queueDetail.expandQueueDetail( - rqueueConfig.isAddDefaultQueueWithQueueLevelPriority(), - rqueueConfig.getDefaultQueueWithQueueLevelPriority()); + return queueDetails; } @Override @@ -450,6 +474,7 @@ protected void doStart() { private Map getQueueThreadMap( String groupName, List queueDetails) { + // this happens only for queue having priorities like critical:10,high:5,low:3 QueueThreadPool queueThreadPool = queueThreadMap.get(groupName); if (queueThreadPool != null) { return queueDetails.stream() diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java index a7892d3d..2fe1ab04 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java @@ -82,7 +82,7 @@ private void execute( queueThreadPool)); } catch (Exception e) { if (e instanceof TaskRejectedException) { - queueThreadPool.taskRejected(); + queueThreadPool.taskRejected(queueDetail, message); } log(Level.WARN, "Execution failed Msg: {}", e, message); release(postProcessingHandler, queueThreadPool, queueDetail, message); diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/QueueThreadPool.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/QueueThreadPool.java index 1745d0cd..20736eba 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/QueueThreadPool.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/QueueThreadPool.java @@ -16,6 +16,8 @@ package com.github.sonus21.rqueue.utils; +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.listener.QueueDetail; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -81,7 +83,10 @@ public String destroy() { return null; } - public void taskRejected() { - log.warn("Task rejected by executor"); + public void taskRejected(QueueDetail queueDetail, RqueueMessage message) { + log.warn( + "Task rejected by executor Queue: {}, Message: {}", + queueDetail.getName(), + message.getMessage()); } } diff --git a/rqueue-core/src/main/resources/META-INF/spring-configuration-metadata.json b/rqueue-core/src/main/resources/META-INF/spring-configuration-metadata.json index 2194d82f..8f598bb7 100644 --- a/rqueue-core/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/rqueue-core/src/main/resources/META-INF/spring-configuration-metadata.json @@ -48,7 +48,7 @@ { "sourceType": "com.github.sonus21.rqueue.config.RqueueConfig", "name": "rqueue.completed.job.cleanup.interval", - "description": "How frequently completed jobs should be disabled in millisecond", + "description": "How frequently completed jobs should be removed (in millisecond)", "type": "java.lang.Long", "defaultValue": 30000 }, @@ -185,20 +185,6 @@ "type": "java.lang.Integer", "defaultValue": 1 }, - { - "sourceType": "com.github.sonus21.rqueue.config.RqueueConfig", - "name": "rqueue.add.default.queue.with.queue.level.priority", - "description": "Add default queue when priority of queues are used", - "type": "java.lang.Boolean", - "defaultValue": true - }, - { - "sourceType": "com.github.sonus21.rqueue.config.RqueueConfig", - "name": "rqueue.default.queue.with.queue.level.priority", - "description": "Priority of the default queue when priority is used, default to use middle priority", - "type": "java.lang.Integer", - "defaultValue": -1 - }, { "sourceType": "com.github.sonus21.rqueue.config.RqueueConfig", "name": "rqueue.net.proxy.host", diff --git a/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/tests/BasicListenerTest.java b/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/tests/BasicListenerTest.java index d269e42a..d9e013d2 100644 --- a/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/tests/BasicListenerTest.java +++ b/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/tests/BasicListenerTest.java @@ -98,7 +98,7 @@ protected void verifyScheduledTaskExecution() throws TimedOutException { return messages.contains(job); }, "message should be present in internal storage"); - waitFor(() -> getMessageCount(jobQueue) == 0, "job to run"); + waitFor(() -> getMessageCount(jobQueue) == 0, 30_000, "job to run"); } protected void testMultiMessageConsumer() throws TimedOutException {