Introduce new interface method for disconnecting consumers by groupId

Allow plugins to isolate their consumers using groupId, so that
multiple consumers can work independently.

Allow the broker API to disconnect only specific consumers on
topics by groupId, so that unloading one plugin would not impact
the functionality of other plugins consuming messages on the
same topics.

Bug:Issue 327226782
Change-Id: I859cddd012295e576fc4df4a66af65eedfd24d29
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
index a0d218d..e8c336a 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
@@ -15,6 +15,7 @@
 package com.gerritforge.gerrit.eventbroker;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.server.events.Event;
 import java.util.Set;
 import java.util.function.Consumer;
@@ -50,6 +51,14 @@
   void disconnect();
 
   /**
+   * Disconnect from broker and cancel all active consumers on the specified topic.
+   *
+   * @param topic topic name of the consumers to cancel
+   * @param groupId when not null, filter the consumers to cancel by groupId
+   */
+  void disconnect(String topic, @Nullable String groupId);
+
+  /**
    * Redeliver all stored messages for specified topic
    *
    * @param topic topic name
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
index 4128375..6f0692a 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
@@ -20,10 +20,12 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.server.events.Event;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 public class InProcessBrokerApi implements BrokerApi {
   private static final FluentLogger log = FluentLogger.forEnclosingClass();
@@ -68,6 +70,24 @@
   }
 
   @Override
+  public void disconnect(String topic, @Nullable String groupId) {
+    Set<TopicSubscriber> topicsToRemove =
+        topicSubscribers.stream()
+            .filter(ts -> topic.equals(ts.topic()))
+            .collect(Collectors.toSet());
+    topicSubscribers.removeAll(topicsToRemove);
+
+    Set<TopicSubscriberWithGroupId> topicsWithGroupIdToRemove =
+        topicSubscribersWithGroupId.stream()
+            .filter(
+                tsg ->
+                    topic.equals(tsg.topicSubscriber().topic())
+                        && (groupId == null || groupId.equals(tsg.groupId())))
+            .collect(Collectors.toSet());
+    topicSubscribersWithGroupId.removeAll(topicsWithGroupIdToRemove);
+  }
+
+  @Override
   public void replayAllEvents(String topic) {
     unsupported();
   }