Added polling for DMF simulated devices.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>
This commit is contained in:
kaizimmerm
2016-06-19 20:42:23 +02:00
parent 20ebbed27f
commit 58b883d773
14 changed files with 94 additions and 58 deletions

View File

@@ -27,7 +27,7 @@ public abstract class AbstractSimulatedDevice {
private UpdateStatus updateStatus = new UpdateStatus(ResponseStatus.SUCCESSFUL, "Simulation complete!");
private Protocol protocol = Protocol.DMF_AMQP;
private String targetSecurityToken;
private int pollDelaySec;
private int nextPollCounterSec;
/**
@@ -84,13 +84,26 @@ public abstract class AbstractSimulatedDevice {
* the ID of the simulated device
* @param tenant
* the tenant of the simulated device
* @param int
* pollDelaySec
*/
AbstractSimulatedDevice(final String id, final String tenant, final Protocol protocol) {
AbstractSimulatedDevice(final String id, final String tenant, final Protocol protocol, final int pollDelaySec) {
this.id = id;
this.tenant = tenant;
this.status = Status.UNKNWON;
this.progress = 0.0;
this.protocol = protocol;
this.pollDelaySec = pollDelaySec;
}
abstract public void poll();
public int getPollDelaySec() {
return pollDelaySec;
}
public void setPollDelaySec(final int pollDelaySec) {
this.pollDelaySec = pollDelaySec;
}
/**

View File

@@ -23,7 +23,6 @@ public class DDISimulatedDevice extends AbstractSimulatedDevice {
private static final Logger LOGGER = LoggerFactory.getLogger(DDISimulatedDevice.class);
private final int pollDelaySec;
private final ControllerResource controllerResource;
private final DeviceSimulatorUpdater deviceUpdater;
@@ -45,11 +44,9 @@ public class DDISimulatedDevice extends AbstractSimulatedDevice {
*/
public DDISimulatedDevice(final String id, final String tenant, final int pollDelaySec,
final ControllerResource controllerResource, final DeviceSimulatorUpdater deviceUpdater) {
super(id, tenant, Protocol.DDI_HTTP);
this.pollDelaySec = pollDelaySec;
super(id, tenant, Protocol.DDI_HTTP, pollDelaySec);
this.controllerResource = controllerResource;
this.deviceUpdater = deviceUpdater;
setNextPollCounterSec(pollDelaySec);
}
@Override
@@ -58,13 +55,10 @@ public class DDISimulatedDevice extends AbstractSimulatedDevice {
removed = true;
}
public int getPollDelaySec() {
return pollDelaySec;
}
/**
* Polls the base URL for the DDI API interface.
*/
@Override
public void poll() {
if (!removed) {
final String basePollJson = controllerResource.get(getTenant(), getId());

View File

@@ -8,10 +8,13 @@
*/
package org.eclipse.hawkbit.simulator;
import org.eclipse.hawkbit.simulator.amqp.SpSenderService;
/**
* A simulated device using the DMF API of the hawkBit update server.
*/
public class DMFSimulatedDevice extends AbstractSimulatedDevice {
private final SpSenderService spSenderService;
/**
* @param id
@@ -19,8 +22,15 @@ public class DMFSimulatedDevice extends AbstractSimulatedDevice {
* @param tenant
* the tenant of the simulated device
*/
public DMFSimulatedDevice(final String id, final String tenant) {
super(id, tenant, Protocol.DMF_AMQP);
public DMFSimulatedDevice(final String id, final String tenant, final SpSenderService spSenderService,
final int pollDelaySec) {
super(id, tenant, Protocol.DMF_AMQP, pollDelaySec);
this.spSenderService = spSenderService;
}
@Override
public void poll() {
spSenderService.createOrUpdateThing(super.getTenant(), super.getId());
}
}

View File

@@ -9,8 +9,8 @@
package org.eclipse.hawkbit.simulator;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -25,7 +25,7 @@ import org.springframework.stereotype.Service;
@Service
public class DeviceSimulatorRepository {
private final Map<DeviceKey, AbstractSimulatedDevice> devices = new LinkedHashMap<>();
private final Map<DeviceKey, AbstractSimulatedDevice> devices = new ConcurrentHashMap<>();
@Autowired
private SimulatedDeviceFactory deviceFactory;

View File

@@ -8,12 +8,11 @@
*/
package org.eclipse.hawkbit.simulator;
import java.util.List;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.hawkbit.simulator.event.NextPollCounterUpdate;
import org.slf4j.Logger;
@@ -51,18 +50,17 @@ public class NextPollTimeController {
private class NextPollUpdaterRunnable implements Runnable {
@Override
public void run() {
final List<AbstractSimulatedDevice> devices = repository.getAll().stream()
.filter(device -> device instanceof DDISimulatedDevice).collect(Collectors.toList());
final Collection<AbstractSimulatedDevice> devices = repository.getAll();
devices.forEach(device -> {
int nextCounter = device.getNextPollCounterSec() - 1;
if (nextCounter < 0 && device instanceof DDISimulatedDevice) {
if (nextCounter < 0) {
try {
pollService.submit(() -> ((DDISimulatedDevice) device).poll());
pollService.submit(() -> device.poll());
} catch (final IllegalStateException e) {
LOGGER.trace("Device could not be polled", e);
}
nextCounter = ((DDISimulatedDevice) device).getPollDelaySec();
nextCounter = device.getPollDelaySec();
}
device.setNextPollCounterSec(nextCounter);

View File

@@ -11,6 +11,7 @@ package org.eclipse.hawkbit.simulator;
import java.net.URL;
import org.eclipse.hawkbit.simulator.AbstractSimulatedDevice.Protocol;
import org.eclipse.hawkbit.simulator.amqp.SpSenderService;
import org.eclipse.hawkbit.simulator.http.ControllerResource;
import org.eclipse.hawkbit.simulator.http.GatewayTokenInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
@@ -28,6 +29,9 @@ public class SimulatedDeviceFactory {
@Autowired
private DeviceSimulatorUpdater deviceUpdater;
@Autowired
private SpSenderService spSenderService;
/**
* Creating a simulated devices.
*
@@ -55,7 +59,7 @@ public class SimulatedDeviceFactory {
* the protocol which should be used be the simulated device
* @param pollDelaySec
* the poll delay time in seconds which should be used for
* {@link DDISimulatedDevice}s
* {@link DDISimulatedDevice}s and {@link DMFSimulatedDevice}
* @param baseEndpoint
* the http base endpoint which should be used for
* {@link DDISimulatedDevice}s
@@ -68,7 +72,7 @@ public class SimulatedDeviceFactory {
final int pollDelaySec, final URL baseEndpoint, final String gatewayToken) {
switch (protocol) {
case DMF_AMQP:
return new DMFSimulatedDevice(id, tenant);
return new DMFSimulatedDevice(id, tenant, spSenderService, pollDelaySec);
case DDI_HTTP:
final ControllerResource controllerResource = Feign.builder().logger(new Logger.ErrorLogger())
.requestInterceptor(new GatewayTokenInterceptor(gatewayToken)).logLevel(Logger.Level.BASIC)

View File

@@ -66,7 +66,7 @@ public class SimulationController {
@RequestParam(value = "tenant", defaultValue = "DEFAULT") final String tenant,
@RequestParam(value = "api", defaultValue = "dmf") final String api,
@RequestParam(value = "endpoint", defaultValue = "http://localhost:8080") final String endpoint,
@RequestParam(value = "polldelay", defaultValue = "30") final int pollDelay,
@RequestParam(value = "polldelay", defaultValue = "1800") final int pollDelay,
@RequestParam(value = "gatewaytoken", defaultValue = "") final String gatewayToken)
throws MalformedURLException {

View File

@@ -10,6 +10,7 @@ package org.eclipse.hawkbit.simulator;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.hawkbit.simulator.AbstractSimulatedDevice.Protocol;
import org.hibernate.validator.constraints.NotEmpty;
@@ -68,9 +69,9 @@ public class SimulationProperties {
private String endpoint = "http://localhost:8080";
/**
* Poll time in case of DDI API based simulation.
* Poll time in {@link TimeUnit#SECONDS} for simulated devices.
*/
private int pollDelay = 30;
private int pollDelay = (int) TimeUnit.MINUTES.toSeconds(30);
/**
* Optional gateway token for DDI API based simulation.

View File

@@ -20,11 +20,13 @@ import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -73,6 +75,42 @@ public class AmqpConfiguration {
return rabbitTemplate;
}
@Configuration
protected static class RabbitConnectionFactoryCreator {
/**
* {@link ConnectionFactory} with enabled publisher confirms and
* heartbeat.
*
* @param config
* with standard {@link RabbitProperties}
* @return {@link ConnectionFactory}
*/
@Bean
public ConnectionFactory rabbitConnectionFactory(final RabbitProperties config) {
final CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setRequestedHeartBeat(60);
factory.setPublisherConfirms(true);
final String addresses = config.getAddresses();
factory.setAddresses(addresses);
if (config.getHost() != null) {
factory.setHost(config.getHost());
factory.setPort(config.getPort());
}
if (config.getUsername() != null) {
factory.setUsername(config.getUsername());
}
if (config.getPassword() != null) {
factory.setPassword(config.getPassword());
}
if (config.getVirtualHost() != null) {
factory.setVirtualHost(config.getVirtualHost());
}
return factory;
}
}
/**
* Creates the receiver queue from update server for receiving message from
* update server.

View File

@@ -8,7 +8,7 @@
*/
package org.eclipse.hawkbit.simulator.event;
import java.util.List;
import java.util.Collection;
import org.eclipse.hawkbit.simulator.AbstractSimulatedDevice;
@@ -20,7 +20,7 @@ import org.eclipse.hawkbit.simulator.AbstractSimulatedDevice;
*/
public class NextPollCounterUpdate {
private final List<AbstractSimulatedDevice> devices;
private final Collection<AbstractSimulatedDevice> devices;
/**
* Creates poll timer update event.
@@ -28,14 +28,14 @@ public class NextPollCounterUpdate {
* @param devices
* the devices which progress has been updated
*/
public NextPollCounterUpdate(final List<AbstractSimulatedDevice> devices) {
public NextPollCounterUpdate(final Collection<AbstractSimulatedDevice> devices) {
this.devices = devices;
}
/**
* @return the devices of the event
*/
public List<AbstractSimulatedDevice> getDevices() {
public Collection<AbstractSimulatedDevice> getDevices() {
return devices;
}

View File

@@ -8,7 +8,7 @@
*/
package org.eclipse.hawkbit.simulator.ui;
import java.util.List;
import java.util.Collection;
import java.util.Locale;
import org.eclipse.hawkbit.simulator.AbstractSimulatedDevice;
@@ -167,7 +167,7 @@ public class SimulatorView extends VerticalLayout implements View {
@SuppressWarnings("unchecked")
@Subscribe
public void pollCounterUpdate(final NextPollCounterUpdate update) {
final List<AbstractSimulatedDevice> devices = update.getDevices();
final Collection<AbstractSimulatedDevice> devices = update.getDevices();
this.getUI().access(() -> devices.forEach(device -> {
final BeanItem<AbstractSimulatedDevice> item = beanContainer.getItem(device.getId());
if (item != null) {

View File

@@ -24,7 +24,6 @@ spring.rabbitmq.virtualHost=/
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.dynamic=true
spring.rabbitmq.listener.prefetch=100
security.basic.enabled=false
server.port=8083

View File

@@ -20,11 +20,6 @@
<Logger name="org.apache.coyote.http11.Http11NioProtocol" level="WARN" />
<Logger name="org.apache.tomcat.util.net.NioSelectorPool" level="WARN" />
<Logger name="org.apache.catalina.startup.DigesterFactory" level="ERROR" />
<Logger name="org.eclipse" level="DEBUG" />
<!-- Security Log with hints on potential attacks -->
<logger name="server-security" level="INFO" />
<Root level="INFO">
<AppenderRef ref="Console" />