Implement periodic fetch for source
When `remote.NAME.fetchEvery` is configured to value greater than `0`
for a source then all its projects are fetched using a dedicated (called
`PeriodicallyFetchFromSources`) pool (with a single thread).
Notes:
* dedicated pool is created only when at least single resource is
configured to fetch periodically from the remote
* the check is performed on pull-replication plugin start therefore
plugin should be restarted in case when configuration is modified
* all refs (according to `remote.NAME.fetch` spec) are fetched for each
repository
Bug: Issue 322146240
Change-Id: I4ff6ca67ec4005710c28f6b9cee08d584da03936
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
index 6457f8a..0a75c86 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.replication.pull;
+import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.Atomics;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicItem;
@@ -22,11 +23,13 @@
import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
+import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
public class OnStartStop implements LifecycleListener {
private final AtomicReference<Future<?>> fetchAllFuture;
@@ -37,6 +40,7 @@
private final ReplicationState.Factory replicationStateFactory;
private final SourcesCollection sourcesCollection;
private final WorkQueue workQueue;
+ private final Supplier<SourcesFetchPeriodically> fetchAllPeriodically;
private boolean isReplica;
@Inject
@@ -48,6 +52,7 @@
ReplicationState.Factory replicationStateFactory,
SourcesCollection sourcesCollection,
WorkQueue workQueue,
+ Provider<SourcesFetchPeriodically> fetchAllPeriodically,
@GerritIsReplica Boolean isReplica) {
this.srvInfo = srvInfo;
this.fetchAll = fetchAll;
@@ -58,10 +63,14 @@
this.sourcesCollection = sourcesCollection;
this.workQueue = workQueue;
this.isReplica = isReplica;
+ this.fetchAllPeriodically = Suppliers.memoize(() -> fetchAllPeriodically.get());
}
@Override
public void start() {
+ sourcesCollection.startup(workQueue);
+ fetchAllPeriodically.get().start();
+
if (isReplica
&& srvInfo.getState() == ServerInformation.State.STARTUP
&& config.isReplicateAllOnPluginStart()) {
@@ -73,12 +82,12 @@
.create(null, ReplicationFilter.all(), state, false)
.schedule(30, TimeUnit.SECONDS));
}
-
- sourcesCollection.startup(workQueue);
}
@Override
public void stop() {
+ fetchAllPeriodically.get().stop();
+
Future<?> f = fetchAllFuture.getAndSet(null);
if (f != null) {
f.cancel(true);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index ddc6258..a4af8ca 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -137,6 +137,8 @@
install(new EventsBrokerConsumerModule(eventBrokerTopic, replicationConfig));
}
+ install(new FactoryModuleBuilder().build(SourceFetchPeriodically.Factory.class));
+
DynamicSet.setOf(binder(), ReplicationStateListener.class);
DynamicSet.bind(binder(), ReplicationStateListener.class).to(PullReplicationStateLogger.class);
EventTypes.register(FetchRefReplicatedEvent.TYPE, FetchRefReplicatedEvent.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 9283033..922ce32 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -448,6 +448,15 @@
return schedule(project, ref, uri, state, apiRequestMetrics, false);
}
+ public Future<?> scheduleNow(
+ Project.NameKey project,
+ String ref,
+ ReplicationState state,
+ Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
+ URIish uri = getURI(project);
+ return schedule(project, ref, uri, state, apiRequestMetrics, true);
+ }
+
public Future<?> schedule(
Project.NameKey project,
String ref,
@@ -917,6 +926,10 @@
return config.enableBatchedRefs();
}
+ public long fetchEvery() {
+ return config.fetchEvery();
+ }
+
void scheduleUpdateHead(String apiUrl, Project.NameKey project, String newHead) {
try {
URIish apiURI = new URIish(apiUrl);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
new file mode 100644
index 0000000..2c2f6d8
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+class SourceFetchPeriodically {
+ interface Factory {
+ SourceFetchPeriodically create(Source source);
+ }
+
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final ProjectCache projects;
+ private final ReplicationState.Factory fetchReplicationFactory;
+ private final Provider<PullReplicationApiRequestMetrics> metricsProvider;
+ private final DynamicItem<EventDispatcher> eventDispatcher;
+ private final Source source;
+
+ @Inject
+ SourceFetchPeriodically(
+ ProjectCache projects,
+ ReplicationState.Factory fetchReplicationFactory,
+ Provider<PullReplicationApiRequestMetrics> metricsProvider,
+ DynamicItem<EventDispatcher> eventDispatcher,
+ @Assisted Source source) {
+ this.projects = projects;
+ this.fetchReplicationFactory = fetchReplicationFactory;
+ this.metricsProvider = metricsProvider;
+ this.eventDispatcher = eventDispatcher;
+ this.source = source;
+ }
+
+ ScheduledFuture<?> start(ScheduledExecutorService pool) {
+ return pool.scheduleAtFixedRate(
+ this::scheduleFetchAll, 0L, source.fetchEvery(), TimeUnit.SECONDS);
+ }
+
+ private void scheduleFetchAll() {
+ Optional<PullReplicationApiRequestMetrics> metrics = Optional.of(metricsProvider.get());
+ long repositoriesToBeFetched =
+ projects.all().stream()
+ .filter(source::wouldFetchProject)
+ .map(
+ projectToFetch ->
+ source.scheduleNow(
+ projectToFetch,
+ FetchOne.ALL_REFS,
+ fetchReplicationFactory.create(
+ new FetchResultProcessing.GitUpdateProcessing(eventDispatcher.get())),
+ metrics))
+ .count();
+ logger.atInfo().log(
+ "The %d repositories were scheduled for %s remote to fetch %s",
+ repositoriesToBeFetched, source.getRemoteConfigName(), FetchOne.ALL_REFS);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
new file mode 100644
index 0000000..b405a6e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration.DEFAULT_PERIODIC_FETCH_DISABLED;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.base.Suppliers;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.Supplier;
+
+@Singleton
+class SourcesFetchPeriodically {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final WorkQueue workQueue;
+ private final Provider<SourcesCollection> sources;
+ private final Provider<SourceFetchPeriodically.Factory> fetchAllCreator;
+ private final List<ScheduledFuture<?>> scheduled;
+
+ @Inject
+ SourcesFetchPeriodically(
+ WorkQueue workQueue,
+ Provider<SourcesCollection> sources,
+ Provider<SourceFetchPeriodically.Factory> fetchAllCreator) {
+ this.workQueue = workQueue;
+ this.sources = sources;
+ this.fetchAllCreator = fetchAllCreator;
+ this.scheduled = new ArrayList<>();
+ }
+
+ void start() {
+ scheduled.addAll(scheduleFetchAll(workQueue, sources.get(), fetchAllCreator.get()));
+ }
+
+ private List<ScheduledFuture<?>> scheduleFetchAll(
+ WorkQueue workQueue,
+ SourcesCollection sources,
+ SourceFetchPeriodically.Factory fetchAllCreator) {
+ Supplier<ScheduledExecutorService> queue =
+ Suppliers.memoize(() -> workQueue.createQueue(1, "PeriodicallyFetchFromSources"));
+ return sources.getAll().stream()
+ .filter(source -> source.fetchEvery() > DEFAULT_PERIODIC_FETCH_DISABLED)
+ .map(
+ source -> {
+ logger.atInfo().log(
+ "Enabling periodic (every %ds) fetch of all refs for [%s] remote",
+ source.fetchEvery(), source.getRemoteConfigName());
+ return fetchAllCreator.create(source).start(queue.get());
+ })
+ .collect(toList());
+ }
+
+ void stop() {
+ scheduled.forEach(schedule -> schedule.cancel(true));
+ }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 5e791d4..1635e5c 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -631,6 +631,13 @@
> together with `remote.NAME.apiUrl`; such configuration is considered
> invalid and prevents the plugin from starting
+> *NOTE*: Periodic fetches are scheduled using a dedicated (single
+> threaded) pool, called `PeriodicallyFetchFromSources`. It is created only
+> when there is at least one remote configured to fetch periodically.
+
+> *NOTE*: Scheduling is performed on plugin start therefore one needs to
+> reload plugin when configuration gets changed.
+
Directory `replication`
--------------------
The optional directory `$site_path/etc/replication` contains Git-style
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java
new file mode 100644
index 0000000..5ac1deb
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java
@@ -0,0 +1,91 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.junit.Test;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+ name = "pull-replication",
+ sysModule = "com.googlesource.gerrit.plugins.replication.pull.TestPullReplicationModule",
+ httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class SourcesFetchPeriodicallyIT extends PullReplicationSetupBase {
+ private static final String TEST_FETCH_FREQUENCY = "1s";
+
+ @Override
+ protected boolean useBatchRefUpdateEvent() {
+ return false;
+ }
+
+ @Override
+ protected void setReplicationSource(
+ String remoteName, List<String> replicaSuffixes, Optional<String> project)
+ throws IOException {
+ List<String> fetchUrls =
+ buildReplicaURLs(replicaSuffixes, s -> gitPath.resolve("${name}" + s + ".git").toString());
+ config.setStringList("remote", remoteName, "url", fetchUrls);
+ project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+ config.setString("remote", remoteName, "fetchEvery", TEST_FETCH_FREQUENCY);
+ config.save();
+ }
+
+ @Override
+ public void setUpTestPlugin() throws Exception {
+ setUpTestPlugin(false);
+ }
+
+ @Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
+ public void shouldFetchChangesPeriodically() throws Exception {
+ testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+
+ Result pushChangeResult = createChange();
+ RevCommit changeCommit = pushChangeResult.getCommit();
+ String sourceChangeRef = pushChangeResult.getPatchSet().refName();
+
+ try (Repository repo = repoManager.openRepository(project)) {
+ waitUntil(() -> checkedGetRef(repo, sourceChangeRef) != null);
+
+ Ref targetChangeRef = getRef(repo, sourceChangeRef);
+ assertThat(targetChangeRef).isNotNull();
+ assertThat(targetChangeRef.getObjectId()).isEqualTo(changeCommit.getId());
+
+ // ensure that previous fetch was finished
+ Thread.sleep(Duration.ofSeconds(TEST_REPLICATION_DELAY).toMillis());
+
+ Ref sourceNewRef = createNewRef();
+
+ waitUntil(() -> checkedGetRef(repo, sourceNewRef.getName()) != null);
+ Ref targetNewRef = getRef(repo, sourceNewRef.getName());
+ assertThat(targetNewRef).isNotNull();
+ assertThat(targetNewRef.getObjectId()).isEqualTo(sourceNewRef.getObjectId());
+ }
+ }
+}