diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java index bf5d723a8..e954cec96 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java @@ -8,6 +8,7 @@ */ package org.eclipse.hawkbit.simulator.amqp; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -15,6 +16,7 @@ import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -65,9 +67,12 @@ public class AmqpConfiguration { * @return the queue */ @Bean - public Queue receiverConnectorQueueFromSp() { - return new Queue(amqpProperties.getReceiverConnectorQueueFromSp(), true, false, false, - getDeadLetterExchangeArgs()); + public Queue receiverConnectorQueueFromHawkBit() { + final Map arguments = getDeadLetterExchangeArgs(); + arguments.putAll(getTTLMaxArgs()); + + return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).withArguments(arguments) + .build(); } /** @@ -89,7 +94,7 @@ public class AmqpConfiguration { */ @Bean public Binding bindReceiverQueueToSpExchange() { - return BindingBuilder.bind(receiverConnectorQueueFromSp()).to(exchangeQueueToConnector()); + return BindingBuilder.bind(receiverConnectorQueueFromHawkBit()).to(exchangeQueueToConnector()); } /** @@ -99,7 +104,7 @@ public class AmqpConfiguration { */ @Bean public Queue deadLetterQueue() { - return new Queue(amqpProperties.getDeadLetterQueue(), true, false, true); + return QueueBuilder.nonDurable(amqpProperties.getDeadLetterQueue()).withArguments(getTTLMaxArgs()).build(); } /** @@ -145,4 +150,11 @@ public class AmqpConfiguration { return args; } + private static Map getTTLMaxArgs() { + final Map args = new HashMap<>(); + args.put("x-message-ttl", Duration.ofDays(1).toMillis()); + args.put("x-max-length", 100_000); + return args; + } + } diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java index bd48bbe12..8bb1e1292 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java @@ -82,21 +82,4 @@ public class MessageService { clazz.getTypeName()); return (T) rabbitTemplate.getMessageConverter().fromMessage(message); } - - /** - * Method to verify if lwm2m header is set. - * - * @param message - * the message with the header - * @param header - * the header to verify - */ - public void checkIfLwm2mHeaderEmpty(final Message message, final String header) { - final Object headerObject = message.getMessageProperties().getHeaders().get(header); - if (null == headerObject) { - logAndThrowMessageError(message, "Header of " + header + "empty."); - } - - } - } diff --git a/examples/hawkbit-device-simulator/src/main/resources/logback.xml b/examples/hawkbit-device-simulator/src/main/resources/logback.xml index da64e62d1..469c7bde3 100644 --- a/examples/hawkbit-device-simulator/src/main/resources/logback.xml +++ b/examples/hawkbit-device-simulator/src/main/resources/logback.xml @@ -19,7 +19,6 @@ - diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java index cdbbcba5b..7e2b780b4 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java @@ -23,7 +23,6 @@ import org.springframework.scheduling.annotation.EnableAsync; * * */ - @Configuration @EnableAsync @ConditionalOnMissingBean(AsyncConfigurer.class) diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java index 35996a114..4425c7278 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java @@ -20,7 +20,7 @@ public class AsyncConfigurerThreadpoolProperties { /** * Max queue size for central event executor. */ - private Integer queuesize = 250; + private Integer queuesize = 5_000; /** * Core processing threads for central event executor. @@ -30,7 +30,7 @@ public class AsyncConfigurerThreadpoolProperties { /** * Maximum thread pool size for central event executor. */ - private Integer maxthreads = 50; + private Integer maxthreads = 20; /** * When the number of threads is greater than the core, this is the maximum diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java index 43a096e7d..4f606f1b1 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java @@ -11,6 +11,7 @@ package org.eclipse.hawkbit.autoconfigure.scheduling; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -21,6 +22,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -39,11 +43,21 @@ public class ExecutorAutoConfiguration { private AsyncConfigurerThreadpoolProperties asyncConfigurerProperties; /** - * @return ExecutorService for general purpose multi threaded operations + * @return ExecutorService with security context availability in thread + * execution.. */ @Bean @ConditionalOnMissingBean public Executor asyncExecutor() { + return new DelegatingSecurityContextExecutor(threadPoolExecutor()); + } + + /** + * @return central ThreadPoolExecutor for general purpose multi threaded + * operations. Tries an orderly shutdown when destroyed. + */ + @Bean(destroyMethod = "shutdown") + public ThreadPoolExecutor threadPoolExecutor() { final BlockingQueue blockingQueue = new ArrayBlockingQueue<>( asyncConfigurerProperties.getQueuesize()); final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(asyncConfigurerProperties.getCorethreads(), @@ -53,7 +67,8 @@ public class ExecutorAutoConfiguration { threadPoolExecutor.setRejectedExecutionHandler((r, executor) -> LOGGER.warn( "Reject runnable for centralExecutorService, reached limit of queue size {}", executor.getQueue().size())); - return new DelegatingSecurityContextExecutor(threadPoolExecutor); + + return threadPoolExecutor; } /** @@ -69,4 +84,32 @@ public class ExecutorAutoConfiguration { return new DelegatingSecurityContextExecutor(threadPoolExecutor); } + /** + * @return {@link TaskExecutor} for task execution + */ + @Bean + @ConditionalOnMissingBean + public TaskExecutor taskExecutor() { + return new ConcurrentTaskExecutor(asyncExecutor()); + } + + /** + * @return {@link ScheduledExecutorService} based on + * {@link #threadPoolTaskScheduler()}. + */ + @Bean + @ConditionalOnMissingBean + public ScheduledExecutorService scheduledExecutorService() { + return threadPoolTaskScheduler().getScheduledExecutor(); + } + + /** + * @return {@link ThreadPoolTaskScheduler} for scheduled operations. + */ + @Bean + @ConditionalOnMissingBean + public ThreadPoolTaskScheduler threadPoolTaskScheduler() { + return new ThreadPoolTaskScheduler(); + } + } diff --git a/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties b/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties index 488777b8d..488559383 100644 --- a/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties +++ b/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties @@ -27,12 +27,6 @@ vaadin.servlet.urlMapping=/UI/* vaadin.servlet.heartbeatInterval=60 vaadin.servlet.closeIdleSessions=false -# Defines the thread pool executor -hawkbit.threadpool.corethreads=5 -hawkbit.threadpool.maxthreads=20 -hawkbit.threadpool.idletimeout=10000 -hawkbit.threadpool.queuesize=20000 - # Defines the polling time for the controllers in HH:MM:SS notation hawkbit.controller.pollingTime=00:05:00 hawkbit.controller.pollingOverdueTime=00:05:00 diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index 20a11713f..541ae414c 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -8,21 +8,32 @@ */ package org.eclipse.hawkbit.amqp; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.support.RetryTemplate;; /** * The spring AMQP configuration which is enabled by using the profile @@ -32,14 +43,68 @@ import org.springframework.context.annotation.Bean; @EnableConfigurationProperties({ AmqpProperties.class, AmqpDeadletterProperties.class }) public class AmqpConfiguration { - @Autowired - protected AmqpProperties amqpProperties; + private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfiguration.class); @Autowired - protected AmqpDeadletterProperties amqpDeadletterProperties; + private AmqpProperties amqpProperties; @Autowired - private ConnectionFactory connectionFactory; + private AmqpDeadletterProperties amqpDeadletterProperties; + + @Autowired + @Qualifier("threadPoolExecutor") + private ThreadPoolExecutor threadPoolExecutor; + + @Autowired + private ConnectionFactory rabbitConnectionFactory; + + @Configuration + protected static class RabbitConnectionFactoryCreator { + + @Autowired + private AmqpProperties amqpProperties; + + @Autowired + @Qualifier("threadPoolExecutor") + private ThreadPoolExecutor threadPoolExecutor; + + @Autowired + private ScheduledExecutorService scheduledExecutorService; + + /** + * {@link ConnectionFactory} with enabled publisher confirms and + * heartbeat. + * + * @param config + * with standard {@link RabbitProperties} + * @return {@link ConnectionFactory} + */ + @Bean + public ConnectionFactory rabbitConnectionFactory(final RabbitProperties config) { + final CachingConnectionFactory factory = new CachingConnectionFactory(); + factory.setRequestedHeartBeat(amqpProperties.getRequestedHeartBeat()); + factory.setExecutor(threadPoolExecutor); + factory.getRabbitConnectionFactory().setHeartbeatExecutor(scheduledExecutorService); + factory.setPublisherConfirms(true); + + final String addresses = config.getAddresses(); + factory.setAddresses(addresses); + if (config.getHost() != null) { + factory.setHost(config.getHost()); + factory.setPort(config.getPort()); + } + if (config.getUsername() != null) { + factory.setUsername(config.getUsername()); + } + if (config.getPassword() != null) { + factory.setPassword(config.getPassword()); + } + if (config.getVirtualHost() != null) { + factory.setVirtualHost(config.getVirtualHost()); + } + return factory; + } + } /** * Create a {@link RabbitAdmin} and ignore declaration exceptions. @@ -49,20 +114,34 @@ public class AmqpConfiguration { */ @Bean public RabbitAdmin rabbitAdmin() { - final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + final RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory); rabbitAdmin.setIgnoreDeclarationExceptions(true); return rabbitAdmin; } /** - * Method to set the Jackson2JsonMessageConverter. - * - * @return the Jackson2JsonMessageConverter + * @return {@link RabbitTemplate} with automatic retry, published confirms + * and {@link Jackson2JsonMessageConverter}. */ @Bean public RabbitTemplate rabbitTemplate() { - final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); + + final RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy()); + rabbitTemplate.setRetryTemplate(retryTemplate); + + rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { + if (ack) { + LOGGER.debug("Message with correlation ID {} confirmed by broker.", correlationData.getId()); + } else { + LOGGER.error("Broker is unable to handle message with correlation ID {} : {}", correlationData.getId(), + cause); + } + + }); + return rabbitTemplate; } @@ -159,7 +238,7 @@ public class AmqpConfiguration { public SimpleRabbitListenerContainerFactory listenerContainerFactory() { final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); containerFactory.setDefaultRequeueRejected(false); - containerFactory.setConnectionFactory(connectionFactory); + containerFactory.setConnectionFactory(rabbitConnectionFactory); containerFactory.setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal()); return containerFactory; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java index ce6068ce8..f9b4cceb6 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java @@ -8,6 +8,8 @@ */ package org.eclipse.hawkbit.amqp; +import java.util.concurrent.TimeUnit; + import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -38,6 +40,11 @@ public class AmqpProperties { */ private boolean missingQueuesFatal = false; + /** + * Requested heartbeat interval from broker in {@link TimeUnit#SECONDS}. + */ + private int requestedHeartBeat = (int) TimeUnit.SECONDS.toSeconds(60); + /** * Is missingQueuesFatal enabled * @@ -102,4 +109,13 @@ public class AmqpProperties { public void setReceiverQueue(final String receiverQueue) { this.receiverQueue = receiverQueue; } + + public int getRequestedHeartBeat() { + return requestedHeartBeat; + } + + public void setRequestedHeartBeat(final int requestedHeartBeat) { + this.requestedHeartBeat = requestedHeartBeat; + } + } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java index 76870ac93..2c8feda13 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -109,7 +109,7 @@ public class BaseAmqpService { } protected final void logAndThrowMessageError(final Message message, final String error) { - LOGGER.warn("Error \"{}\" reported by message: {}", error, message); + LOGGER.warn("Warning! \"{}\" reported by message: {}", error, message); throw new IllegalArgumentException(error); } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java index 244544b64..afb3e7ff2 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java @@ -9,10 +9,14 @@ package org.eclipse.hawkbit.amqp; import java.net.URI; +import java.util.UUID; import org.eclipse.hawkbit.util.IpUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.support.CorrelationData; /** * A default implementation for the sender service. The service sends all amqp @@ -20,6 +24,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; * extracted from the uri. */ public class DefaultAmqpSenderService implements AmqpSenderService { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpSenderService.class); private final RabbitTemplate internalAmqpTemplate; @@ -39,7 +44,16 @@ public class DefaultAmqpSenderService implements AmqpSenderService { return; } - internalAmqpTemplate.send(extractExchange(replyTo), null, message); + final String correlationId = UUID.randomUUID().toString(); + final String exchange = extractExchange(replyTo); + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, exchange, correlationId); + } else { + LOGGER.debug("Sending message to exchange {} with correlationId {}", exchange, correlationId); + } + + internalAmqpTemplate.send(exchange, null, message, new CorrelationData(correlationId)); } } diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java index c9be9ffa6..075313a19 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java @@ -8,6 +8,14 @@ */ package org.eclipse.hawkbit; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.eclipse.hawkbit.amqp.AmqpProperties; import org.eclipse.hawkbit.amqp.AmqpSenderService; import org.eclipse.hawkbit.amqp.DefaultAmqpSenderService; import org.eclipse.hawkbit.repository.jpa.model.helper.SystemSecurityContextHolder; @@ -16,13 +24,22 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * */ @Configuration +@EnableConfigurationProperties({ AmqpProperties.class }) public class AmqpTestConfiguration { /** * @return the {@link SystemSecurityContext} singleton bean which make it @@ -56,4 +73,55 @@ public class AmqpTestConfiguration { public AmqpSenderService amqpSenderServiceBean(final RabbitTemplate rabbitTemplate) { return new DefaultAmqpSenderService(rabbitTemplate); } + + /** + * @return ExecutorService with security context availability in thread + * execution.. + */ + @Bean + @ConditionalOnMissingBean + public Executor asyncExecutor() { + return new DelegatingSecurityContextExecutor(threadPoolExecutor()); + } + + /** + * @return central ThreadPoolExecutor for general purpose multi threaded + * operations. Tries an orderly shutdown when destroyed. + */ + @Bean(destroyMethod = "shutdown") + public ThreadPoolExecutor threadPoolExecutor() { + final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); + final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10, 1000, TimeUnit.MILLISECONDS, + blockingQueue, new ThreadFactoryBuilder().setNameFormat("central-executor-pool-%d").build()); + + return threadPoolExecutor; + } + + /** + * @return {@link TaskExecutor} for task execution + */ + @Bean + @ConditionalOnMissingBean + public TaskExecutor taskExecutor() { + return new ConcurrentTaskExecutor(asyncExecutor()); + } + + /** + * @return {@link ScheduledExecutorService} based on + * {@link #threadPoolTaskScheduler()}. + */ + @Bean + @ConditionalOnMissingBean + public ScheduledExecutorService scheduledExecutorService() { + return threadPoolTaskScheduler().getScheduledExecutor(); + } + + /** + * @return {@link ThreadPoolTaskScheduler} for scheduled operations. + */ + @Bean + @ConditionalOnMissingBean + public ThreadPoolTaskScheduler threadPoolTaskScheduler() { + return new ThreadPoolTaskScheduler(); + } } diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java index 8cd92d2bd..e93e1340f 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java @@ -33,6 +33,7 @@ import ru.yandex.qatools.allure.annotations.Stories; @Stories("Test to generate the artifact download URL") @SpringApplicationConfiguration(classes = { AmqpTestConfiguration.class, org.eclipse.hawkbit.RepositoryApplicationConfiguration.class }) + public class PropertyBasedArtifactUrlHandlerTest extends AbstractIntegrationTestWithMongoDB { private static final String HTTPS_LOCALHOST = "https://localhost:8080/"; diff --git a/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/Constants.java b/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/Constants.java index 9a1e442b0..5dcb26638 100644 --- a/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/Constants.java +++ b/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/Constants.java @@ -33,7 +33,7 @@ public final class Constants { * generated by repository for every new account for "Firmware/Operating * System" . */ - public static final String SMT_DEFAULT_OS_NAME = "Firmware"; + public static final String SMT_DEFAULT_OS_NAME = "OS"; /** * {@link SoftwareModuleType#getName()} of a {@link SoftwareModuleType} * generated by repository for every new account for "applications/apps". diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/DistributionSetRepository.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/DistributionSetRepository.java index 1ab6cbe08..83791a66d 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/DistributionSetRepository.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/DistributionSetRepository.java @@ -16,7 +16,9 @@ import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSet; import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSetTag; import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSetType; import org.eclipse.hawkbit.repository.jpa.model.JpaSoftwareModule; +import org.eclipse.hawkbit.repository.model.Action; import org.eclipse.hawkbit.repository.model.DistributionSet; +import org.eclipse.hawkbit.repository.model.Rollout; import org.eclipse.hawkbit.repository.model.SoftwareModule; import org.eclipse.hawkbit.repository.model.Tag; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; @@ -82,15 +84,26 @@ public interface DistributionSetRepository List findByModules(JpaSoftwareModule module); /** - * Finds {@link DistributionSet}s based on given ID if they are not assigned - * yet to an {@link UpdateAction}, i.e. unused. + * Finds {@link DistributionSet}s based on given ID that are assigned yet to + * an {@link Action}, i.e. in use. * * @param ids * to search for - * @return + * @return list of {@link DistributionSet#getId()} */ @Query("select ac.distributionSet.id from JpaAction ac where ac.distributionSet.id in :ids") - List findAssignedDistributionSetsById(@Param("ids") Long... ids); + List findAssignedToTargetDistributionSetsById(@Param("ids") Long... ids); + + /** + * Finds {@link DistributionSet}s based on given ID that are assigned yet to + * an {@link Rollout}, i.e. in use. + * + * @param ids + * to search for + * @return list of {@link DistributionSet#getId()} + */ + @Query("select ra.distributionSet.id from JpaRollout ra where ra.distributionSet.id in :ids") + List findAssignedToRolloutDistributionSetsById(@Param("ids") Long... ids); /** * Saves all given {@link DistributionSet}s. diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDistributionSetManagement.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDistributionSetManagement.java index e5a38cf51..f3ecbcf1b 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDistributionSetManagement.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDistributionSetManagement.java @@ -173,7 +173,9 @@ public class JpaDistributionSetManagement implements DistributionSetManagement { public void deleteDistributionSet(final Long... distributionSetIDs) { final List toHardDelete = new ArrayList<>(); - final List assigned = distributionSetRepository.findAssignedDistributionSetsById(distributionSetIDs); + final List assigned = distributionSetRepository + .findAssignedToTargetDistributionSetsById(distributionSetIDs); + assigned.addAll(distributionSetRepository.findAssignedToRolloutDistributionSetsById(distributionSetIDs)); // soft delete assigned if (!assigned.isEmpty()) { diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java index 67d5202a5..b7da2ff06 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DistributionSetManagementTest.java @@ -27,6 +27,7 @@ import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSet; import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSetMetadata; import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSetTag; import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSetType; +import org.eclipse.hawkbit.repository.jpa.model.JpaRollout; import org.eclipse.hawkbit.repository.jpa.model.JpaSoftwareModule; import org.eclipse.hawkbit.repository.jpa.model.JpaTarget; import org.eclipse.hawkbit.repository.model.Action; @@ -34,12 +35,18 @@ import org.eclipse.hawkbit.repository.model.Action.Status; import org.eclipse.hawkbit.repository.model.ActionStatus; import org.eclipse.hawkbit.repository.model.DistributionSet; import org.eclipse.hawkbit.repository.model.DistributionSetFilter.DistributionSetFilterBuilder; -import org.eclipse.hawkbit.repository.test.util.WithUser; import org.eclipse.hawkbit.repository.model.DistributionSetMetadata; import org.eclipse.hawkbit.repository.model.DistributionSetTag; import org.eclipse.hawkbit.repository.model.DistributionSetType; +import org.eclipse.hawkbit.repository.model.Rollout; +import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupErrorAction; +import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupErrorCondition; +import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupSuccessCondition; +import org.eclipse.hawkbit.repository.model.RolloutGroupConditionBuilder; +import org.eclipse.hawkbit.repository.model.RolloutGroupConditions; import org.eclipse.hawkbit.repository.model.SoftwareModule; import org.eclipse.hawkbit.repository.model.Target; +import org.eclipse.hawkbit.repository.test.util.WithUser; import org.fest.assertions.core.Condition; import org.junit.Test; import org.springframework.data.domain.Page; @@ -777,36 +784,55 @@ public class DistributionSetManagementTest extends AbstractJpaIntegrationTest { } @Test - @Description("Deltes a DS that is no in use. Expected behaviour is a soft delete on the database, i.e. only marked as " - + "deleted, kept eas refernce and unavailable for future use..") + @Description("Deletes a DS that is in use by either target assignment or rollout. Expected behaviour is a soft delete on the database, i.e. only marked as " + + "deleted, kept as reference but unavailable for future use..") public void deleteAssignedDistributionSet() { DistributionSet ds1 = testdataFactory.createDistributionSet("ds-1"); DistributionSet ds2 = testdataFactory.createDistributionSet("ds-2"); - DistributionSet dsAssigned = testdataFactory.createDistributionSet("ds-3"); + DistributionSet dsToTargetAssigned = testdataFactory.createDistributionSet("ds-3"); + final DistributionSet dsToRolloutAssigned = testdataFactory.createDistributionSet("ds-4"); ds1 = distributionSetManagement.findDistributionSetByNameAndVersion(ds1.getName(), ds1.getVersion()); ds2 = distributionSetManagement.findDistributionSetByNameAndVersion(ds2.getName(), ds2.getVersion()); // create assigned DS - dsAssigned = distributionSetManagement.findDistributionSetByNameAndVersion(dsAssigned.getName(), - dsAssigned.getVersion()); + dsToTargetAssigned = distributionSetManagement.findDistributionSetByNameAndVersion(dsToTargetAssigned.getName(), + dsToTargetAssigned.getVersion()); final Target target = new JpaTarget("4712"); final Target savedTarget = targetManagement.createTarget(target); final List toAssign = new ArrayList<>(); toAssign.add(savedTarget); - deploymentManagement.assignDistributionSet(dsAssigned, toAssign); + deploymentManagement.assignDistributionSet(dsToTargetAssigned, toAssign); - // delete a ds - assertThat(distributionSetRepository.findAll()).hasSize(3); - distributionSetManagement.deleteDistributionSet(dsAssigned.getId()); + // create assigned rollout + createRolloutByVariables("test", "test", 5, "name==*", dsToRolloutAssigned, "50", "5"); + + // delete assigned ds + assertThat(distributionSetRepository.findAll()).hasSize(4); + distributionSetManagement.deleteDistributionSet(dsToTargetAssigned.getId(), dsToRolloutAssigned.getId()); // not assigned so not marked as deleted - assertThat(distributionSetRepository.findAll()).hasSize(3); + assertThat(distributionSetRepository.findAll()).hasSize(4); assertThat(distributionSetManagement .findDistributionSetsByDeletedAndOrCompleted(pageReq, Boolean.FALSE, Boolean.TRUE).getTotalElements()) .isEqualTo(2); } + private Rollout createRolloutByVariables(final String rolloutName, final String rolloutDescription, + final int groupSize, final String filterQuery, final DistributionSet distributionSet, + final String successCondition, final String errorCondition) { + final RolloutGroupConditions conditions = new RolloutGroupConditionBuilder() + .successCondition(RolloutGroupSuccessCondition.THRESHOLD, successCondition) + .errorCondition(RolloutGroupErrorCondition.THRESHOLD, errorCondition) + .errorAction(RolloutGroupErrorAction.PAUSE, null).build(); + final Rollout rolloutToCreate = new JpaRollout(); + rolloutToCreate.setName(rolloutName); + rolloutToCreate.setDescription(rolloutDescription); + rolloutToCreate.setTargetFilterQuery(filterQuery); + rolloutToCreate.setDistributionSet(distributionSet); + return rolloutManagement.createRollout(rolloutToCreate, groupSize, conditions); + } + private Target sendUpdateActionStatusToTarget(final Status status, final Action updActA, final Target t, final String... msgs) { updActA.setStatus(status); diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/rsql/RSQLSoftwareModuleTypeFieldsTest.java b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/rsql/RSQLSoftwareModuleTypeFieldsTest.java index e1538aaee..344d0e320 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/rsql/RSQLSoftwareModuleTypeFieldsTest.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/rsql/RSQLSoftwareModuleTypeFieldsTest.java @@ -10,6 +10,7 @@ package org.eclipse.hawkbit.repository.jpa.rsql; import static org.fest.assertions.api.Assertions.assertThat; +import org.eclipse.hawkbit.repository.Constants; import org.eclipse.hawkbit.repository.SoftwareModuleTypeFields; import org.eclipse.hawkbit.repository.jpa.AbstractJpaIntegrationTest; import org.eclipse.hawkbit.repository.model.SoftwareModuleType; @@ -34,7 +35,7 @@ public class RSQLSoftwareModuleTypeFieldsTest extends AbstractJpaIntegrationTest @Test @Description("Test filter software module test type by name") public void testFilterByParameterName() { - assertRSQLQuery(SoftwareModuleTypeFields.NAME.name() + "==Firmware", 1); + assertRSQLQuery(SoftwareModuleTypeFields.NAME.name() + "==" + Constants.SMT_DEFAULT_OS_NAME, 1); } @Test diff --git a/hawkbit-ui/src/main/java/org/eclipse/hawkbit/ui/distributions/event/DistributionSetTypeEvent.java b/hawkbit-ui/src/main/java/org/eclipse/hawkbit/ui/distributions/event/DistributionSetTypeEvent.java index c6a625b10..7ba296f03 100644 --- a/hawkbit-ui/src/main/java/org/eclipse/hawkbit/ui/distributions/event/DistributionSetTypeEvent.java +++ b/hawkbit-ui/src/main/java/org/eclipse/hawkbit/ui/distributions/event/DistributionSetTypeEvent.java @@ -26,11 +26,16 @@ public class DistributionSetTypeEvent { private final DistributionSetTypeEnum distributionSetTypeEnum; + private String distributionSetTypeName; + /** * @param distributionSetTypeEnum + * @param distributionSetTypeName */ - public DistributionSetTypeEvent(final DistributionSetTypeEnum distributionSetTypeEnum) { + public DistributionSetTypeEvent(final DistributionSetTypeEnum distributionSetTypeEnum, + final String distributionSetTypeName) { this.distributionSetTypeEnum = distributionSetTypeEnum; + this.distributionSetTypeName = distributionSetTypeName; } /** @@ -43,6 +48,14 @@ public class DistributionSetTypeEvent { this.distributionSetType = distributionSetType; } + public String getDistributionSetTypeName() { + return distributionSetTypeName; + } + + public void setDistributionSetTypeName(final String distributionSetTypeName) { + this.distributionSetTypeName = distributionSetTypeName; + } + public DistributionSetType getDistributionSetType() { return distributionSetType; } diff --git a/pom.xml b/pom.xml index af5151124..27990b715 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,7 @@ 2.5.5 5.2.4.Final 1.2.0.RELEASE + 1.6.0.RELEASE 0.18.0.RELEASE Fowler-SR1