Remote Events migrated from Spring Bus to Spring Cloud Stream (#2563)

* Remote Events migrated from Spring Bus to Spring Cloud Stream

---------

Co-authored-by: vasilchev <vasil.ilchev@bosch.com>
This commit is contained in:
Vasil Ilchev
2025-07-30 16:58:00 +03:00
committed by GitHub
parent 10da0288d9
commit 4a8e60764f
49 changed files with 1147 additions and 461 deletions

View File

@@ -0,0 +1,24 @@
/**
* Copyright (c) 2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.hawkbit.event;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.util.MimeType;
public class EventJacksonMessageConverter extends MappingJackson2MessageConverter {
public EventJacksonMessageConverter() {
super(new MimeType("application", "remote-event-json"));
ObjectMapper objectMapper = new ObjectMapper();
EventType.getNamedTypes().forEach(objectMapper::registerSubtypes);
setObjectMapper(objectMapper);
}
}

View File

@@ -14,7 +14,7 @@ import io.protostuff.ProtobufIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.eclipse.hawkbit.repository.event.remote.AbstractRemoteEvent;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
@@ -30,20 +30,20 @@ import org.springframework.util.MimeType;
* values of {@link EventType}.
*/
@Slf4j
public class BusProtoStuffMessageConverter extends AbstractMessageConverter {
public class EventProtoStuffMessageConverter extends AbstractMessageConverter {
public static final MimeType APPLICATION_BINARY_PROTOSTUFF = new MimeType("application", "binary+protostuff");
/** The length of the class type length of the payload. */
private static final byte EVENT_TYPE_LENGTH = 2;
public BusProtoStuffMessageConverter() {
public EventProtoStuffMessageConverter() {
super(APPLICATION_BINARY_PROTOSTUFF);
}
@Override
protected boolean supports(final Class<?> aClass) {
return RemoteApplicationEvent.class.isAssignableFrom(aClass);
return AbstractRemoteEvent.class.isAssignableFrom(aClass);
}
@Override

View File

@@ -10,23 +10,18 @@
package org.eclipse.hawkbit.event;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import org.eclipse.hawkbit.repository.event.ApplicationEventFilter;
import org.eclipse.hawkbit.repository.event.EventPublisherHolder;
import org.eclipse.hawkbit.repository.event.remote.AbstractRemoteEvent;
import org.eclipse.hawkbit.repository.event.remote.RemoteTenantAwareEvent;
import org.eclipse.hawkbit.repository.event.EventPublisherHolder;
import org.eclipse.hawkbit.security.SystemSecurityContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.bus.BusProperties;
import org.springframework.cloud.bus.ConditionalOnBusEnabled;
import org.springframework.cloud.bus.ServiceMatcher;
import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
@@ -37,12 +32,10 @@ import org.springframework.core.ResolvableType;
import org.springframework.messaging.converter.MessageConverter;
/**
* Autoconfiguration for the event bus.
* Autoconfiguration for the events.
*/
@Configuration
@RemoteApplicationEventScan(basePackages = "org.eclipse.hawkbit.repository.event.remote")
@PropertySource("classpath:/hawkbit-eventbus-defaults.properties")
@EnableConfigurationProperties(BusProperties.class)
@PropertySource("classpath:/hawkbit-events-defaults.properties")
public class EventPublisherConfiguration {
/**
@@ -66,7 +59,7 @@ public class EventPublisherConfiguration {
* @return the singleton instance of the {@link EventPublisherHolder}
*/
@Bean
EventPublisherHolder eventBusHolder() {
public EventPublisherHolder eventPublisherHolder() {
return EventPublisherHolder.getInstance();
}
@@ -84,19 +77,12 @@ public class EventPublisherConfiguration {
private final SystemSecurityContext systemSecurityContext;
private final ApplicationEventFilter applicationEventFilter;
private ServiceMatcher serviceMatcher;
protected TenantAwareApplicationEventPublisher(
final SystemSecurityContext systemSecurityContext, final ApplicationEventFilter applicationEventFilter) {
this.systemSecurityContext = systemSecurityContext;
this.applicationEventFilter = applicationEventFilter;
}
@Autowired(required = false)
public void setServiceMatcher(final ServiceMatcher serviceMatcher) {
this.serviceMatcher = serviceMatcher;
}
/**
* Was overridden that not every event has to run within an own tenantAware.
*/
@@ -106,33 +92,53 @@ public class EventPublisherConfiguration {
return;
}
if (serviceMatcher == null || !(event instanceof final RemoteTenantAwareEvent remoteEvent)) {
super.multicastEvent(event, eventType);
if (event instanceof final RemoteTenantAwareEvent remoteEvent) {
systemSecurityContext.runAsSystemAsTenant(() -> {
super.multicastEvent(event, eventType);
return null;
}, remoteEvent.getTenant());
return;
}
if (serviceMatcher.isFromSelf(remoteEvent)) {
super.multicastEvent(event, eventType);
return;
}
systemSecurityContext.runAsSystemAsTenant(() -> {
super.multicastEvent(event, eventType);
return null;
}, remoteEvent.getTenant());
super.multicastEvent(event, eventType);
}
}
@ConditionalOnBusEnabled
@ConditionalOnClass({ Schema.class, ProtostuffIOUtil.class })
protected static class BusProtoStuffAutoConfiguration {
@Bean
@ConditionalOnProperty(name = "org.eclipse.hawkbit.events.remote-enabled", havingValue = "true")
public Consumer<AbstractRemoteEvent> serviceEventConsumer(ApplicationEventPublisher publisher) {
return publisher::publishEvent;
}
@Bean
@ConditionalOnProperty(name = "org.eclipse.hawkbit.events.remote-enabled", havingValue = "true")
public Consumer<AbstractRemoteEvent> fanoutEventConsumer(ApplicationEventPublisher publisher) {
return publisher::publishEvent;
}
@ConditionalOnProperty(name = "org.eclipse.hawkbit.events.remote-enabled", havingValue = "true")
@ConditionalOnProperty(name = "spring.cloud.stream.default.content-type", havingValue = "application/binary+stuff")
protected static class EventProtoStuffAutoConfiguration {
/**
* @return the protostuff io message converter
* @return the protostuff io message converter for events
*/
@Bean
public MessageConverter busProtoBufConverter() {
return new BusProtoStuffMessageConverter();
public MessageConverter eventProtoStuffConverter() {
return new EventProtoStuffMessageConverter();
}
}
@ConditionalOnProperty(name = "org.eclipse.hawkbit.events.remote-enabled", havingValue = "true")
@ConditionalOnProperty(name = "spring.cloud.stream.default.content-type", havingValue = "application/remote-event-json")
protected static class EventJacksonAutoConfiguration {
/**
* @return the Jackson message converter for events
*/
@Bean
public MessageConverter eventJacksonMessageConverter() {
return new EventJacksonMessageConverter();
}
}
}

View File

@@ -9,10 +9,12 @@
*/
package org.eclipse.hawkbit.event;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -24,6 +26,13 @@ import org.eclipse.hawkbit.repository.event.remote.DistributionSetTypeDeletedEve
import org.eclipse.hawkbit.repository.event.remote.DownloadProgressEvent;
import org.eclipse.hawkbit.repository.event.remote.MultiActionAssignEvent;
import org.eclipse.hawkbit.repository.event.remote.MultiActionCancelEvent;
import org.eclipse.hawkbit.repository.event.remote.service.CancelTargetAssignmentServiceEvent;
import org.eclipse.hawkbit.repository.event.remote.service.MultiActionAssignServiceEvent;
import org.eclipse.hawkbit.repository.event.remote.service.MultiActionCancelServiceEvent;
import org.eclipse.hawkbit.repository.event.remote.service.TargetAssignDistributionSetServiceEvent;
import org.eclipse.hawkbit.repository.event.remote.service.TargetAttributesRequestedServiceEvent;
import org.eclipse.hawkbit.repository.event.remote.service.TargetCreatedServiceEvent;
import org.eclipse.hawkbit.repository.event.remote.service.TargetDeletedServiceEvent;
import org.eclipse.hawkbit.repository.event.remote.RolloutDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.RolloutGroupDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.RolloutStoppedEvent;
@@ -166,6 +175,15 @@ public class EventType {
TYPES.put(44, TargetTypeCreatedEvent.class);
TYPES.put(45, TargetTypeUpdatedEvent.class);
TYPES.put(46, TargetTypeDeletedEvent.class);
// processing events - start from 1000 to leave room for future db events
TYPES.put(1000, TargetCreatedServiceEvent.class);
TYPES.put(1001, TargetDeletedServiceEvent.class);
TYPES.put(1002, TargetAssignDistributionSetServiceEvent.class);
TYPES.put(1003, TargetAttributesRequestedServiceEvent.class);
TYPES.put(1004, CancelTargetAssignmentServiceEvent.class);
TYPES.put(1005, MultiActionAssignServiceEvent.class);
TYPES.put(1006, MultiActionCancelServiceEvent.class);
}
/**
@@ -197,4 +215,10 @@ public class EventType {
public Class<?> getTargetClass() {
return TYPES.get(value);
}
public static Collection<NamedType> getNamedTypes() {
return TYPES.entrySet().stream()
.map(e -> new NamedType(e.getValue(), String.valueOf(e.getKey())))
.toList();
}
}

View File

@@ -1,21 +0,0 @@
#
# Copyright (c) 2015 Bosch Software Innovations GmbH and others
#
# This program and the accompanying materials are made
# available under the terms of the Eclipse Public License 2.0
# which is available at https://www.eclipse.org/legal/epl-2.0/
#
# SPDX-License-Identifier: EPL-2.0
#
# Spring cloud bus and stream
spring.cloud.bus.enabled=true
# Disable Cloud Bus default events
spring.cloud.bus.env.enabled=false
spring.cloud.bus.ack.enabled=false
spring.cloud.bus.trace.enabled=false
spring.cloud.bus.refresh.enabled=false
# Disable Cloud Bus endpoints
management.endpoint.busrefresh.access=none
management.endpoint.busenv.access=none
# Spring cloud bus and stream END

View File

@@ -0,0 +1,47 @@
#
# Copyright (c) 2015 Bosch Software Innovations GmbH and others
#
# This program and the accompanying materials are made
# available under the terms of the Eclipse Public License 2.0
# which is available at https://www.eclipse.org/legal/epl-2.0/
#
# SPDX-License-Identifier: EPL-2.0
#
org.eclipse.hawkbit.events.remote-enabled=true
spring.cloud.function.definition=fanoutEventConsumer;serviceEventConsumer
spring.cloud.stream.default.content-type=application/remote-event-json
# -- Consumer bindings --
spring.cloud.stream.bindings.fanoutEventConsumer-in-0.destination=fanoutEventChannel
spring.cloud.stream.bindings.serviceEventConsumer-in-0.destination=serviceEventChannel
# -- Producer bindings (for StreamBridge) --
spring.cloud.stream.bindings.fanoutEventChannel.destination=fanoutEventChannel
spring.cloud.stream.bindings.serviceEventChannel.destination=serviceEventChannel
spring.cloud.stream.bindings.serviceEventConsumer-in-0.group=${spring.application.name}
# Performance
spring.cloud.stream.rabbit.binder.compressionLevel=0
spring.cloud.stream.rabbit.bindings.fanoutEventConsumer-in-0.consumer.anonymousGroupPrefix=${spring.application.name}-
spring.cloud.stream.rabbit.bindings.fanoutEventConsumer-in-0.consumer.durableSubscription=false
spring.cloud.stream.rabbit.bindings.fanoutEventConsumer-in-0.consumer.maxConcurrency=1
spring.cloud.stream.rabbit.bindings.fanoutEventConsumer-in-0.consumer.requeueRejected=false
spring.cloud.stream.rabbit.bindings.fanoutEventConsumer-in-0.consumer.prefetch=100
spring.cloud.stream.rabbit.bindings.serviceEventConsumer-in-0.consumer.maxConcurrency=1
spring.cloud.stream.rabbit.bindings.serviceEventConsumer-in-0.consumer.requeueRejected=false
spring.cloud.stream.rabbit.bindings.serviceEventConsumer-in-0.consumer.prefetch=100
spring.cloud.stream.rabbit.bindings.fanoutEventChannel.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.fanoutEventChannel.producer.batchingEnabled=true
spring.cloud.stream.rabbit.bindings.fanoutEventChannel.producer.batchSize=1000
spring.cloud.stream.rabbit.bindings.fanoutEventChannel.producer.batch-buffer-limit=100000
spring.cloud.stream.rabbit.bindings.fanoutEventChannel.producer.deliveryMode=NON_PERSISTENT
spring.cloud.stream.rabbit.bindings.serviceEventChannel.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.serviceEventChannel.producer.batchingEnabled=true
spring.cloud.stream.rabbit.bindings.serviceEventChannel.producer.batchSize=1000
spring.cloud.stream.rabbit.bindings.serviceEventChannel.producer.batch-buffer-limit=100000
spring.cloud.stream.rabbit.bindings.serviceEventChannel.producer.deliveryMode=NON_PERSISTENT

View File

@@ -0,0 +1,77 @@
/**
* Copyright (c) 2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.hawkbit.event;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import org.eclipse.hawkbit.repository.event.remote.AbstractRemoteEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
import org.eclipse.hawkbit.repository.model.Target;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@ExtendWith(MockitoExtension.class)
class EventJacksonMessageConverterTest {
private final TestEventJacksonMessageConverter underTest = new TestEventJacksonMessageConverter();
@Mock
private Target targetMock;
@Mock
private Message<Object> messageMock;
@BeforeEach
void before() {
when(targetMock.getId()).thenReturn(1L);
}
/**
* Verifies that the TargetCreatedEvent can be successfully serialized and deserialized
*/
@Test
void successfullySerializeAndDeserializeEvent() {
final TargetCreatedEvent targetCreatedEvent = new TargetCreatedEvent(targetMock);
// serialize
final Object serializedEvent = underTest.convertToInternal(targetCreatedEvent,
new MessageHeaders(new HashMap<>()), null);
assertThat(serializedEvent).isInstanceOf(byte[].class);
// deserialize
when(messageMock.getPayload()).thenReturn(serializedEvent);
final Object deserializedEvent = underTest.convertFromInternal(messageMock, AbstractRemoteEvent.class, null);
assertThat(deserializedEvent)
.isInstanceOf(TargetCreatedEvent.class)
.isEqualTo(targetCreatedEvent);
}
/**
* Test subclass to expose protected methods for testing.
*/
private static class TestEventJacksonMessageConverter extends EventJacksonMessageConverter {
@Override
public Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
return super.convertToInternal(payload, headers, conversionHint);
}
@Override
public Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
return super.convertFromInternal(message, targetClass, conversionHint);
}
}
}

View File

@@ -16,6 +16,7 @@ import static org.mockito.Mockito.when;
import java.io.Serial;
import java.util.HashMap;
import org.eclipse.hawkbit.repository.event.remote.AbstractRemoteEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RemoteEntityEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
import org.eclipse.hawkbit.repository.model.Target;
@@ -24,15 +25,14 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConversionException;
@ExtendWith(MockitoExtension.class)
class BusProtoStuffMessageConverterTest {
class EventProtoStuffMessageConverterTest {
private final BusProtoStuffMessageConverter underTest = new BusProtoStuffMessageConverter();
private final EventProtoStuffMessageConverter underTest = new EventProtoStuffMessageConverter();
@Mock
private Target targetMock;
@@ -58,7 +58,7 @@ class BusProtoStuffMessageConverterTest {
// deserialize
when(messageMock.getPayload()).thenReturn(serializedEvent);
final Object deserializedEvent = underTest.convertFromInternal(messageMock, RemoteApplicationEvent.class, null);
final Object deserializedEvent = underTest.convertFromInternal(messageMock, AbstractRemoteEvent.class, null);
assertThat(deserializedEvent)
.isInstanceOf(TargetCreatedEvent.class)
.isEqualTo(targetCreatedEvent);