Ensure that replicateOnStartup doesn't compete with fetch

When `gerrit.replicateOnStartup` is set to `true` then corresponding
projects will be fetched. Ensure that projects that are subject of
periodic fetch are excluded from the `replicateOnStartup` schedule as
they are going to be fetched anyway.

Bug: Issue 322146240
Change-Id: I29c6b12fc12ba0860f43a7fb2d116faa2482a1fa
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
index 0a75c86..6ac09f6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
@@ -25,7 +25,6 @@
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
-import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -79,7 +78,11 @@
               new FetchResultProcessing.GitUpdateProcessing(eventDispatcher.get()));
       fetchAllFuture.set(
           fetchAll
-              .create(null, ReplicationFilter.all(), state, false)
+              .create(
+                  null,
+                  fetchAllPeriodically.get().skipFromReplicateAllOnPluginStart(),
+                  state,
+                  false)
               .schedule(30, TimeUnit.SECONDS));
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
index 2c2f6d8..264c880 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.project.ProjectCache;
@@ -26,6 +27,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 class SourceFetchPeriodically {
   interface Factory {
@@ -59,11 +61,14 @@
         this::scheduleFetchAll, 0L, source.fetchEvery(), TimeUnit.SECONDS);
   }
 
+  Stream<Project.NameKey> projectsToFetch() {
+    return projects.all().stream().filter(source::wouldFetchProject);
+  }
+
   private void scheduleFetchAll() {
     Optional<PullReplicationApiRequestMetrics> metrics = Optional.of(metricsProvider.get());
     long repositoriesToBeFetched =
-        projects.all().stream()
-            .filter(source::wouldFetchProject)
+        projectsToFetch()
             .map(
                 projectToFetch ->
                     source.scheduleNow(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
index b405a6e..de1d571 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
@@ -16,15 +16,20 @@
 
 import static com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration.DEFAULT_PERIODIC_FETCH_DISABLED;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
 
 import com.google.common.base.Suppliers;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.function.Supplier;
@@ -36,7 +41,7 @@
   private final WorkQueue workQueue;
   private final Provider<SourcesCollection> sources;
   private final Provider<SourceFetchPeriodically.Factory> fetchAllCreator;
-  private final List<ScheduledFuture<?>> scheduled;
+  private final List<ScheduledSource> scheduled;
 
   @Inject
   SourcesFetchPeriodically(
@@ -53,7 +58,19 @@
     scheduled.addAll(scheduleFetchAll(workQueue, sources.get(), fetchAllCreator.get()));
   }
 
-  private List<ScheduledFuture<?>> scheduleFetchAll(
+  ReplicationFilter skipFromReplicateAllOnPluginStart() {
+    Set<Project.NameKey> projectsToSkip =
+        scheduled.stream()
+            .flatMap(scheduledSource -> scheduledSource.sourceFetch.projectsToFetch())
+            .collect(toSet());
+    if (projectsToSkip.isEmpty()) {
+      return ReplicationFilter.all();
+    }
+
+    return new SkipProjectsFilter(projectsToSkip);
+  }
+
+  private List<ScheduledSource> scheduleFetchAll(
       WorkQueue workQueue,
       SourcesCollection sources,
       SourceFetchPeriodically.Factory fetchAllCreator) {
@@ -66,12 +83,41 @@
               logger.atInfo().log(
                   "Enabling periodic (every %ds) fetch of all refs for [%s] remote",
                   source.fetchEvery(), source.getRemoteConfigName());
-              return fetchAllCreator.create(source).start(queue.get());
+              SourceFetchPeriodically sourceFetchAll = fetchAllCreator.create(source);
+              return new ScheduledSource(sourceFetchAll, sourceFetchAll.start(queue.get()));
             })
         .collect(toList());
   }
 
   void stop() {
-    scheduled.forEach(schedule -> schedule.cancel(true));
+    scheduled.forEach(source -> source.schedule.cancel(true));
+  }
+
+  private static class ScheduledSource {
+    private final SourceFetchPeriodically sourceFetch;
+    private final ScheduledFuture<?> schedule;
+
+    private ScheduledSource(SourceFetchPeriodically sourceFetch, ScheduledFuture<?> schedule) {
+      this.sourceFetch = sourceFetch;
+      this.schedule = schedule;
+    }
+  }
+
+  /**
+   * The ReplicationFilter implementation that matches all projects that are not part of the
+   * internal set.
+   */
+  private static class SkipProjectsFilter extends ReplicationFilter {
+    private final Set<Project.NameKey> projectsToSkip;
+
+    private SkipProjectsFilter(Set<Project.NameKey> projectsToSkip) {
+      super(Collections.emptyList());
+      this.projectsToSkip = projectsToSkip;
+    }
+
+    @Override
+    public boolean matches(Project.NameKey name) {
+      return !projectsToSkip.contains(name);
+    }
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyTest.java
new file mode 100644
index 0000000..47b891d
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyTest.java
@@ -0,0 +1,86 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.util.Providers;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SourcesFetchPeriodicallyTest {
+  private static final NameKey RANDOM_PROJECT_NAME = Project.nameKey("random_project_name");
+
+  @Mock WorkQueue workQueueMock;
+  @Mock SourcesCollection sourcesMock;
+  @Mock SourceFetchPeriodically.Factory factoryMock;
+
+  private SourcesFetchPeriodically objectUnderTest;
+
+  @Before
+  public void setup() {
+    objectUnderTest =
+        new SourcesFetchPeriodically(
+            workQueueMock, Providers.of(sourcesMock), Providers.of(factoryMock));
+  }
+
+  @Test
+  public void shouldMatchAnyProjectWhenNoProjectsToFetchPeriodicallyAreConfigured() {
+    // given
+    when(sourcesMock.getAll()).thenReturn(Collections.emptyList());
+
+    // when
+    objectUnderTest.start();
+    ReplicationFilter filter = objectUnderTest.skipFromReplicateAllOnPluginStart();
+
+    // then
+    assertThat(filter.matches(RANDOM_PROJECT_NAME)).isTrue();
+  }
+
+  @Test
+  public void shouldNotMatchProjectConfiguredToFetchPeriodically() {
+    // given
+    Project.NameKey projectToFetch = Project.nameKey("to_be_fetched");
+    SourceFetchPeriodically fetchProjectForSource = mock(SourceFetchPeriodically.class);
+    when(fetchProjectForSource.projectsToFetch()).thenReturn(Stream.of(projectToFetch));
+
+    Source sourceWithPeriodicFetch = mock(Source.class);
+    when(sourceWithPeriodicFetch.fetchEvery()).thenReturn(1L);
+    when(sourcesMock.getAll()).thenReturn(List.of(sourceWithPeriodicFetch));
+    when(factoryMock.create(any())).thenReturn(fetchProjectForSource);
+
+    // when
+    objectUnderTest.start();
+    ReplicationFilter filter = objectUnderTest.skipFromReplicateAllOnPluginStart();
+
+    // then
+    assertThat(filter.matches(projectToFetch)).isFalse();
+    assertThat(filter.matches(RANDOM_PROJECT_NAME)).isTrue();
+  }
+}