Fix missing TargetUpdate events (#476)

* Fix problem where target assignment sends not all events.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Improved concurrency.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Revert send assigned event change.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Adapt tests.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Readibility.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>
This commit is contained in:
Kai Zimmermann
2017-04-12 21:40:48 +02:00
committed by GitHub
parent 84b1980d32
commit 32fe53708e
6 changed files with 111 additions and 94 deletions

View File

@@ -61,7 +61,7 @@ public class AmqpMessageDispatcherServiceIntegrationTest extends AmqpServiceInte
@Expect(type = ActionCreatedEvent.class, count = 2), @Expect(type = ActionUpdatedEvent.class, count = 1),
@Expect(type = SoftwareModuleCreatedEvent.class, count = 6),
@Expect(type = DistributionSetCreatedEvent.class, count = 2),
@Expect(type = TargetUpdatedEvent.class, count = 1), @Expect(type = TargetPollEvent.class, count = 3) })
@Expect(type = TargetUpdatedEvent.class, count = 2), @Expect(type = TargetPollEvent.class, count = 3) })
public void assignDistributionSetMultipleTimes() {
final DistributionSetAssignmentResult assignmentResult = registerTargetAndAssignDistributionSet();
@@ -98,8 +98,8 @@ public class AmqpMessageDispatcherServiceIntegrationTest extends AmqpServiceInte
waitUntil(() -> {
final Optional<Target> findTargetByControllerID = targetManagement
.findTargetByControllerID(REGISTER_TARGET);
return findTargetByControllerID.isPresent() && TargetUpdateStatus.PENDING
.equals(findTargetByControllerID.get().getUpdateStatus());
return findTargetByControllerID.isPresent()
&& TargetUpdateStatus.PENDING.equals(findTargetByControllerID.get().getUpdateStatus());
});
}

View File

@@ -886,7 +886,8 @@ public class MgmtTargetResourceTest extends AbstractManagementApiIntegrationTest
.andExpect(status().isOk())
.andExpect(jsonPath("content.[0].id", equalTo(actionStatus.get(0).getId().intValue())))
.andExpect(jsonPath("content.[0].type", equalTo("canceling")))
.andExpect(jsonPath("content.[0].messages", hasItem("manual cancelation requested")))
.andExpect(jsonPath("content.[0].messages",
hasItem("Update Server: cancel obsolete action due to new update")))
.andExpect(jsonPath("content.[0].reportedAt", equalTo(actionStatus.get(0).getCreatedAt())))
.andExpect(jsonPath("content.[1].id", equalTo(actionStatus.get(1).getId().intValue())))
.andExpect(jsonPath("content.[1].type", equalTo("running")))
@@ -912,7 +913,8 @@ public class MgmtTargetResourceTest extends AbstractManagementApiIntegrationTest
.andDo(MockMvcResultPrinter.print()).andExpect(status().isOk())
.andExpect(jsonPath("content.[0].id", equalTo(actionStatus.get(1).getId().intValue())))
.andExpect(jsonPath("content.[0].type", equalTo("canceling")))
.andExpect(jsonPath("content.[0].messages", hasItem("manual cancelation requested")))
.andExpect(jsonPath("content.[0].messages",
hasItem("Update Server: cancel obsolete action due to new update")))
.andExpect(jsonPath("content.[0].reportedAt", equalTo(actionStatus.get(1).getCreatedAt())))
.andExpect(jsonPath("content.[1].id", equalTo(actionStatus.get(0).getId().intValue())))
.andExpect(jsonPath("content.[1].type", equalTo("running")))
@@ -929,7 +931,8 @@ public class MgmtTargetResourceTest extends AbstractManagementApiIntegrationTest
.andDo(MockMvcResultPrinter.print()).andExpect(status().isOk())
.andExpect(jsonPath("content.[1].id", equalTo(actionStatus.get(1).getId().intValue())))
.andExpect(jsonPath("content.[1].type", equalTo("canceling")))
.andExpect(jsonPath("content.[1].messages", hasItem("manual cancelation requested")))
.andExpect(jsonPath("content.[1].messages",
hasItem("Update Server: cancel obsolete action due to new update")))
.andExpect(jsonPath("content.[1].reportedAt", equalTo(actionStatus.get(1).getCreatedAt())))
.andExpect(jsonPath("content.[0].id", equalTo(actionStatus.get(0).getId().intValue())))
.andExpect(jsonPath("content.[0].type", equalTo("running")))
@@ -956,7 +959,8 @@ public class MgmtTargetResourceTest extends AbstractManagementApiIntegrationTest
.andDo(MockMvcResultPrinter.print()).andExpect(status().isOk())
.andExpect(jsonPath("content.[0].id", equalTo(actionStatus.get(1).getId().intValue())))
.andExpect(jsonPath("content.[0].type", equalTo("canceling")))
.andExpect(jsonPath("content.[0].messages", hasItem("manual cancelation requested")))
.andExpect(jsonPath("content.[0].messages",
hasItem("Update Server: cancel obsolete action due to new update")))
.andExpect(jsonPath("content.[0].reportedAt", equalTo(actionStatus.get(1).getCreatedAt())))
.andExpect(jsonPath(JSON_PATH_PAGED_LIST_TOTAL, equalTo(2)))
.andExpect(jsonPath(JSON_PATH_PAGED_LIST_SIZE, equalTo(1)))

View File

@@ -11,7 +11,6 @@ package org.eclipse.hawkbit.repository.jpa;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -223,7 +222,7 @@ public class JpaDeploymentManagement implements DeploymentManagement {
if (targets.isEmpty()) {
// detaching as it is not necessary to persist the set itself
entityManager.detach(set);
entityManager.clear();
// return with nothing as all targets had the DS already assigned
return new DistributionSetAssignmentResult(Collections.emptyList(), 0, targetsWithActionType.size(),
Collections.emptyList(), targetManagement);
@@ -236,8 +235,8 @@ public class JpaDeploymentManagement implements DeploymentManagement {
// need to remember which one we have been switched to canceling state
// because for targets which we have changed to canceling we don't want
// to publish the new action update event.
final Set<Long> targetIdsCancellList = new HashSet<>();
targetIds.forEach(ids -> targetIdsCancellList.addAll(overrideObsoleteUpdateActions(ids)));
final Set<Long> targetIdsCancellList = targetIds.stream().map(this::overrideObsoleteUpdateActions)
.flatMap(Collection::stream).collect(Collectors.toSet());
// cancel all scheduled actions which are in-active, these actions were
// not active before and the manual assignment which has been done
@@ -254,9 +253,8 @@ public class JpaDeploymentManagement implements DeploymentManagement {
targetIds.forEach(tIds -> targetRepository.setAssignedDistributionSetAndUpdateStatus(TargetUpdateStatus.PENDING,
set, System.currentTimeMillis(), currentUser, tIds));
final Map<String, JpaAction> targetIdsToActions = targets.stream()
.map(t -> actionRepository
.save(createTargetAction(targetsWithActionMap, t, set, rollout, rolloutGroup)))
final Map<String, JpaAction> targetIdsToActions = targets.stream().map(
t -> actionRepository.save(createTargetAction(targetsWithActionMap, t, set, rollout, rolloutGroup)))
.collect(Collectors.toMap(a -> a.getTarget().getControllerId(), Function.identity()));
// create initial action status when action is created so we remember
@@ -267,6 +265,9 @@ public class JpaDeploymentManagement implements DeploymentManagement {
// flush to get action IDs
entityManager.flush();
// detaching as everything that needs to be stored is already flushed
entityManager.clear();
// collect updated target and actions IDs in order to return them
final DistributionSetAssignmentResult result = new DistributionSetAssignmentResult(
targets.stream().map(Target::getControllerId).collect(Collectors.toList()), targets.size(),
@@ -275,18 +276,22 @@ public class JpaDeploymentManagement implements DeploymentManagement {
LOG.debug("assignDistribution({}) finished {}", set, result);
// detaching as it is not necessary to persist the set itself
entityManager.detach(set);
sendDistributionSetAssignmentEvent(targets, targetIdsCancellList, targetIdsToActions);
sendAssignmentEvents(targets, targetIdsCancellList, targetIdsToActions);
return result;
}
private void sendDistributionSetAssignmentEvent(final List<JpaTarget> targets, final Set<Long> targetIdsCancellList,
private void sendAssignmentEvents(final List<JpaTarget> targets, final Set<Long> targetIdsCancellList,
final Map<String, JpaAction> targetIdsToActions) {
targets.stream().filter(t -> !!!targetIdsCancellList.contains(t.getId()))
.forEach(t -> assignDistributionSetEvent(targetIdsToActions.get(t.getControllerId())));
targets.forEach(target -> {
sendTargetUpdatedEvent(target);
if (targetIdsCancellList.contains(target.getId())) {
return;
}
sendTargetAssignDistributionSetEvent(targetIdsToActions.get(target.getControllerId()));
});
}
private static JpaAction createTargetAction(final Map<String, TargetWithActionType> targetsWithActionMap,
@@ -305,18 +310,19 @@ public class JpaDeploymentManagement implements DeploymentManagement {
return actionForTarget;
}
private void assignDistributionSetEvent(final Action action) {
private void sendTargetAssignDistributionSetEvent(final Action action) {
afterCommit.afterCommit(() -> eventPublisher
.publishEvent(new TargetAssignDistributionSetEvent(action, applicationContext.getId())));
}
private void sendTargetUpdatedEvent(final JpaTarget target) {
// Update is not available in the object as the update was executed
// through JQL
final JpaTarget target = (JpaTarget) action.getTarget();
target.setUpdateStatus(TargetUpdateStatus.PENDING);
entityManager.detach(target);
afterCommit.afterCommit(
() -> eventPublisher.publishEvent(new TargetUpdatedEvent(target, applicationContext.getId())));
afterCommit.afterCommit(() -> eventPublisher
.publishEvent(new TargetAssignDistributionSetEvent(action, applicationContext.getId())));
}
/**
@@ -326,7 +332,7 @@ public class JpaDeploymentManagement implements DeploymentManagement {
* @param targetsIds
* to override {@link Action}s
*/
private Set<Long> overrideObsoleteUpdateActions(final List<Long> targetsIds) {
private List<Long> overrideObsoleteUpdateActions(final Collection<Long> targetsIds) {
// Figure out if there are potential target/action combinations that
// need to be considered for cancellation
@@ -339,13 +345,13 @@ public class JpaDeploymentManagement implements DeploymentManagement {
// document that the status has been retrieved
actionStatusRepository.save(new JpaActionStatus(action, Status.CANCELING, System.currentTimeMillis(),
"manual cancelation requested"));
RepositoryConstants.SERVER_MESSAGE_PREFIX + "cancel obsolete action due to new update"));
actionRepository.save(action);
cancelAssignDistributionSetEvent(action.getTarget(), action.getId());
return action.getTarget().getId();
}).collect(Collectors.toSet());
}).collect(Collectors.toList());
}
@@ -368,7 +374,7 @@ public class JpaDeploymentManagement implements DeploymentManagement {
// document that the status has been retrieved
actionStatusRepository.save(new JpaActionStatus(action, Status.CANCELING, System.currentTimeMillis(),
"manual cancelation requested"));
RepositoryConstants.SERVER_MESSAGE_PREFIX + "manual cancelation requested"));
final Action saveAction = actionRepository.save(action);
cancelAssignDistributionSetEvent(action.getTarget(), action.getId());
@@ -412,7 +418,7 @@ public class JpaDeploymentManagement implements DeploymentManagement {
// document that the status has been retrieved
actionStatusRepository.save(new JpaActionStatus(action, Status.CANCELED, System.currentTimeMillis(),
"A force quit has been performed."));
RepositoryConstants.SERVER_MESSAGE_PREFIX + "A force quit has been performed."));
DeploymentHelper.successCancellation(action, actionRepository, targetRepository);
@@ -420,8 +426,6 @@ public class JpaDeploymentManagement implements DeploymentManagement {
}
@Override
@Modifying
@Transactional(isolation = Isolation.READ_COMMITTED)
public long startScheduledActionsByRolloutGroupParent(@NotNull final Long rolloutId,
final Long rolloutGroupParentId) {
long totalActionsCount = 0L;
@@ -438,7 +442,7 @@ public class JpaDeploymentManagement implements DeploymentManagement {
private long startScheduledActionsByRolloutGroupParentInNewTransaction(final Long rolloutId,
final Long rolloutGroupParentId, final int limit) {
final DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setName("startScheduledActions");
def.setName("startScheduledActions-" + rolloutId);
def.setReadOnly(false);
def.setIsolationLevel(Isolation.READ_UNCOMMITTED.value());
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
@@ -480,7 +484,7 @@ public class JpaDeploymentManagement implements DeploymentManagement {
}
// check if we need to override running update actions
final Set<Long> overrideObsoleteUpdateActions = overrideObsoleteUpdateActions(
final List<Long> overrideObsoleteUpdateActions = overrideObsoleteUpdateActions(
Collections.singletonList(action.getTarget().getId()));
action.setActive(true);

View File

@@ -9,6 +9,7 @@
package org.eclipse.hawkbit.repository.jpa;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -23,8 +24,13 @@ import java.util.stream.Collectors;
import org.eclipse.hawkbit.repository.ActionStatusFields;
import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionUpdatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.SoftwareModuleCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetUpdatedEvent;
import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;
import org.eclipse.hawkbit.repository.exception.ForceQuitActionNotAllowedException;
import org.eclipse.hawkbit.repository.exception.IncompleteDistributionSetException;
@@ -222,53 +228,21 @@ public class DeploymentManagementTest extends AbstractJpaIntegrationTest {
final DistributionSetTag tag = tagManagement
.createDistributionSetTag(entityFactory.tag().create().name("Tag1"));
try {
distributionSetManagement.assignTag(assignDS, tag.getId());
fail("It should not be possible to assign a DS that does not exist");
} catch (final EntityNotFoundException e) {
// Ok
}
}
@Test
@Description("Ensures that distribution sets can assigned and unassigned to a distribution set tag.")
public void assignAndUnassignDistributionSetToTag() {
final List<Long> assignDS = Lists.newArrayListWithExpectedSize(4);
for (int i = 0; i < 4; i++) {
assignDS.add(testdataFactory.createDistributionSet("DS" + i, "1.0", Collections.emptyList()).getId());
}
final DistributionSetTag tag = tagManagement
.createDistributionSetTag(entityFactory.tag().create().name("Tag1"));
final List<DistributionSet> assignedDS = distributionSetManagement.assignTag(assignDS, tag.getId());
assertThat(assignedDS.size()).as("assigned ds has wrong size").isEqualTo(4);
assignedDS.forEach(ds -> assertThat(ds.getTags().size()).as("ds has wrong tag size").isEqualTo(1));
DistributionSetTag findDistributionSetTag = tagManagement.findDistributionSetTag("Tag1").get();
assertThat(assignedDS.size()).as("assigned ds has wrong size")
.isEqualTo(findDistributionSetTag.getAssignedToDistributionSet().size());
final DistributionSet unAssignDS = distributionSetManagement.unAssignTag(assignDS.get(0),
findDistributionSetTag.getId());
assertThat(unAssignDS.getId()).as("unassigned ds is wrong").isEqualTo(assignDS.get(0));
assertThat(unAssignDS.getTags().size()).as("unassigned ds has wrong tag size").isEqualTo(0);
findDistributionSetTag = tagManagement.findDistributionSetTag("Tag1").get();
assertThat(findDistributionSetTag.getAssignedToDistributionSet().size()).as("ds tag ds has wrong ds size")
.isEqualTo(3);
final List<DistributionSet> unAssignTargets = distributionSetManagement
.unAssignAllDistributionSetsByTag(findDistributionSetTag.getId());
findDistributionSetTag = tagManagement.findDistributionSetTag("Tag1").get();
assertThat(findDistributionSetTag.getAssignedToDistributionSet().size()).as("ds tag has wrong ds size")
.isEqualTo(0);
assertThat(unAssignTargets.size()).as("unassigned target has wrong size").isEqualTo(3);
unAssignTargets
.forEach(target -> assertThat(target.getTags().size()).as("target has wrong tag size").isEqualTo(0));
assertThatExceptionOfType(EntityNotFoundException.class)
.isThrownBy(() -> distributionSetManagement.assignTag(assignDS, tag.getId()))
.withMessageContaining("DistributionSet").withMessageContaining(String.valueOf(tag.getId()));
}
@Test
@Description("Test verifies that an assignment with automatic cancelation works correctly even if the update is split into multiple partitions on the database.")
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = Constants.MAX_ENTRIES_IN_STATEMENT + 10),
@Expect(type = TargetUpdatedEvent.class, count = 2 * (Constants.MAX_ENTRIES_IN_STATEMENT + 10)),
@Expect(type = TargetAssignDistributionSetEvent.class, count = Constants.MAX_ENTRIES_IN_STATEMENT + 10),
@Expect(type = ActionCreatedEvent.class, count = 2 * (Constants.MAX_ENTRIES_IN_STATEMENT + 10)),
@Expect(type = CancelTargetAssignmentEvent.class, count = Constants.MAX_ENTRIES_IN_STATEMENT + 10),
@Expect(type = ActionUpdatedEvent.class, count = Constants.MAX_ENTRIES_IN_STATEMENT + 10),
@Expect(type = DistributionSetCreatedEvent.class, count = 2),
@Expect(type = SoftwareModuleCreatedEvent.class, count = 6) })
public void multiAssigmentHistoryOverMultiplePagesResultsInTwoActiveAction() {
final DistributionSet cancelDs = testdataFactory.createDistributionSet("Canceled DS", "1.0",
@@ -277,15 +251,14 @@ public class DeploymentManagementTest extends AbstractJpaIntegrationTest {
final DistributionSet cancelDs2 = testdataFactory.createDistributionSet("Canceled DS", "1.2",
Collections.emptyList());
List<Target> targets = testdataFactory.createTargets(Constants.MAX_ENTRIES_IN_STATEMENT + 10);
final List<Target> targets = testdataFactory.createTargets(Constants.MAX_ENTRIES_IN_STATEMENT + 10);
targets = assignDistributionSet(cancelDs, targets).getAssignedEntity();
targets = assignDistributionSet(cancelDs2, targets).getAssignedEntity();
assertThat(deploymentManagement.countActionsAll()).isEqualTo(0);
targetManagement.findTargetsAll(pageReq).getContent().forEach(targetIdName -> {
assertThat(deploymentManagement.findActiveActionsByTarget(targetIdName.getControllerId()))
.as("active action has wrong size").hasSize(2);
});
assignDistributionSet(cancelDs, targets).getAssignedEntity();
assertThat(deploymentManagement.countActionsAll()).isEqualTo(Constants.MAX_ENTRIES_IN_STATEMENT + 10);
assignDistributionSet(cancelDs2, targets).getAssignedEntity();
assertThat(deploymentManagement.countActionsAll()).isEqualTo(2 * (Constants.MAX_ENTRIES_IN_STATEMENT + 10));
}
@Test
@@ -388,7 +361,7 @@ public class DeploymentManagementTest extends AbstractJpaIntegrationTest {
@Description("Force Quit an Assignment. Expected behaviour is that the action is canceled and is marked as deleted. The assigned Software module")
public void forceQuitSetActionToInactive() throws InterruptedException {
final Action action = prepareFinishedUpdate("4712", "installed", true);
Target target = action.getTarget();
final Target target = action.getTarget();
final DistributionSet dsInstalled = action.getDistributionSet();
final DistributionSet ds = testdataFactory.createDistributionSet("newDS", true);
@@ -403,8 +376,6 @@ public class DeploymentManagementTest extends AbstractJpaIntegrationTest {
assertThat(actionRepository.findAll()).as("wrong size of action").hasSize(2);
assertThat(actionStatusRepository.findAll()).as("wrong size of action status").hasSize(4);
target = targetManagement.findTargetByControllerID(target.getControllerId()).get();
// force quit assignment
deploymentManagement.cancelAction(assigningAction.getId());
assigningAction = deploymentManagement.findActionWithDetails(assigningAction.getId()).get();

View File

@@ -365,6 +365,43 @@ public class DistributionSetManagementTest extends AbstractJpaIntegrationTest {
assertThat(createdMetadata.getValue()).isEqualTo(knownValue);
}
@Test
@Description("Ensures that distribution sets can assigned and unassigned to a distribution set tag.")
public void assignAndUnassignDistributionSetToTag() {
final List<Long> assignDS = Lists.newArrayListWithExpectedSize(4);
for (int i = 0; i < 4; i++) {
assignDS.add(testdataFactory.createDistributionSet("DS" + i, "1.0", Collections.emptyList()).getId());
}
final DistributionSetTag tag = tagManagement
.createDistributionSetTag(entityFactory.tag().create().name("Tag1"));
final List<DistributionSet> assignedDS = distributionSetManagement.assignTag(assignDS, tag.getId());
assertThat(assignedDS.size()).as("assigned ds has wrong size").isEqualTo(4);
assignedDS.forEach(ds -> assertThat(ds.getTags().size()).as("ds has wrong tag size").isEqualTo(1));
DistributionSetTag findDistributionSetTag = tagManagement.findDistributionSetTag("Tag1").get();
assertThat(assignedDS.size()).as("assigned ds has wrong size")
.isEqualTo(findDistributionSetTag.getAssignedToDistributionSet().size());
final DistributionSet unAssignDS = distributionSetManagement.unAssignTag(assignDS.get(0),
findDistributionSetTag.getId());
assertThat(unAssignDS.getId()).as("unassigned ds is wrong").isEqualTo(assignDS.get(0));
assertThat(unAssignDS.getTags().size()).as("unassigned ds has wrong tag size").isEqualTo(0);
findDistributionSetTag = tagManagement.findDistributionSetTag("Tag1").get();
assertThat(findDistributionSetTag.getAssignedToDistributionSet().size()).as("ds tag ds has wrong ds size")
.isEqualTo(3);
final List<DistributionSet> unAssignTargets = distributionSetManagement
.unAssignAllDistributionSetsByTag(findDistributionSetTag.getId());
findDistributionSetTag = tagManagement.findDistributionSetTag("Tag1").get();
assertThat(findDistributionSetTag.getAssignedToDistributionSet().size()).as("ds tag has wrong ds size")
.isEqualTo(0);
assertThat(unAssignTargets.size()).as("unassigned target has wrong size").isEqualTo(3);
unAssignTargets
.forEach(target -> assertThat(target.getTags().size()).as("target has wrong tag size").isEqualTo(0));
}
@Test
@Description("Ensures that updates concerning the internal software structure of a DS are not possible if the DS is already assigned.")
public void updateDistributionSetForbiddedWithIllegalUpdate() {

View File

@@ -16,6 +16,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.hawkbit.repository.test.util.TestContextProvider;
import org.junit.Assert;
@@ -27,7 +28,7 @@ import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.event.ApplicationEventMulticaster;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.jayway.awaitility.Awaitility;
@@ -111,20 +112,20 @@ public class EventVerifier implements TestRule {
private static class EventCaptor implements ApplicationListener<RemoteApplicationEvent> {
private final Multiset<Class<?>> capturedEvents = HashMultiset.create();
private final Multiset<Class<?>> capturedEvents = ConcurrentHashMultiset.create();
@Override
public synchronized void onApplicationEvent(final RemoteApplicationEvent event) {
public void onApplicationEvent(final RemoteApplicationEvent event) {
capturedEvents.add(event.getClass());
}
public synchronized int getCountFor(final Class<?> expectedEvent) {
public int getCountFor(final Class<?> expectedEvent) {
return capturedEvents.count(expectedEvent);
}
public synchronized Set<Class<?>> diff(final Expect[] allEvents) {
public Set<Class<?>> diff(final Expect[] allEvents) {
return Sets.difference(capturedEvents.elementSet(),
java.util.stream.Stream.of(allEvents).map((e) -> e.type()).collect(Collectors.toSet()));
Stream.of(allEvents).map(Expect::type).collect(Collectors.toSet()));
}
}