Merge pull request #62 from bsinno/fix_wrong_amqp_message_converter
👍 merged
This commit is contained in:
@@ -29,7 +29,7 @@ import org.eclipse.hawkbit.util.ArtifactUrlHandler;
|
||||
import org.eclipse.hawkbit.util.IpUtil;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import com.google.common.eventbus.Subscribe;
|
||||
@@ -58,8 +58,8 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
* message converter
|
||||
*/
|
||||
@Autowired
|
||||
public AmqpMessageDispatcherService(final MessageConverter messageConverter) {
|
||||
super(messageConverter);
|
||||
public AmqpMessageDispatcherService(final RabbitTemplate rabbitTemplate) {
|
||||
super(rabbitTemplate);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -87,8 +87,9 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
downloadAndUpdateRequest.addSoftwareModule(amqpSoftwareModule);
|
||||
}
|
||||
|
||||
final Message message = messageConverter.toMessage(downloadAndUpdateRequest, createConnectorMessageProperties(
|
||||
targetAssignDistributionSetEvent.getTenant(), controllerId, EventTopic.DOWNLOAD_AND_INSTALL));
|
||||
final Message message = getMessageConverter().toMessage(downloadAndUpdateRequest,
|
||||
createConnectorMessageProperties(targetAssignDistributionSetEvent.getTenant(), controllerId,
|
||||
EventTopic.DOWNLOAD_AND_INSTALL));
|
||||
amqpSenderService.sendMessage(message, targetAdress);
|
||||
}
|
||||
|
||||
@@ -104,7 +105,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
final CancelTargetAssignmentEvent cancelTargetAssignmentDistributionSetEvent) {
|
||||
final String controllerId = cancelTargetAssignmentDistributionSetEvent.getControllerId();
|
||||
final Long actionId = cancelTargetAssignmentDistributionSetEvent.getActionId();
|
||||
final Message message = messageConverter.toMessage(actionId, createConnectorMessageProperties(
|
||||
final Message message = getMessageConverter().toMessage(actionId, createConnectorMessageProperties(
|
||||
cancelTargetAssignmentDistributionSetEvent.getTenant(), controllerId, EventTopic.CANCEL_DOWNLOAD));
|
||||
|
||||
amqpSenderService.sendMessage(message, cancelTargetAssignmentDistributionSetEvent.getTargetAdress());
|
||||
|
||||
@@ -96,8 +96,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
@Autowired
|
||||
private HostnameResolver hostnameResolver;
|
||||
|
||||
private final RabbitTemplate internalAmqpTemplate;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
@@ -105,14 +103,13 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
* the configured amqp template.
|
||||
*/
|
||||
public AmqpMessageHandlerService(final RabbitTemplate defaultTemplate) {
|
||||
super(defaultTemplate.getMessageConverter());
|
||||
this.internalAmqpTemplate = defaultTemplate;
|
||||
super(defaultTemplate);
|
||||
}
|
||||
|
||||
@RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory")
|
||||
private Message onMessage(final Message message, @Header(MessageHeaderKey.TYPE) final String type,
|
||||
@Header(MessageHeaderKey.TENANT) final String tenant) {
|
||||
return onMessage(message, type, tenant, internalAmqpTemplate.getConnectionFactory().getVirtualHost());
|
||||
return onMessage(message, type, tenant, getRabbitTemplate().getConnectionFactory().getVirtualHost());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -207,7 +204,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
authentificationResponse.setMessage(errorMessage);
|
||||
}
|
||||
|
||||
return messageConverter.toMessage(authentificationResponse, messageProperties);
|
||||
return getMessageConverter().toMessage(authentificationResponse, messageProperties);
|
||||
}
|
||||
|
||||
private static Artifact convertDbArtifact(final DbArtifact dbArtifact) {
|
||||
|
||||
@@ -16,6 +16,7 @@ import java.util.Map;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
|
||||
@@ -25,16 +26,16 @@ import org.springframework.amqp.support.converter.MessageConverter;
|
||||
public class BaseAmqpService {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(BaseAmqpService.class);
|
||||
protected MessageConverter messageConverter;
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param messageConverter
|
||||
* the message messageConverter.
|
||||
* @param rabbitTemplate
|
||||
* the rabbit template.
|
||||
*/
|
||||
public BaseAmqpService(final MessageConverter messageConverter) {
|
||||
this.messageConverter = messageConverter;
|
||||
public BaseAmqpService(final RabbitTemplate rabbitTemplate) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -63,7 +64,7 @@ public class BaseAmqpService {
|
||||
}
|
||||
message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME,
|
||||
clazz.getName());
|
||||
return (T) messageConverter.fromMessage(message);
|
||||
return (T) rabbitTemplate.getMessageConverter().fromMessage(message);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -85,11 +86,11 @@ public class BaseAmqpService {
|
||||
ArrayList.class.getName());
|
||||
message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,
|
||||
clazz.getName());
|
||||
return (List<T>) messageConverter.fromMessage(message);
|
||||
return (List<T>) rabbitTemplate.getMessageConverter().fromMessage(message);
|
||||
}
|
||||
|
||||
public MessageConverter getMessageConverter() {
|
||||
return messageConverter;
|
||||
return rabbitTemplate.getMessageConverter();
|
||||
}
|
||||
|
||||
protected final String getStringHeaderKey(final Message message, final String key,
|
||||
@@ -107,4 +108,7 @@ public class BaseAmqpService {
|
||||
throw new IllegalArgumentException(error);
|
||||
}
|
||||
|
||||
protected RabbitTemplate getRabbitTemplate() {
|
||||
return rabbitTemplate;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
super.before();
|
||||
this.rabbitTemplate = Mockito.mock(RabbitTemplate.class);
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(new Jackson2JsonMessageConverter());
|
||||
amqpMessageDispatcherService = new AmqpMessageDispatcherService(new Jackson2JsonMessageConverter());
|
||||
amqpMessageDispatcherService = new AmqpMessageDispatcherService(rabbitTemplate);
|
||||
amqpMessageDispatcherService = spy(amqpMessageDispatcherService);
|
||||
|
||||
senderService = Mockito.mock(DefaultAmqpSenderService.class);
|
||||
|
||||
Reference in New Issue
Block a user