blob: e7c88695856ca0258f17fc3552df9774579aa6e6 [file] [log] [blame]
// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.eventseiffel;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.EventKey;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEvent;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Ignore;
@Ignore
public class TestEventPublisher implements Runnable, EiffelEventHub.Consumer {
/* Milliseconds after which the publisher-thread is expected to have handled the event. */
public static int MILLIS_TO_WAIT = 1000;
Map<EventKey, EiffelEvent> publishedEvents = Maps.newConcurrentMap();
List<EventKey> actualOrder = Lists.newArrayList();
boolean keepTrying = true;
boolean pause = false;
EiffelEventQueue queue;
Object takeTrigger = new Object();
Thread publisherThread;
@Override
public void run() {
while (keepTrying) {
try {
while (pause) {
synchronized (takeTrigger) {
takeTrigger.wait();
}
}
EiffelEvent taken = queue.take(1).iterator().next();
EventKey takenKey = EventKey.fromEvent(taken);
TestEventStorage.INSTANCE.addId(takenKey);
publishedEvents.put(takenKey, taken);
actualOrder.add(takenKey);
queue.ack(Sets.newHashSet(taken));
} catch (InterruptedException e) {
// Do nothing.
}
}
}
public void assertOrder(List<? extends EiffelEvent> events) {
List<EventKey> keys = events.stream().map(EventKey::fromEvent).collect(Collectors.toList());
waitForIt();
EventKey next = keys.remove(0);
for (EventKey key : ImmutableList.copyOf(actualOrder)) {
if (key.equals(next)) {
if (keys.isEmpty()) {
return;
}
next = keys.remove(0);
} else {
for (EventKey notNext : keys) {
assertNotEquals(String.format("Event %s came before %s", notNext, next), key, notNext);
}
}
}
assertEquals(
String.format(
"Unpublished events when verifying order: %s",
String.join(
", ",
Streams.concat(Stream.of(next.toString()), keys.stream().map(ek -> ek.toString()))
.collect(Collectors.toList()))),
0,
keys.size());
}
public void assertNotPublished(EventKey key, String message) {
waitForIt();
EiffelEvent event = publishedEvents.getOrDefault(key, null);
assertNull(message, event);
}
public EiffelEvent getPublished(EventKey key) {
int wait = 50;
for (int totalWait = 0; totalWait < MILLIS_TO_WAIT; totalWait += wait) {
if (publishedEvents.containsKey(key)) {
return publishedEvents.get(key);
}
waitFor(wait);
}
return publishedEvents.getOrDefault(key, null);
}
public void startPublishing() {
pause = false;
synchronized (takeTrigger) {
takeTrigger.notifyAll();
}
waitForIt();
}
public void stopPublishing() {
publisherThread.interrupt();
pause = true;
}
private void waitForIt() {
waitFor(MILLIS_TO_WAIT);
}
private void waitFor(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void start(EiffelEventQueue queue) {
this.queue = queue;
publisherThread = new Thread(this);
publisherThread.start();
}
@Override
public void stop() {
keepTrying = false;
pause = false;
publisherThread.interrupt();
}
@Override
public boolean isRunning() {
return publisherThread != null && publisherThread.isAlive();
}
}