Merge pull request #147 from bsinno/Add_TTL_to_Deadeletter_Queue
Add ttl to deadeletter queue
This commit is contained in:
@@ -39,6 +39,12 @@ public class AmqpProperties {
|
||||
*/
|
||||
private String deadLetterExchange = "simulator.deadletter";
|
||||
|
||||
/**
|
||||
* Message time to live (ttl) for the deadletter queue. Default ttl is 1
|
||||
* hour.
|
||||
*/
|
||||
private int deadLetterTtl = 60_000;
|
||||
|
||||
public String getReceiverConnectorQueueFromSp() {
|
||||
return receiverConnectorQueueFromSp;
|
||||
}
|
||||
@@ -70,4 +76,12 @@ public class AmqpProperties {
|
||||
public void setSenderForSpExchange(final String senderForSpExchange) {
|
||||
this.senderForSpExchange = senderForSpExchange;
|
||||
}
|
||||
|
||||
public int getDeadLetterTtl() {
|
||||
return deadLetterTtl;
|
||||
}
|
||||
|
||||
public void setDeadLetterTtl(final int deadLetterTtl) {
|
||||
this.deadLetterTtl = deadLetterTtl;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ spring.rabbitmq.password=guest
|
||||
spring.rabbitmq.virtualHost=/
|
||||
spring.rabbitmq.host=localhost
|
||||
spring.rabbitmq.port=5672
|
||||
hawkbit.dmf.rabbitmq.deadLetterQueue=dmf_connector_deadletter
|
||||
hawkbit.dmf.rabbitmq.deadLetterQueue=dmf_connector_deadletter_ttl
|
||||
hawkbit.dmf.rabbitmq.deadLetterExchange=dmf.connector.deadletter
|
||||
hawkbit.dmf.rabbitmq.receiverQueue=dmf_receiver
|
||||
|
||||
|
||||
@@ -42,6 +42,6 @@ hawkbit.controller.minPollingTime=00:00:30
|
||||
|
||||
|
||||
# Configuration for RabbitMQ integration
|
||||
hawkbit.dmf.rabbitmq.deadLetterQueue=dmf_connector_deadletter
|
||||
hawkbit.dmf.rabbitmq.deadLetterQueue=dmf_connector_deadletter_ttl
|
||||
hawkbit.dmf.rabbitmq.deadLetterExchange=dmf.connector.deadletter
|
||||
hawkbit.dmf.rabbitmq.receiverQueue=dmf_receiver
|
||||
@@ -8,9 +8,6 @@
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings;
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
@@ -18,6 +15,7 @@ import org.springframework.amqp.core.FanoutExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
@@ -31,15 +29,31 @@ import org.springframework.context.annotation.Bean;
|
||||
* {@code amqp} to use a AMQP for communication with SP enabled devices.
|
||||
*
|
||||
*/
|
||||
@EnableConfigurationProperties(AmqpProperties.class)
|
||||
@EnableConfigurationProperties({ AmqpProperties.class, AmqpDeadletterProperties.class })
|
||||
public class AmqpConfiguration {
|
||||
|
||||
@Autowired
|
||||
protected AmqpProperties amqpProperties;
|
||||
|
||||
@Autowired
|
||||
protected AmqpDeadletterProperties amqpDeadletterProperties;
|
||||
|
||||
@Autowired
|
||||
private ConnectionFactory connectionFactory;
|
||||
|
||||
/**
|
||||
* Create a {@link RabbitAdmin} and ignore declaration exceptions.
|
||||
* {@link RabbitAdmin#setIgnoreDeclarationExceptions(boolean)}
|
||||
*
|
||||
* @return the bean
|
||||
*/
|
||||
@Bean
|
||||
public RabbitAdmin rabbitAdmin() {
|
||||
final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
|
||||
rabbitAdmin.setIgnoreDeclarationExceptions(true);
|
||||
return rabbitAdmin;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to set the Jackson2JsonMessageConverter.
|
||||
*
|
||||
@@ -59,7 +73,8 @@ public class AmqpConfiguration {
|
||||
*/
|
||||
@Bean
|
||||
public Queue receiverQueue() {
|
||||
return new Queue(amqpProperties.getReceiverQueue(), true, false, false, getDeadLetterExchangeArgs());
|
||||
return new Queue(amqpProperties.getReceiverQueue(), true, false, false,
|
||||
amqpDeadletterProperties.getDeadLetterExchangeArgs(amqpProperties.getDeadLetterExchange()));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -79,7 +94,7 @@ public class AmqpConfiguration {
|
||||
*/
|
||||
@Bean
|
||||
public Queue deadLetterQueue() {
|
||||
return new Queue(amqpProperties.getDeadLetterQueue());
|
||||
return amqpDeadletterProperties.createDeadletterQueue(amqpProperties.getDeadLetterQueue());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -149,10 +164,4 @@ public class AmqpConfiguration {
|
||||
return containerFactory;
|
||||
}
|
||||
|
||||
private Map<String, Object> getDeadLetterExchangeArgs() {
|
||||
final Map<String, Object> args = new HashMap<>();
|
||||
args.put("x-dead-letter-exchange", amqpProperties.getDeadLetterExchange());
|
||||
return args;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
/**
|
||||
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* Bean which holds the necessary properties for configuring the AMQP deadletter
|
||||
* queue.
|
||||
*/
|
||||
@ConfigurationProperties("hawkbit.dmf.rabbitmq.deadLetter")
|
||||
public class AmqpDeadletterProperties {
|
||||
|
||||
/**
|
||||
* Message time to live (ttl) for the deadletter queue. Default ttl is 3
|
||||
* weeks.
|
||||
*/
|
||||
private long ttl = Duration.ofDays(21).toMillis();
|
||||
|
||||
/**
|
||||
* Return the deadletter arguments.
|
||||
*
|
||||
* @param exchange
|
||||
* the deadletter exchange
|
||||
* @return map which holds the properties
|
||||
*/
|
||||
public Map<String, Object> getDeadLetterExchangeArgs(final String exchange) {
|
||||
final Map<String, Object> args = new HashMap<>();
|
||||
args.put("x-dead-letter-exchange", exchange);
|
||||
return args;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a deadletter queue with ttl for messages
|
||||
*
|
||||
* @param queueName
|
||||
* the deadlette queue name
|
||||
* @return the deadletter queue
|
||||
*/
|
||||
public Queue createDeadletterQueue(final String queueName) {
|
||||
return new Queue(queueName, true, false, false, getTTLArgs());
|
||||
}
|
||||
|
||||
private Map<String, Object> getTTLArgs() {
|
||||
final Map<String, Object> args = new HashMap<>();
|
||||
args.put("x-message-ttl", getTtl());
|
||||
return args;
|
||||
}
|
||||
|
||||
public long getTtl() {
|
||||
return ttl;
|
||||
}
|
||||
|
||||
public void setTtl(final long ttl) {
|
||||
this.ttl = ttl;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -21,7 +21,7 @@ public class AmqpProperties {
|
||||
/**
|
||||
* DMF API dead letter queue.
|
||||
*/
|
||||
private String deadLetterQueue = "dmf_connector_deadletter";
|
||||
private String deadLetterQueue = "dmf_connector_deadletter_ttl";
|
||||
|
||||
/**
|
||||
* DMF API dead letter exchange.
|
||||
|
||||
Reference in New Issue
Block a user