Remove remaining event posting from synchronized sections

Posting events sometimes can take significant time, mainly due to the
event visibility checks. When this is done from a synchronized block
then all other threads trying to schedule replication to the same
(locked) Destination get blocked. Even worse, the synchronization
happens on the Destination level, so even operations on unrelated
repositories block each other.

Change-Id: I7498c7b36ee4b0b2d915a414a0fb1054160e8a1c
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 5316e35..6054a4a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -73,7 +73,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -395,13 +394,13 @@
 
   void schedule(
       Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state, boolean now) {
-    Set<String> refsToSchedule = new HashSet<>();
+    ImmutableSet.Builder<String> toSchedule = ImmutableSet.builder();
     for (String ref : refs) {
       if (!shouldReplicate(project, ref, state)) {
         repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri);
         continue;
       }
-      refsToSchedule.add(ref);
+      toSchedule.add(ref);
     }
     repLog.atInfo().log("scheduling replication %s:%s => %s", project, refs, uri);
 
@@ -430,11 +429,13 @@
       }
     }
 
+    ImmutableSet<String> refsToSchedule = toSchedule.build();
+    PushOne task;
     synchronized (stateLock) {
-      PushOne task = getPendingPush(uri);
+      task = getPendingPush(uri);
       if (task == null) {
         task = opFactory.create(project, uri);
-        addRefs(task, ImmutableSet.copyOf(refsToSchedule));
+        task.addRefBatch(refsToSchedule);
         task.addState(refsToSchedule, state);
         @SuppressWarnings("unused")
         ScheduledFuture<?> ignored =
@@ -444,7 +445,7 @@
             "scheduled %s:%s => %s to run %s",
             project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s");
       } else {
-        addRefs(task, ImmutableSet.copyOf(refsToSchedule));
+        task.addRefBatch(refsToSchedule);
         task.addState(refsToSchedule, state);
         repLog.atInfo().log(
             "consolidated %s:%s => %s with an existing pending push",
@@ -454,6 +455,7 @@
         state.increasePushTaskCount(project.get(), ref);
       }
     }
+    postReplicationScheduledEvent(task, refsToSchedule);
   }
 
   @Nullable
@@ -489,11 +491,6 @@
         pool.schedule(updateHeadFactory.create(uri, project, newHead), 0, TimeUnit.SECONDS);
   }
 
-  private void addRefs(PushOne e, ImmutableSet<String> refs) {
-    e.addRefBatch(refs);
-    postReplicationScheduledEvent(e, refs);
-  }
-
   /**
    * It schedules again a PushOp instance.
    *
@@ -516,6 +513,10 @@
    * @param pushOp The PushOp instance to be scheduled.
    */
   void reschedule(PushOne pushOp, RetryReason reason) {
+    boolean isRescheduled = false;
+    boolean isFailed = false;
+    RemoteRefUpdate.Status failedStatus = null;
+
     synchronized (stateLock) {
       URIish uri = pushOp.getURI();
       PushOne pendingPushOp = getPendingPush(uri);
@@ -571,13 +572,13 @@
           case TRANSPORT_ERROR:
           case REPOSITORY_MISSING:
           default:
-            RemoteRefUpdate.Status status =
+            failedStatus =
                 RetryReason.REPOSITORY_MISSING.equals(reason)
                     ? NON_EXISTING
                     : REJECTED_OTHER_REASON;
-            postReplicationFailedEvent(pushOp, status);
+            isFailed = true;
             if (pushOp.setToRetry()) {
-              postReplicationScheduledEvent(pushOp);
+              isRescheduled = true;
               replicationTasksStorage.get().reset(pushOp);
               @SuppressWarnings("unused")
               ScheduledFuture<?> ignored2 =
@@ -594,6 +595,12 @@
         }
       }
     }
+    if (isFailed) {
+      postReplicationFailedEvent(pushOp, failedStatus);
+    }
+    if (isRescheduled) {
+      postReplicationScheduledEvent(pushOp);
+    }
   }
 
   RunwayStatus requestRunway(PushOne op) {