From d9046bb9cc24e3caa5a3fdb37d48f8ae8dc9707f Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Tue, 7 Jun 2016 18:38:39 +0200 Subject: [PATCH 01/10] Upgraded Spring AMQP. Added task scheduler. Improved executor shutdown. Signed-off-by: Kai Zimmermann --- .../simulator/amqp/MessageService.java | 17 ------ ...ableDelegatingSecurityContextExecutor.java | 59 +++++++++++++++++++ .../scheduling/ExecutorAutoConfiguration.java | 36 ++++++++--- pom.xml | 1 + 4 files changed, 89 insertions(+), 24 deletions(-) create mode 100644 hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java index bd48bbe12..8bb1e1292 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java @@ -82,21 +82,4 @@ public class MessageService { clazz.getTypeName()); return (T) rabbitTemplate.getMessageConverter().fromMessage(message); } - - /** - * Method to verify if lwm2m header is set. - * - * @param message - * the message with the header - * @param header - * the header to verify - */ - public void checkIfLwm2mHeaderEmpty(final Message message, final String header) { - final Object headerObject = message.getMessageProperties().getHeaders().get(header); - if (null == headerObject) { - logAndThrowMessageError(message, "Header of " + header + "empty."); - } - - } - } diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java new file mode 100644 index 000000000..1160e12e0 --- /dev/null +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2015 Bosch Software Innovations GmbH and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + */ +package org.eclipse.hawkbit.autoconfigure.scheduling; + +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +import org.springframework.context.annotation.Bean; +import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; +import org.springframework.security.core.context.SecurityContext; +import org.springframework.security.core.context.SecurityContextHolder; + +/** + * Extension for {@link DelegatingSecurityContextExecutor} to allow proper + * shutdown at {@link Bean} destruction time. + * + */ +public class CloseableDelegatingSecurityContextExecutor extends DelegatingSecurityContextExecutor { + + private final ThreadPoolExecutor executor; + + /** + * Creates a new {@link CloseableDelegatingSecurityContextExecutor} that + * uses the current {@link SecurityContext} from the + * {@link SecurityContextHolder} at the time the task is submitted. + * + * @param delegate + * the {@link Executor} to delegate to. Cannot be null. + */ + public CloseableDelegatingSecurityContextExecutor(final ThreadPoolExecutor delegate) { + super(delegate); + executor = delegate; + } + + /** + * Initiates an orderly shutdown in which previously submitted tasks are + * executed, but no new tasks will be accepted. + */ + public void shutdown() { + executor.shutdown(); + } + + /** + * Initiates an immediate shutdown. + * + * @return a list of the tasks that were awaiting execution + */ + public List shutdownNow() { + return executor.shutdownNow(); + } + +} diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java index 43a096e7d..d5bcdd0f1 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java @@ -21,7 +21,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -39,9 +42,10 @@ public class ExecutorAutoConfiguration { private AsyncConfigurerThreadpoolProperties asyncConfigurerProperties; /** - * @return ExecutorService for general purpose multi threaded operations + * @return ExecutorService for general purpose multi threaded operations. + * Tries an orderly shutdown when destroyed. */ - @Bean + @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean public Executor asyncExecutor() { final BlockingQueue blockingQueue = new ArrayBlockingQueue<>( @@ -53,20 +57,38 @@ public class ExecutorAutoConfiguration { threadPoolExecutor.setRejectedExecutionHandler((r, executor) -> LOGGER.warn( "Reject runnable for centralExecutorService, reached limit of queue size {}", executor.getQueue().size())); - return new DelegatingSecurityContextExecutor(threadPoolExecutor); + return new CloseableDelegatingSecurityContextExecutor(threadPoolExecutor); } /** - * @return the executor for UI background processes. + * @return the executor for UI background processes. Run immediate shutdown + * when destroyed. */ - @Bean(name = "uiExecutor") + @Bean(name = "uiExecutor", destroyMethod = "shutdownNow") @ConditionalOnMissingBean(name = "uiExecutor") public Executor uiExecutor() { final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(20); final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 10000, TimeUnit.MILLISECONDS, blockingQueue, new ThreadFactoryBuilder().setNameFormat("ui-executor-pool-%d").build()); threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); - return new DelegatingSecurityContextExecutor(threadPoolExecutor); + return new CloseableDelegatingSecurityContextExecutor(threadPoolExecutor); } + /** + * @return {@link TaskExecutor} for task execution + */ + @Bean + @ConditionalOnMissingBean + public TaskExecutor taskExecutor() { + return new ConcurrentTaskExecutor(asyncExecutor()); + } + + /** + * @return {@link TaskScheduler} for scheduled tasks + */ + @Bean + @ConditionalOnMissingBean + public TaskScheduler taskScheduler() { + return new ThreadPoolTaskScheduler(); + } } diff --git a/pom.xml b/pom.xml index 33ad1f81e..c535ba2d8 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,7 @@ 2.5.5 5.2.4.Final 1.2.0.RELEASE + 1.6.0.RELEASE Fowler-SR1 3.2.2 From 4f23fb1377c61a81d2d6b8d83f98ce3c9f1d1b2c Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Wed, 8 Jun 2016 11:58:59 +0200 Subject: [PATCH 02/10] Created separate bean for poolexecutor. Signed-off-by: Kai Zimmermann --- ...ableDelegatingSecurityContextExecutor.java | 59 ------------------- .../scheduling/ExecutorAutoConfiguration.java | 26 +++++--- 2 files changed, 18 insertions(+), 67 deletions(-) delete mode 100644 hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java deleted file mode 100644 index 1160e12e0..000000000 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright (c) 2015 Bosch Software Innovations GmbH and others. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - */ -package org.eclipse.hawkbit.autoconfigure.scheduling; - -import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; - -import org.springframework.context.annotation.Bean; -import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; -import org.springframework.security.core.context.SecurityContext; -import org.springframework.security.core.context.SecurityContextHolder; - -/** - * Extension for {@link DelegatingSecurityContextExecutor} to allow proper - * shutdown at {@link Bean} destruction time. - * - */ -public class CloseableDelegatingSecurityContextExecutor extends DelegatingSecurityContextExecutor { - - private final ThreadPoolExecutor executor; - - /** - * Creates a new {@link CloseableDelegatingSecurityContextExecutor} that - * uses the current {@link SecurityContext} from the - * {@link SecurityContextHolder} at the time the task is submitted. - * - * @param delegate - * the {@link Executor} to delegate to. Cannot be null. - */ - public CloseableDelegatingSecurityContextExecutor(final ThreadPoolExecutor delegate) { - super(delegate); - executor = delegate; - } - - /** - * Initiates an orderly shutdown in which previously submitted tasks are - * executed, but no new tasks will be accepted. - */ - public void shutdown() { - executor.shutdown(); - } - - /** - * Initiates an immediate shutdown. - * - * @return a list of the tasks that were awaiting execution - */ - public List shutdownNow() { - return executor.shutdownNow(); - } - -} diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java index d5bcdd0f1..4fd55cbaa 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java @@ -25,6 +25,7 @@ import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -42,12 +43,21 @@ public class ExecutorAutoConfiguration { private AsyncConfigurerThreadpoolProperties asyncConfigurerProperties; /** - * @return ExecutorService for general purpose multi threaded operations. - * Tries an orderly shutdown when destroyed. + * @return ExecutorService with security context availability in thread + * execution.. */ - @Bean(destroyMethod = "shutdown") + @Bean @ConditionalOnMissingBean public Executor asyncExecutor() { + return new DelegatingSecurityContextExecutor(threadPoolExecutor()); + } + + /** + * @return central ThreadPoolExecutor for general purpose multi threaded + * operations. Tries an orderly shutdown when destroyed. + */ + @Bean(destroyMethod = "shutdown") + public ThreadPoolExecutor threadPoolExecutor() { final BlockingQueue blockingQueue = new ArrayBlockingQueue<>( asyncConfigurerProperties.getQueuesize()); final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(asyncConfigurerProperties.getCorethreads(), @@ -57,21 +67,21 @@ public class ExecutorAutoConfiguration { threadPoolExecutor.setRejectedExecutionHandler((r, executor) -> LOGGER.warn( "Reject runnable for centralExecutorService, reached limit of queue size {}", executor.getQueue().size())); - return new CloseableDelegatingSecurityContextExecutor(threadPoolExecutor); + + return threadPoolExecutor; } /** - * @return the executor for UI background processes. Run immediate shutdown - * when destroyed. + * @return the executor for UI background processes. */ - @Bean(name = "uiExecutor", destroyMethod = "shutdownNow") + @Bean(name = "uiExecutor") @ConditionalOnMissingBean(name = "uiExecutor") public Executor uiExecutor() { final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(20); final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 10000, TimeUnit.MILLISECONDS, blockingQueue, new ThreadFactoryBuilder().setNameFormat("ui-executor-pool-%d").build()); threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); - return new CloseableDelegatingSecurityContextExecutor(threadPoolExecutor); + return new DelegatingSecurityContextExecutor(threadPoolExecutor); } /** From 0beef57df77e37346bf7ad5a79aa30330e27f9b8 Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Thu, 9 Jun 2016 11:57:44 +0200 Subject: [PATCH 03/10] Added rabbit connection heartbeat for DMF. Signed-off-by: Kai Zimmermann --- .../scheduling/ExecutorAutoConfiguration.java | 17 ++++-- .../hawkbit/amqp/AmqpConfiguration.java | 54 +++++++++++++++++-- .../eclipse/hawkbit/amqp/AmqpProperties.java | 16 ++++++ 3 files changed, 80 insertions(+), 7 deletions(-) diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java index 4fd55cbaa..4f606f1b1 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java @@ -11,6 +11,7 @@ package org.eclipse.hawkbit.autoconfigure.scheduling; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -22,7 +23,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; -import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; @@ -94,11 +94,22 @@ public class ExecutorAutoConfiguration { } /** - * @return {@link TaskScheduler} for scheduled tasks + * @return {@link ScheduledExecutorService} based on + * {@link #threadPoolTaskScheduler()}. */ @Bean @ConditionalOnMissingBean - public TaskScheduler taskScheduler() { + public ScheduledExecutorService scheduledExecutorService() { + return threadPoolTaskScheduler().getScheduledExecutor(); + } + + /** + * @return {@link ThreadPoolTaskScheduler} for scheduled operations. + */ + @Bean + @ConditionalOnMissingBean + public ThreadPoolTaskScheduler threadPoolTaskScheduler() { return new ThreadPoolTaskScheduler(); } + } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index 20a11713f..adf56e10f 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -8,21 +8,28 @@ */ package org.eclipse.hawkbit.amqp; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; /** * The spring AMQP configuration which is enabled by using the profile @@ -39,7 +46,46 @@ public class AmqpConfiguration { protected AmqpDeadletterProperties amqpDeadletterProperties; @Autowired - private ConnectionFactory connectionFactory; + private ConnectionFactory rabbitConnectionFactory; + + @Configuration + protected static class HawkBitRabbitConnectionFactoryCreator { + @Autowired + @Qualifier("threadPoolExecutor") + private ThreadPoolExecutor threadPoolExecutor; + + @Autowired + private ScheduledExecutorService scheduledExecutorService; + + @Autowired + protected AmqpProperties amqpProperties; + + @Bean + public ConnectionFactory rabbitConnectionFactory(final RabbitProperties config) { + final CachingConnectionFactory factory = new CachingConnectionFactory(); + factory.setRequestedHeartBeat(amqpProperties.getRequestedHeartBeat()); + factory.setExecutor(threadPoolExecutor); + factory.getRabbitConnectionFactory().setHeartbeatExecutor(scheduledExecutorService); + + final String addresses = config.getAddresses(); + factory.setAddresses(addresses); + if (config.getHost() != null) { + factory.setHost(config.getHost()); + factory.setPort(config.getPort()); + } + if (config.getUsername() != null) { + factory.setUsername(config.getUsername()); + } + if (config.getPassword() != null) { + factory.setPassword(config.getPassword()); + } + if (config.getVirtualHost() != null) { + factory.setVirtualHost(config.getVirtualHost()); + } + return factory; + } + + } /** * Create a {@link RabbitAdmin} and ignore declaration exceptions. @@ -49,7 +95,7 @@ public class AmqpConfiguration { */ @Bean public RabbitAdmin rabbitAdmin() { - final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + final RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory); rabbitAdmin.setIgnoreDeclarationExceptions(true); return rabbitAdmin; } @@ -61,7 +107,7 @@ public class AmqpConfiguration { */ @Bean public RabbitTemplate rabbitTemplate() { - final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } @@ -159,7 +205,7 @@ public class AmqpConfiguration { public SimpleRabbitListenerContainerFactory listenerContainerFactory() { final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); containerFactory.setDefaultRequeueRejected(false); - containerFactory.setConnectionFactory(connectionFactory); + containerFactory.setConnectionFactory(rabbitConnectionFactory); containerFactory.setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal()); return containerFactory; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java index ce6068ce8..5b06c3318 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java @@ -8,6 +8,8 @@ */ package org.eclipse.hawkbit.amqp; +import java.util.concurrent.TimeUnit; + import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -38,6 +40,11 @@ public class AmqpProperties { */ private boolean missingQueuesFatal = false; + /** + * Requested heartbeat interval from broker in {@link TimeUnit#SECONDS}. + */ + private int requestedHeartBeat = 60; + /** * Is missingQueuesFatal enabled * @@ -102,4 +109,13 @@ public class AmqpProperties { public void setReceiverQueue(final String receiverQueue) { this.receiverQueue = receiverQueue; } + + public int getRequestedHeartBeat() { + return requestedHeartBeat; + } + + public void setRequestedHeartBeat(final int requestedHeartBeat) { + this.requestedHeartBeat = requestedHeartBeat; + } + } From 00013a7f36034648544143149d2f9a6288c2a5c1 Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Thu, 9 Jun 2016 13:00:47 +0200 Subject: [PATCH 04/10] Added executor config to tests. Signed-off-by: Kai Zimmermann --- .../hawkbit/AmqpTestConfiguration.java | 68 +++++++++++++++++++ .../PropertyBasedArtifactUrlHandlerTest.java | 1 + 2 files changed, 69 insertions(+) diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java index c9be9ffa6..075313a19 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java @@ -8,6 +8,14 @@ */ package org.eclipse.hawkbit; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.eclipse.hawkbit.amqp.AmqpProperties; import org.eclipse.hawkbit.amqp.AmqpSenderService; import org.eclipse.hawkbit.amqp.DefaultAmqpSenderService; import org.eclipse.hawkbit.repository.jpa.model.helper.SystemSecurityContextHolder; @@ -16,13 +24,22 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * */ @Configuration +@EnableConfigurationProperties({ AmqpProperties.class }) public class AmqpTestConfiguration { /** * @return the {@link SystemSecurityContext} singleton bean which make it @@ -56,4 +73,55 @@ public class AmqpTestConfiguration { public AmqpSenderService amqpSenderServiceBean(final RabbitTemplate rabbitTemplate) { return new DefaultAmqpSenderService(rabbitTemplate); } + + /** + * @return ExecutorService with security context availability in thread + * execution.. + */ + @Bean + @ConditionalOnMissingBean + public Executor asyncExecutor() { + return new DelegatingSecurityContextExecutor(threadPoolExecutor()); + } + + /** + * @return central ThreadPoolExecutor for general purpose multi threaded + * operations. Tries an orderly shutdown when destroyed. + */ + @Bean(destroyMethod = "shutdown") + public ThreadPoolExecutor threadPoolExecutor() { + final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); + final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10, 1000, TimeUnit.MILLISECONDS, + blockingQueue, new ThreadFactoryBuilder().setNameFormat("central-executor-pool-%d").build()); + + return threadPoolExecutor; + } + + /** + * @return {@link TaskExecutor} for task execution + */ + @Bean + @ConditionalOnMissingBean + public TaskExecutor taskExecutor() { + return new ConcurrentTaskExecutor(asyncExecutor()); + } + + /** + * @return {@link ScheduledExecutorService} based on + * {@link #threadPoolTaskScheduler()}. + */ + @Bean + @ConditionalOnMissingBean + public ScheduledExecutorService scheduledExecutorService() { + return threadPoolTaskScheduler().getScheduledExecutor(); + } + + /** + * @return {@link ThreadPoolTaskScheduler} for scheduled operations. + */ + @Bean + @ConditionalOnMissingBean + public ThreadPoolTaskScheduler threadPoolTaskScheduler() { + return new ThreadPoolTaskScheduler(); + } } diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java index c67d6cf47..4d7912211 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java @@ -33,6 +33,7 @@ import ru.yandex.qatools.allure.annotations.Stories; @Stories("Test to generate the artifact download URL") @SpringApplicationConfiguration(classes = { AmqpTestConfiguration.class, org.eclipse.hawkbit.RepositoryApplicationConfiguration.class }) + public class PropertyBasedArtifactUrlHandlerTest extends AbstractIntegrationTestWithMongoDB { private static final String HTTPS_LOCALHOST = "https://localhost:8080/"; From 2b2ba523f5ef4a3dee396e5f55381d095aef8adb Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Fri, 10 Jun 2016 11:00:58 +0200 Subject: [PATCH 05/10] Fixed bug where DS could not be delete as it was assigned to a rollout. Signed-off-by: Kai Zimmermann --- .../jpa/DistributionSetRepository.java | 21 ++++++-- .../jpa/JpaDistributionSetManagement.java | 4 +- .../jpa/DistributionSetManagementTest.java | 48 ++++++++++++++----- 3 files changed, 57 insertions(+), 16 deletions(-) diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/DistributionSetRepository.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/DistributionSetRepository.java index 1ab6cbe08..83791a66d 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/DistributionSetRepository.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/DistributionSetRepository.java @@ -16,7 +16,9 @@ import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSet; import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSetTag; import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSetType; import org.eclipse.hawkbit.repository.jpa.model.JpaSoftwareModule; +import org.eclipse.hawkbit.repository.model.Action; import org.eclipse.hawkbit.repository.model.DistributionSet; +import org.eclipse.hawkbit.repository.model.Rollout; import org.eclipse.hawkbit.repository.model.SoftwareModule; import org.eclipse.hawkbit.repository.model.Tag; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; @@ -82,15 +84,26 @@ public interface DistributionSetRepository List findByModules(JpaSoftwareModule module); /** - * Finds {@link DistributionSet}s based on given ID if they are not assigned - * yet to an {@link UpdateAction}, i.e. unused. + * Finds {@link DistributionSet}s based on given ID that are assigned yet to + * an {@link Action}, i.e. in use. * * @param ids * to search for - * @return + * @return list of {@link DistributionSet#getId()} */ @Query("select ac.distributionSet.id from JpaAction ac where ac.distributionSet.id in :ids") - List findAssignedDistributionSetsById(@Param("ids") Long... ids); + List findAssignedToTargetDistributionSetsById(@Param("ids") Long... ids); + + /** + * Finds {@link DistributionSet}s based on given ID that are assigned yet to + * an {@link Rollout}, i.e. in use. + * + * @param ids + * to search for + * @return list of {@link DistributionSet#getId()} + */ + @Query("select ra.distributionSet.id from JpaRollout ra where ra.distributionSet.id in :ids") + List findAssignedToRolloutDistributionSetsById(@Param("ids") Long... ids); /** * Saves all given {@link DistributionSet}s. diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDistributionSetManagement.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDistributionSetManagement.java index e5a38cf51..f3ecbcf1b 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDistributionSetManagement.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDistributionSetManagement.java @@ -173,7 +173,9 @@ public class JpaDistributionSetManagement implements DistributionSetManagement { public void deleteDistributionSet(final Long... distributionSetIDs) { final List toHardDelete = new ArrayList<>(); - final List assigned = distributionSetRepository.findAssignedDistributionSetsById(distributionSetIDs); + final List assigned = distributionSetRepository + .findAssignedToTargetDistributionSetsById(distributionSetIDs); + assigned.addAll(distributionSetRepository.findAssignedToRolloutDistributionSetsById(distributionSetIDs)); // soft delete assigned if (!assigned.isEmpty()) { diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java index 67d5202a5..104f2260d 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java @@ -27,6 +27,7 @@ import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSet; import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSetMetadata; import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSetTag; import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSetType; +import org.eclipse.hawkbit.repository.jpa.model.JpaRollout; import org.eclipse.hawkbit.repository.jpa.model.JpaSoftwareModule; import org.eclipse.hawkbit.repository.jpa.model.JpaTarget; import org.eclipse.hawkbit.repository.model.Action; @@ -34,12 +35,18 @@ import org.eclipse.hawkbit.repository.model.Action.Status; import org.eclipse.hawkbit.repository.model.ActionStatus; import org.eclipse.hawkbit.repository.model.DistributionSet; import org.eclipse.hawkbit.repository.model.DistributionSetFilter.DistributionSetFilterBuilder; -import org.eclipse.hawkbit.repository.test.util.WithUser; import org.eclipse.hawkbit.repository.model.DistributionSetMetadata; import org.eclipse.hawkbit.repository.model.DistributionSetTag; import org.eclipse.hawkbit.repository.model.DistributionSetType; +import org.eclipse.hawkbit.repository.model.Rollout; +import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupErrorAction; +import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupErrorCondition; +import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupSuccessCondition; +import org.eclipse.hawkbit.repository.model.RolloutGroupConditionBuilder; +import org.eclipse.hawkbit.repository.model.RolloutGroupConditions; import org.eclipse.hawkbit.repository.model.SoftwareModule; import org.eclipse.hawkbit.repository.model.Target; +import org.eclipse.hawkbit.repository.test.util.WithUser; import org.fest.assertions.core.Condition; import org.junit.Test; import org.springframework.data.domain.Page; @@ -777,36 +784,55 @@ public class DistributionSetManagementTest extends AbstractJpaIntegrationTest { } @Test - @Description("Deltes a DS that is no in use. Expected behaviour is a soft delete on the database, i.e. only marked as " - + "deleted, kept eas refernce and unavailable for future use..") + @Description("Deletes a DS that is in use by either target assignment or rollout. Expected behaviour is a soft delete on the database, i.e. only marked as " + + "deleted, kept as refernce but unavailable for future use..") public void deleteAssignedDistributionSet() { DistributionSet ds1 = testdataFactory.createDistributionSet("ds-1"); DistributionSet ds2 = testdataFactory.createDistributionSet("ds-2"); - DistributionSet dsAssigned = testdataFactory.createDistributionSet("ds-3"); + DistributionSet dsToTargetAssigned = testdataFactory.createDistributionSet("ds-3"); + final DistributionSet dsToRolloutAssigned = testdataFactory.createDistributionSet("ds-4"); ds1 = distributionSetManagement.findDistributionSetByNameAndVersion(ds1.getName(), ds1.getVersion()); ds2 = distributionSetManagement.findDistributionSetByNameAndVersion(ds2.getName(), ds2.getVersion()); // create assigned DS - dsAssigned = distributionSetManagement.findDistributionSetByNameAndVersion(dsAssigned.getName(), - dsAssigned.getVersion()); + dsToTargetAssigned = distributionSetManagement.findDistributionSetByNameAndVersion(dsToTargetAssigned.getName(), + dsToTargetAssigned.getVersion()); final Target target = new JpaTarget("4712"); final Target savedTarget = targetManagement.createTarget(target); final List toAssign = new ArrayList<>(); toAssign.add(savedTarget); - deploymentManagement.assignDistributionSet(dsAssigned, toAssign); + deploymentManagement.assignDistributionSet(dsToTargetAssigned, toAssign); - // delete a ds - assertThat(distributionSetRepository.findAll()).hasSize(3); - distributionSetManagement.deleteDistributionSet(dsAssigned.getId()); + // create assigned rollout + createRolloutByVariables("test", "test", 5, "name==*", dsToRolloutAssigned, "50", "5"); + + // delete assigned ds + assertThat(distributionSetRepository.findAll()).hasSize(4); + distributionSetManagement.deleteDistributionSet(dsToTargetAssigned.getId(), dsToRolloutAssigned.getId()); // not assigned so not marked as deleted - assertThat(distributionSetRepository.findAll()).hasSize(3); + assertThat(distributionSetRepository.findAll()).hasSize(4); assertThat(distributionSetManagement .findDistributionSetsByDeletedAndOrCompleted(pageReq, Boolean.FALSE, Boolean.TRUE).getTotalElements()) .isEqualTo(2); } + private Rollout createRolloutByVariables(final String rolloutName, final String rolloutDescription, + final int groupSize, final String filterQuery, final DistributionSet distributionSet, + final String successCondition, final String errorCondition) { + final RolloutGroupConditions conditions = new RolloutGroupConditionBuilder() + .successCondition(RolloutGroupSuccessCondition.THRESHOLD, successCondition) + .errorCondition(RolloutGroupErrorCondition.THRESHOLD, errorCondition) + .errorAction(RolloutGroupErrorAction.PAUSE, null).build(); + final Rollout rolloutToCreate = new JpaRollout(); + rolloutToCreate.setName(rolloutName); + rolloutToCreate.setDescription(rolloutDescription); + rolloutToCreate.setTargetFilterQuery(filterQuery); + rolloutToCreate.setDistributionSet(distributionSet); + return rolloutManagement.createRollout(rolloutToCreate, groupSize, conditions); + } + private Target sendUpdateActionStatusToTarget(final Status status, final Action updActA, final Target t, final String... msgs) { updActA.setStatus(status); From 64f3acfa2b2f666cc890231bc671b3cbac7b04ad Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Fri, 10 Jun 2016 15:45:20 +0200 Subject: [PATCH 06/10] Improved documentation of time value. Signed-off-by: Kai Zimmermann --- .../src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java index 5b06c3318..f9b4cceb6 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java @@ -43,7 +43,7 @@ public class AmqpProperties { /** * Requested heartbeat interval from broker in {@link TimeUnit#SECONDS}. */ - private int requestedHeartBeat = 60; + private int requestedHeartBeat = (int) TimeUnit.SECONDS.toSeconds(60); /** * Is missingQueuesFatal enabled From 12ce60f72a2835da02c3306d536c64f0533a6347 Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Sun, 12 Jun 2016 08:09:28 +0200 Subject: [PATCH 07/10] Rabbit publisherConfirms Signed-off-by: Kai Zimmermann --- .../simulator/amqp/AmqpConfiguration.java | 22 ++++++-- .../src/main/resources/logback.xml | 1 - .../AsyncConfigurerAutoConfiguration.java | 1 - .../hawkbit/amqp/AmqpConfiguration.java | 55 +++++++++++++++---- .../amqp/DefaultAmqpSenderService.java | 16 +++++- 5 files changed, 76 insertions(+), 19 deletions(-) diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java index bf5d723a8..e954cec96 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java @@ -8,6 +8,7 @@ */ package org.eclipse.hawkbit.simulator.amqp; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -15,6 +16,7 @@ import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -65,9 +67,12 @@ public class AmqpConfiguration { * @return the queue */ @Bean - public Queue receiverConnectorQueueFromSp() { - return new Queue(amqpProperties.getReceiverConnectorQueueFromSp(), true, false, false, - getDeadLetterExchangeArgs()); + public Queue receiverConnectorQueueFromHawkBit() { + final Map arguments = getDeadLetterExchangeArgs(); + arguments.putAll(getTTLMaxArgs()); + + return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).withArguments(arguments) + .build(); } /** @@ -89,7 +94,7 @@ public class AmqpConfiguration { */ @Bean public Binding bindReceiverQueueToSpExchange() { - return BindingBuilder.bind(receiverConnectorQueueFromSp()).to(exchangeQueueToConnector()); + return BindingBuilder.bind(receiverConnectorQueueFromHawkBit()).to(exchangeQueueToConnector()); } /** @@ -99,7 +104,7 @@ public class AmqpConfiguration { */ @Bean public Queue deadLetterQueue() { - return new Queue(amqpProperties.getDeadLetterQueue(), true, false, true); + return QueueBuilder.nonDurable(amqpProperties.getDeadLetterQueue()).withArguments(getTTLMaxArgs()).build(); } /** @@ -145,4 +150,11 @@ public class AmqpConfiguration { return args; } + private static Map getTTLMaxArgs() { + final Map args = new HashMap<>(); + args.put("x-message-ttl", Duration.ofDays(1).toMillis()); + args.put("x-max-length", 100_000); + return args; + } + } diff --git a/examples/hawkbit-device-simulator/src/main/resources/logback.xml b/examples/hawkbit-device-simulator/src/main/resources/logback.xml index da64e62d1..469c7bde3 100644 --- a/examples/hawkbit-device-simulator/src/main/resources/logback.xml +++ b/examples/hawkbit-device-simulator/src/main/resources/logback.xml @@ -19,7 +19,6 @@ - diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java index cdbbcba5b..7e2b780b4 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java @@ -23,7 +23,6 @@ import org.springframework.scheduling.annotation.EnableAsync; * * */ - @Configuration @EnableAsync @ConditionalOnMissingBean(AsyncConfigurer.class) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index adf56e10f..541ae414c 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -12,6 +12,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; @@ -30,6 +32,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.support.RetryTemplate;; /** * The spring AMQP configuration which is enabled by using the profile @@ -39,17 +43,27 @@ import org.springframework.context.annotation.Configuration; @EnableConfigurationProperties({ AmqpProperties.class, AmqpDeadletterProperties.class }) public class AmqpConfiguration { - @Autowired - protected AmqpProperties amqpProperties; + private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfiguration.class); @Autowired - protected AmqpDeadletterProperties amqpDeadletterProperties; + private AmqpProperties amqpProperties; + + @Autowired + private AmqpDeadletterProperties amqpDeadletterProperties; + + @Autowired + @Qualifier("threadPoolExecutor") + private ThreadPoolExecutor threadPoolExecutor; @Autowired private ConnectionFactory rabbitConnectionFactory; @Configuration - protected static class HawkBitRabbitConnectionFactoryCreator { + protected static class RabbitConnectionFactoryCreator { + + @Autowired + private AmqpProperties amqpProperties; + @Autowired @Qualifier("threadPoolExecutor") private ThreadPoolExecutor threadPoolExecutor; @@ -57,15 +71,21 @@ public class AmqpConfiguration { @Autowired private ScheduledExecutorService scheduledExecutorService; - @Autowired - protected AmqpProperties amqpProperties; - + /** + * {@link ConnectionFactory} with enabled publisher confirms and + * heartbeat. + * + * @param config + * with standard {@link RabbitProperties} + * @return {@link ConnectionFactory} + */ @Bean public ConnectionFactory rabbitConnectionFactory(final RabbitProperties config) { final CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setRequestedHeartBeat(amqpProperties.getRequestedHeartBeat()); factory.setExecutor(threadPoolExecutor); factory.getRabbitConnectionFactory().setHeartbeatExecutor(scheduledExecutorService); + factory.setPublisherConfirms(true); final String addresses = config.getAddresses(); factory.setAddresses(addresses); @@ -84,7 +104,6 @@ public class AmqpConfiguration { } return factory; } - } /** @@ -101,14 +120,28 @@ public class AmqpConfiguration { } /** - * Method to set the Jackson2JsonMessageConverter. - * - * @return the Jackson2JsonMessageConverter + * @return {@link RabbitTemplate} with automatic retry, published confirms + * and {@link Jackson2JsonMessageConverter}. */ @Bean public RabbitTemplate rabbitTemplate() { final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); + + final RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy()); + rabbitTemplate.setRetryTemplate(retryTemplate); + + rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { + if (ack) { + LOGGER.debug("Message with correlation ID {} confirmed by broker.", correlationData.getId()); + } else { + LOGGER.error("Broker is unable to handle message with correlation ID {} : {}", correlationData.getId(), + cause); + } + + }); + return rabbitTemplate; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java index 244544b64..afb3e7ff2 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java @@ -9,10 +9,14 @@ package org.eclipse.hawkbit.amqp; import java.net.URI; +import java.util.UUID; import org.eclipse.hawkbit.util.IpUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.support.CorrelationData; /** * A default implementation for the sender service. The service sends all amqp @@ -20,6 +24,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; * extracted from the uri. */ public class DefaultAmqpSenderService implements AmqpSenderService { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpSenderService.class); private final RabbitTemplate internalAmqpTemplate; @@ -39,7 +44,16 @@ public class DefaultAmqpSenderService implements AmqpSenderService { return; } - internalAmqpTemplate.send(extractExchange(replyTo), null, message); + final String correlationId = UUID.randomUUID().toString(); + final String exchange = extractExchange(replyTo); + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, exchange, correlationId); + } else { + LOGGER.debug("Sending message to exchange {} with correlationId {}", exchange, correlationId); + } + + internalAmqpTemplate.send(exchange, null, message, new CorrelationData(correlationId)); } } From 3726aac6db875eaf00d9ca9cd937f915b3325a8a Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Mon, 13 Jun 2016 12:38:08 +0200 Subject: [PATCH 08/10] Cleaned up default mismatch. Signed-off-by: Kai Zimmermann --- .../scheduling/AsyncConfigurerThreadpoolProperties.java | 4 ++-- .../src/main/resources/hawkbitdefaults.properties | 6 ------ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java index 35996a114..4425c7278 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java @@ -20,7 +20,7 @@ public class AsyncConfigurerThreadpoolProperties { /** * Max queue size for central event executor. */ - private Integer queuesize = 250; + private Integer queuesize = 5_000; /** * Core processing threads for central event executor. @@ -30,7 +30,7 @@ public class AsyncConfigurerThreadpoolProperties { /** * Maximum thread pool size for central event executor. */ - private Integer maxthreads = 50; + private Integer maxthreads = 20; /** * When the number of threads is greater than the core, this is the maximum diff --git a/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties b/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties index 488777b8d..488559383 100644 --- a/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties +++ b/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties @@ -27,12 +27,6 @@ vaadin.servlet.urlMapping=/UI/* vaadin.servlet.heartbeatInterval=60 vaadin.servlet.closeIdleSessions=false -# Defines the thread pool executor -hawkbit.threadpool.corethreads=5 -hawkbit.threadpool.maxthreads=20 -hawkbit.threadpool.idletimeout=10000 -hawkbit.threadpool.queuesize=20000 - # Defines the polling time for the controllers in HH:MM:SS notation hawkbit.controller.pollingTime=00:05:00 hawkbit.controller.pollingOverdueTime=00:05:00 From 2b245aec823ea9ae0103eda19e92e1bb6376b750 Mon Sep 17 00:00:00 2001 From: Jonathan Knoblauch Date: Mon, 13 Jun 2016 14:50:45 +0200 Subject: [PATCH 09/10] Misspelling Changed refernce to reference --- .../hawkbit/repository/jpa/DistributionSetManagementTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java index 104f2260d..b7da2ff06 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java @@ -785,7 +785,7 @@ public class DistributionSetManagementTest extends AbstractJpaIntegrationTest { @Test @Description("Deletes a DS that is in use by either target assignment or rollout. Expected behaviour is a soft delete on the database, i.e. only marked as " - + "deleted, kept as refernce but unavailable for future use..") + + "deleted, kept as reference but unavailable for future use..") public void deleteAssignedDistributionSet() { DistributionSet ds1 = testdataFactory.createDistributionSet("ds-1"); DistributionSet ds2 = testdataFactory.createDistributionSet("ds-2"); From 8230a476205535df059144f6d6a121c314dd67ed Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Mon, 13 Jun 2016 16:02:04 +0200 Subject: [PATCH 10/10] Fixed typos in repo default setup and logging. Signed-off-by: Kai Zimmermann --- .../main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java | 2 +- .../main/java/org/eclipse/hawkbit/repository/Constants.java | 2 +- .../repository/jpa/rsql/RSQLSoftwareModuleTypeFieldsTest.java | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java index 76870ac93..2c8feda13 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -109,7 +109,7 @@ public class BaseAmqpService { } protected final void logAndThrowMessageError(final Message message, final String error) { - LOGGER.warn("Error \"{}\" reported by message: {}", error, message); + LOGGER.warn("Warning! \"{}\" reported by message: {}", error, message); throw new IllegalArgumentException(error); } diff --git a/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/Constants.java b/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/Constants.java index 9a1e442b0..5dcb26638 100644 --- a/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/Constants.java +++ b/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/Constants.java @@ -33,7 +33,7 @@ public final class Constants { * generated by repository for every new account for "Firmware/Operating * System" . */ - public static final String SMT_DEFAULT_OS_NAME = "Firmware"; + public static final String SMT_DEFAULT_OS_NAME = "OS"; /** * {@link SoftwareModuleType#getName()} of a {@link SoftwareModuleType} * generated by repository for every new account for "applications/apps". diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/rsql/RSQLSoftwareModuleTypeFieldsTest.java b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/rsql/RSQLSoftwareModuleTypeFieldsTest.java index e1538aaee..344d0e320 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/rsql/RSQLSoftwareModuleTypeFieldsTest.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/rsql/RSQLSoftwareModuleTypeFieldsTest.java @@ -10,6 +10,7 @@ package org.eclipse.hawkbit.repository.jpa.rsql; import static org.fest.assertions.api.Assertions.assertThat; +import org.eclipse.hawkbit.repository.Constants; import org.eclipse.hawkbit.repository.SoftwareModuleTypeFields; import org.eclipse.hawkbit.repository.jpa.AbstractJpaIntegrationTest; import org.eclipse.hawkbit.repository.model.SoftwareModuleType; @@ -34,7 +35,7 @@ public class RSQLSoftwareModuleTypeFieldsTest extends AbstractJpaIntegrationTest @Test @Description("Test filter software module test type by name") public void testFilterByParameterName() { - assertRSQLQuery(SoftwareModuleTypeFields.NAME.name() + "==Firmware", 1); + assertRSQLQuery(SoftwareModuleTypeFields.NAME.name() + "==" + Constants.SMT_DEFAULT_OS_NAME, 1); } @Test