Ensure that replicateOnStartup doesn't compete with fetch
When `gerrit.replicateOnStartup` is set to `true` then corresponding
projects will be fetched. Ensure that projects that are subject of
periodic fetch are excluded from the `replicateOnStartup` schedule as
they are going to be fetched anyway.
Bug: Issue 322146240
Change-Id: I29c6b12fc12ba0860f43a7fb2d116faa2482a1fa
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
index 0a75c86..6ac09f6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
@@ -25,7 +25,6 @@
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
-import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -79,7 +78,11 @@
new FetchResultProcessing.GitUpdateProcessing(eventDispatcher.get()));
fetchAllFuture.set(
fetchAll
- .create(null, ReplicationFilter.all(), state, false)
+ .create(
+ null,
+ fetchAllPeriodically.get().skipFromReplicateAllOnPluginStart(),
+ state,
+ false)
.schedule(30, TimeUnit.SECONDS));
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
index 2c2f6d8..264c880 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
@@ -15,6 +15,7 @@
package com.googlesource.gerrit.plugins.replication.pull;
import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.project.ProjectCache;
@@ -26,6 +27,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
class SourceFetchPeriodically {
interface Factory {
@@ -59,11 +61,14 @@
this::scheduleFetchAll, 0L, source.fetchEvery(), TimeUnit.SECONDS);
}
+ Stream<Project.NameKey> projectsToFetch() {
+ return projects.all().stream().filter(source::wouldFetchProject);
+ }
+
private void scheduleFetchAll() {
Optional<PullReplicationApiRequestMetrics> metrics = Optional.of(metricsProvider.get());
long repositoriesToBeFetched =
- projects.all().stream()
- .filter(source::wouldFetchProject)
+ projectsToFetch()
.map(
projectToFetch ->
source.scheduleNow(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
index b405a6e..de1d571 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
@@ -16,15 +16,20 @@
import static com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration.DEFAULT_PERIODIC_FETCH_DISABLED;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
import com.google.common.base.Suppliers;
import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Supplier;
@@ -36,7 +41,7 @@
private final WorkQueue workQueue;
private final Provider<SourcesCollection> sources;
private final Provider<SourceFetchPeriodically.Factory> fetchAllCreator;
- private final List<ScheduledFuture<?>> scheduled;
+ private final List<ScheduledSource> scheduled;
@Inject
SourcesFetchPeriodically(
@@ -53,7 +58,19 @@
scheduled.addAll(scheduleFetchAll(workQueue, sources.get(), fetchAllCreator.get()));
}
- private List<ScheduledFuture<?>> scheduleFetchAll(
+ ReplicationFilter skipFromReplicateAllOnPluginStart() {
+ Set<Project.NameKey> projectsToSkip =
+ scheduled.stream()
+ .flatMap(scheduledSource -> scheduledSource.sourceFetch.projectsToFetch())
+ .collect(toSet());
+ if (projectsToSkip.isEmpty()) {
+ return ReplicationFilter.all();
+ }
+
+ return new SkipProjectsFilter(projectsToSkip);
+ }
+
+ private List<ScheduledSource> scheduleFetchAll(
WorkQueue workQueue,
SourcesCollection sources,
SourceFetchPeriodically.Factory fetchAllCreator) {
@@ -66,12 +83,41 @@
logger.atInfo().log(
"Enabling periodic (every %ds) fetch of all refs for [%s] remote",
source.fetchEvery(), source.getRemoteConfigName());
- return fetchAllCreator.create(source).start(queue.get());
+ SourceFetchPeriodically sourceFetchAll = fetchAllCreator.create(source);
+ return new ScheduledSource(sourceFetchAll, sourceFetchAll.start(queue.get()));
})
.collect(toList());
}
void stop() {
- scheduled.forEach(schedule -> schedule.cancel(true));
+ scheduled.forEach(source -> source.schedule.cancel(true));
+ }
+
+ private static class ScheduledSource {
+ private final SourceFetchPeriodically sourceFetch;
+ private final ScheduledFuture<?> schedule;
+
+ private ScheduledSource(SourceFetchPeriodically sourceFetch, ScheduledFuture<?> schedule) {
+ this.sourceFetch = sourceFetch;
+ this.schedule = schedule;
+ }
+ }
+
+ /**
+ * The ReplicationFilter implementation that matches all projects that are not part of the
+ * internal set.
+ */
+ private static class SkipProjectsFilter extends ReplicationFilter {
+ private final Set<Project.NameKey> projectsToSkip;
+
+ private SkipProjectsFilter(Set<Project.NameKey> projectsToSkip) {
+ super(Collections.emptyList());
+ this.projectsToSkip = projectsToSkip;
+ }
+
+ @Override
+ public boolean matches(Project.NameKey name) {
+ return !projectsToSkip.contains(name);
+ }
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyTest.java
new file mode 100644
index 0000000..47b891d
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyTest.java
@@ -0,0 +1,86 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.util.Providers;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SourcesFetchPeriodicallyTest {
+ private static final NameKey RANDOM_PROJECT_NAME = Project.nameKey("random_project_name");
+
+ @Mock WorkQueue workQueueMock;
+ @Mock SourcesCollection sourcesMock;
+ @Mock SourceFetchPeriodically.Factory factoryMock;
+
+ private SourcesFetchPeriodically objectUnderTest;
+
+ @Before
+ public void setup() {
+ objectUnderTest =
+ new SourcesFetchPeriodically(
+ workQueueMock, Providers.of(sourcesMock), Providers.of(factoryMock));
+ }
+
+ @Test
+ public void shouldMatchAnyProjectWhenNoProjectsToFetchPeriodicallyAreConfigured() {
+ // given
+ when(sourcesMock.getAll()).thenReturn(Collections.emptyList());
+
+ // when
+ objectUnderTest.start();
+ ReplicationFilter filter = objectUnderTest.skipFromReplicateAllOnPluginStart();
+
+ // then
+ assertThat(filter.matches(RANDOM_PROJECT_NAME)).isTrue();
+ }
+
+ @Test
+ public void shouldNotMatchProjectConfiguredToFetchPeriodically() {
+ // given
+ Project.NameKey projectToFetch = Project.nameKey("to_be_fetched");
+ SourceFetchPeriodically fetchProjectForSource = mock(SourceFetchPeriodically.class);
+ when(fetchProjectForSource.projectsToFetch()).thenReturn(Stream.of(projectToFetch));
+
+ Source sourceWithPeriodicFetch = mock(Source.class);
+ when(sourceWithPeriodicFetch.fetchEvery()).thenReturn(1L);
+ when(sourcesMock.getAll()).thenReturn(List.of(sourceWithPeriodicFetch));
+ when(factoryMock.create(any())).thenReturn(fetchProjectForSource);
+
+ // when
+ objectUnderTest.start();
+ ReplicationFilter filter = objectUnderTest.skipFromReplicateAllOnPluginStart();
+
+ // then
+ assertThat(filter.matches(projectToFetch)).isFalse();
+ assertThat(filter.matches(RANDOM_PROJECT_NAME)).isTrue();
+ }
+}