Fire internal listeners before external stream ones

By separating the internal listeners from the external stream event
listeners, this fixes two problems with the previous design. First, it
causes events from other primaries to only be sent to the external
stream event listeners instead of the internal listeners. This ensures
that any internal event processing (such as posting a comment on a
cherry-picked change) only happens once per cluster instead of once per
primary as it would previously. Second, since all new events resulting
from internal dispatching will have been dispatched before sending
events to stream listeners, no new events will be dispatched while
sending to stream listeners, and this avoids a sending loop which could
previously happen in these situations.

Also include some minor cleanups such a removing an unneeded volatile,
and ordering variable definitions.

Change-Id: Id95cbfef48d8b5c9bc3179dabb37949aed514227
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java b/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java
index f3f5855..cdba933 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java
@@ -14,11 +14,13 @@
 
 package com.googlesource.gerrit.plugins.events;
 
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.entities.BranchNameKey;
 import com.google.gerrit.entities.Change;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gerrit.server.config.GerritServerConfigProvider;
 import com.google.gerrit.server.config.PluginConfig;
@@ -34,27 +36,32 @@
 import com.google.gerrit.server.permissions.PermissionBackend;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gerrit.server.plugincontext.PluginSetContext;
+import com.google.gerrit.server.plugincontext.PluginSetEntryContext;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Singleton
 public class FileSystemEventBroker extends EventBroker {
   private static final Logger log = LoggerFactory.getLogger(FileSystemEventBroker.class);
-  protected final EventStore store;
-  protected final Gson gson;
-  protected final DynamicSet<StreamEventListener> streamEventListeners;
-  protected volatile long lastSent;
+
   protected static final String KEY_FILTER = "filter";
   protected static final String FILTER_TYPE_DROP = "DROP";
   protected static final String FILTER_ELEMENT_CLASSNAME = "classname";
+
+  protected final EventStore store;
+  protected final Gson gson;
+  protected final DynamicSet<StreamEventListener> streamEventListeners;
+
+  protected long lastSent;
   protected Set<String> dropEventNames = new HashSet<>();
 
   @Inject
@@ -88,30 +95,34 @@
   @Override
   public void postEvent(Change change, ChangeEvent event) throws PermissionBackendException {
     storeEvent(event);
-    sendAllPendingEvents();
+    super.postEvent(change, event);
+    fireEventForStreamListeners();
   }
 
   @Override
   public void postEvent(Project.NameKey projectName, ProjectEvent event) {
     storeEvent(event);
+    super.postEvent(projectName, event);
     try {
-      sendAllPendingEvents();
+      fireEventForStreamListeners();
     } catch (PermissionBackendException e) {
       log.error("Permission Exception while dispatching the event. Will be tried again.", e);
     }
   }
 
   @Override
-  protected void fireEvent(BranchNameKey branchName, RefEvent event)
+  public void postEvent(BranchNameKey branchName, RefEvent event)
       throws PermissionBackendException {
     storeEvent(event);
-    sendAllPendingEvents();
+    super.postEvent(branchName, event);
+    fireEventForStreamListeners();
   }
 
   @Override
   public void postEvent(Event event) throws PermissionBackendException {
     storeEvent(event);
-    sendAllPendingEvents();
+    super.postEvent(event);
+    fireEventForStreamListeners();
   }
 
   protected void storeEvent(Event event) {
@@ -125,12 +136,13 @@
     }
   }
 
-  public synchronized void sendAllPendingEvents() throws PermissionBackendException {
+  public synchronized void fireEventForStreamListeners() throws PermissionBackendException {
     try {
       long current = store.getHead();
       while (lastSent < current) {
         long next = lastSent + 1;
-        fireEvent(gson.fromJson(store.get(next), Event.class));
+        fireEventForUserScopedEventListener(
+            Type.STREAM, gson.fromJson(store.get(next), Event.class));
         lastSent = next;
       }
     } catch (IOException e) {
@@ -141,6 +153,79 @@
     }
   }
 
+  @Override
+  protected void fireEvent(Change change, ChangeEvent event) throws PermissionBackendException {
+    setInstanceIdWhenEmpty(event);
+    for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(Type.NON_STREAM)) {
+      CurrentUser user = c.call(UserScopedEventListener::getUser);
+      if (isVisibleTo(change, user)) {
+        c.run(l -> l.onEvent(event));
+      }
+    }
+    fireEventForUnrestrictedListeners(event);
+  }
+
+  @Override
+  protected void fireEvent(Project.NameKey project, ProjectEvent event) {
+    setInstanceIdWhenEmpty(event);
+    for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(Type.NON_STREAM)) {
+      CurrentUser user = c.call(UserScopedEventListener::getUser);
+      if (isVisibleTo(project, user)) {
+        c.run(l -> l.onEvent(event));
+      }
+    }
+    fireEventForUnrestrictedListeners(event);
+  }
+
+  @Override
+  protected void fireEvent(BranchNameKey branchName, RefEvent event)
+      throws PermissionBackendException {
+    setInstanceIdWhenEmpty(event);
+    for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(Type.NON_STREAM)) {
+      CurrentUser user = c.call(UserScopedEventListener::getUser);
+      if (isVisibleTo(branchName, user)) {
+        c.run(l -> l.onEvent(event));
+      }
+    }
+    fireEventForUnrestrictedListeners(event);
+  }
+
+  @Override
+  protected void fireEvent(Event event) throws PermissionBackendException {
+    setInstanceIdWhenEmpty(event);
+    fireEventForUserScopedEventListener(Type.NON_STREAM, event);
+    fireEventForUnrestrictedListeners(event);
+  }
+
+  protected void fireEventForUserScopedEventListener(Type type, Event event)
+      throws PermissionBackendException {
+    for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(type)) {
+      CurrentUser user = c.call(UserScopedEventListener::getUser);
+      if (isVisibleTo(event, user)) {
+        c.run(l -> l.onEvent(event));
+      }
+    }
+  }
+
+  public enum Type {
+    STREAM,
+    NON_STREAM
+  }
+
+  protected List<PluginSetEntryContext<UserScopedEventListener>> getListeners(Type type) {
+    List<PluginSetEntryContext<UserScopedEventListener>> filteredListeners = new ArrayList<>();
+    for (PluginSetEntryContext<UserScopedEventListener> c : listeners) {
+      if ((type == Type.STREAM) == isStreamListener(c.get())) {
+        filteredListeners.add(c);
+      }
+    }
+    return filteredListeners;
+  }
+
+  protected boolean isStreamListener(UserScopedEventListener l) {
+    return l.getClass().getName().startsWith("com.google.gerrit.sshd.commands.StreamEvents");
+  }
+
   protected void readAndParseCfg(String pluginName, GerritServerConfigProvider configProvider) {
     PluginConfig cfg = PluginConfig.createFromGerritConfig(pluginName, configProvider.loadConfig());
     for (String filter : cfg.getStringList(KEY_FILTER)) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsListener.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsListener.java
index 7327700..3ad34c8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsListener.java
@@ -80,7 +80,7 @@
   @Override
   public void run() {
     try {
-      broker.sendAllPendingEvents();
+      broker.fireEventForStreamListeners();
     } catch (PermissionBackendException e) {
       // Ignore
     }