Implement periodic fetch for source

When `remote.NAME.fetchEvery` is configured to value greater than `0`
for a source then all its projects are fetched using a dedicated (called
`PeriodicallyFetchFromSources`) pool (with a single thread).

Notes:
* dedicated pool is created only when at least single resource is
  configured to fetch periodically from the remote
* the check is performed on pull-replication plugin start therefore
  plugin should be restarted in case when configuration is modified
* all refs (according to `remote.NAME.fetch` spec) are fetched for each
  repository

Bug: Issue 322146240
Change-Id: I4ff6ca67ec4005710c28f6b9cee08d584da03936
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
index 6457f8a..0a75c86 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication.pull;
 
+import com.google.common.base.Suppliers;
 import com.google.common.util.concurrent.Atomics;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
@@ -22,11 +23,13 @@
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 public class OnStartStop implements LifecycleListener {
   private final AtomicReference<Future<?>> fetchAllFuture;
@@ -37,6 +40,7 @@
   private final ReplicationState.Factory replicationStateFactory;
   private final SourcesCollection sourcesCollection;
   private final WorkQueue workQueue;
+  private final Supplier<SourcesFetchPeriodically> fetchAllPeriodically;
   private boolean isReplica;
 
   @Inject
@@ -48,6 +52,7 @@
       ReplicationState.Factory replicationStateFactory,
       SourcesCollection sourcesCollection,
       WorkQueue workQueue,
+      Provider<SourcesFetchPeriodically> fetchAllPeriodically,
       @GerritIsReplica Boolean isReplica) {
     this.srvInfo = srvInfo;
     this.fetchAll = fetchAll;
@@ -58,10 +63,14 @@
     this.sourcesCollection = sourcesCollection;
     this.workQueue = workQueue;
     this.isReplica = isReplica;
+    this.fetchAllPeriodically = Suppliers.memoize(() -> fetchAllPeriodically.get());
   }
 
   @Override
   public void start() {
+    sourcesCollection.startup(workQueue);
+    fetchAllPeriodically.get().start();
+
     if (isReplica
         && srvInfo.getState() == ServerInformation.State.STARTUP
         && config.isReplicateAllOnPluginStart()) {
@@ -73,12 +82,12 @@
               .create(null, ReplicationFilter.all(), state, false)
               .schedule(30, TimeUnit.SECONDS));
     }
-
-    sourcesCollection.startup(workQueue);
   }
 
   @Override
   public void stop() {
+    fetchAllPeriodically.get().stop();
+
     Future<?> f = fetchAllFuture.getAndSet(null);
     if (f != null) {
       f.cancel(true);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index ddc6258..a4af8ca 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -137,6 +137,8 @@
       install(new EventsBrokerConsumerModule(eventBrokerTopic, replicationConfig));
     }
 
+    install(new FactoryModuleBuilder().build(SourceFetchPeriodically.Factory.class));
+
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
     DynamicSet.bind(binder(), ReplicationStateListener.class).to(PullReplicationStateLogger.class);
     EventTypes.register(FetchRefReplicatedEvent.TYPE, FetchRefReplicatedEvent.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 9283033..922ce32 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -448,6 +448,15 @@
     return schedule(project, ref, uri, state, apiRequestMetrics, false);
   }
 
+  public Future<?> scheduleNow(
+      Project.NameKey project,
+      String ref,
+      ReplicationState state,
+      Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
+    URIish uri = getURI(project);
+    return schedule(project, ref, uri, state, apiRequestMetrics, true);
+  }
+
   public Future<?> schedule(
       Project.NameKey project,
       String ref,
@@ -917,6 +926,10 @@
     return config.enableBatchedRefs();
   }
 
+  public long fetchEvery() {
+    return config.fetchEvery();
+  }
+
   void scheduleUpdateHead(String apiUrl, Project.NameKey project, String newHead) {
     try {
       URIish apiURI = new URIish(apiUrl);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
new file mode 100644
index 0000000..2c2f6d8
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+class SourceFetchPeriodically {
+  interface Factory {
+    SourceFetchPeriodically create(Source source);
+  }
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final ProjectCache projects;
+  private final ReplicationState.Factory fetchReplicationFactory;
+  private final Provider<PullReplicationApiRequestMetrics> metricsProvider;
+  private final DynamicItem<EventDispatcher> eventDispatcher;
+  private final Source source;
+
+  @Inject
+  SourceFetchPeriodically(
+      ProjectCache projects,
+      ReplicationState.Factory fetchReplicationFactory,
+      Provider<PullReplicationApiRequestMetrics> metricsProvider,
+      DynamicItem<EventDispatcher> eventDispatcher,
+      @Assisted Source source) {
+    this.projects = projects;
+    this.fetchReplicationFactory = fetchReplicationFactory;
+    this.metricsProvider = metricsProvider;
+    this.eventDispatcher = eventDispatcher;
+    this.source = source;
+  }
+
+  ScheduledFuture<?> start(ScheduledExecutorService pool) {
+    return pool.scheduleAtFixedRate(
+        this::scheduleFetchAll, 0L, source.fetchEvery(), TimeUnit.SECONDS);
+  }
+
+  private void scheduleFetchAll() {
+    Optional<PullReplicationApiRequestMetrics> metrics = Optional.of(metricsProvider.get());
+    long repositoriesToBeFetched =
+        projects.all().stream()
+            .filter(source::wouldFetchProject)
+            .map(
+                projectToFetch ->
+                    source.scheduleNow(
+                        projectToFetch,
+                        FetchOne.ALL_REFS,
+                        fetchReplicationFactory.create(
+                            new FetchResultProcessing.GitUpdateProcessing(eventDispatcher.get())),
+                        metrics))
+            .count();
+    logger.atInfo().log(
+        "The %d repositories were scheduled for %s remote to fetch %s",
+        repositoriesToBeFetched, source.getRemoteConfigName(), FetchOne.ALL_REFS);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
new file mode 100644
index 0000000..b405a6e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration.DEFAULT_PERIODIC_FETCH_DISABLED;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.base.Suppliers;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.Supplier;
+
+@Singleton
+class SourcesFetchPeriodically {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final WorkQueue workQueue;
+  private final Provider<SourcesCollection> sources;
+  private final Provider<SourceFetchPeriodically.Factory> fetchAllCreator;
+  private final List<ScheduledFuture<?>> scheduled;
+
+  @Inject
+  SourcesFetchPeriodically(
+      WorkQueue workQueue,
+      Provider<SourcesCollection> sources,
+      Provider<SourceFetchPeriodically.Factory> fetchAllCreator) {
+    this.workQueue = workQueue;
+    this.sources = sources;
+    this.fetchAllCreator = fetchAllCreator;
+    this.scheduled = new ArrayList<>();
+  }
+
+  void start() {
+    scheduled.addAll(scheduleFetchAll(workQueue, sources.get(), fetchAllCreator.get()));
+  }
+
+  private List<ScheduledFuture<?>> scheduleFetchAll(
+      WorkQueue workQueue,
+      SourcesCollection sources,
+      SourceFetchPeriodically.Factory fetchAllCreator) {
+    Supplier<ScheduledExecutorService> queue =
+        Suppliers.memoize(() -> workQueue.createQueue(1, "PeriodicallyFetchFromSources"));
+    return sources.getAll().stream()
+        .filter(source -> source.fetchEvery() > DEFAULT_PERIODIC_FETCH_DISABLED)
+        .map(
+            source -> {
+              logger.atInfo().log(
+                  "Enabling periodic (every %ds) fetch of all refs for [%s] remote",
+                  source.fetchEvery(), source.getRemoteConfigName());
+              return fetchAllCreator.create(source).start(queue.get());
+            })
+        .collect(toList());
+  }
+
+  void stop() {
+    scheduled.forEach(schedule -> schedule.cancel(true));
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 5e791d4..1635e5c 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -631,6 +631,13 @@
 >	together with `remote.NAME.apiUrl`; such configuration is considered
 >	invalid and prevents the plugin from starting
 
+>	*NOTE*: Periodic fetches are scheduled using a dedicated (single
+>	threaded) pool, called `PeriodicallyFetchFromSources`. It is created only
+>	when there is at least one remote configured to fetch periodically.
+
+>	*NOTE*: Scheduling is performed on plugin start therefore one needs to
+>	reload plugin when configuration gets changed.
+
 Directory `replication`
 --------------------
 The optional directory `$site_path/etc/replication` contains Git-style
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java
new file mode 100644
index 0000000..5ac1deb
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java
@@ -0,0 +1,91 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.junit.Test;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.TestPullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class SourcesFetchPeriodicallyIT extends PullReplicationSetupBase {
+  private static final String TEST_FETCH_FREQUENCY = "1s";
+
+  @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return false;
+  }
+
+  @Override
+  protected void setReplicationSource(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+    List<String> fetchUrls =
+        buildReplicaURLs(replicaSuffixes, s -> gitPath.resolve("${name}" + s + ".git").toString());
+    config.setStringList("remote", remoteName, "url", fetchUrls);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.setString("remote", remoteName, "fetchEvery", TEST_FETCH_FREQUENCY);
+    config.save();
+  }
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    setUpTestPlugin(false);
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
+  public void shouldFetchChangesPeriodically() throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+
+    Result pushChangeResult = createChange();
+    RevCommit changeCommit = pushChangeResult.getCommit();
+    String sourceChangeRef = pushChangeResult.getPatchSet().refName();
+
+    try (Repository repo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, sourceChangeRef) != null);
+
+      Ref targetChangeRef = getRef(repo, sourceChangeRef);
+      assertThat(targetChangeRef).isNotNull();
+      assertThat(targetChangeRef.getObjectId()).isEqualTo(changeCommit.getId());
+
+      // ensure that previous fetch was finished
+      Thread.sleep(Duration.ofSeconds(TEST_REPLICATION_DELAY).toMillis());
+
+      Ref sourceNewRef = createNewRef();
+
+      waitUntil(() -> checkedGetRef(repo, sourceNewRef.getName()) != null);
+      Ref targetNewRef = getRef(repo, sourceNewRef.getName());
+      assertThat(targetNewRef).isNotNull();
+      assertThat(targetNewRef.getObjectId()).isEqualTo(sourceNewRef.getObjectId());
+    }
+  }
+}