Use a bucketed metric for the project-specific replication lag

Previously there was metric-per-project logic for tracking the
replication lag between nodes. Use the proper bucket callback
to provide all the values for the metric, avoiding the metrics explosion
and also the danger of re-creating the same project-specific metric
multiple times.

Change-Id: I3f8e95d296a49083ceca514412ab2b79beb788ed
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
index d677df8..0253588 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
@@ -20,7 +20,7 @@
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
-import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.CallbackMetric1;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.cache.CacheModule;
 import com.google.gerrit.server.cache.serialize.JavaCacheSerializer;
@@ -161,26 +161,27 @@
     }
   }
 
-  void incrementLagMetric(Project.NameKey projectName) {
-    logger.atFine().log("Creating replication lag metric for project %s", projectName);
-    String sanitizedProjectName = SubscriberMetrics.sanitizeProjectName(projectName.get());
-    metricMaker.newCallbackMetric(
-        String.format("%s_%s", SubscriberMetrics.REPLICATION_LAG_SEC, sanitizedProjectName),
-        Long.class,
-        new Description("Replication lag for project (sec)")
-            .setGauge()
-            .setUnit(Description.Units.SECONDS),
-        () -> getReplicationStatus(projectName.get()));
+  @VisibleForTesting
+  Runnable replicationLagMetricPerProject(CallbackMetric1<String, Long> metricCallback) {
+    return () -> {
+      if (replicationStatusPerProject.isEmpty()) {
+        metricCallback.forceCreate("");
+      } else {
+        replicationStatusPerProject.entrySet().stream()
+            .filter(e -> e.getValue() > 0)
+            .forEach(
+                e ->
+                    metricCallback.set(
+                        SubscriberMetrics.sanitizeProjectName(e.getKey()), e.getValue()));
+        metricCallback.prune();
+      }
+    };
   }
 
   @VisibleForTesting
   public void doUpdateLag(Project.NameKey projectName, Long lag) {
     cache.put(projectName.get(), lag);
     replicationStatusPerProject.put(projectName.get(), lag);
-
-    if (lag > 0) {
-      incrementLagMetric(projectName);
-    }
   }
 
   @VisibleForTesting
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index 8eba666..bb971ff 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -14,12 +14,15 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.google.gerrit.metrics.CallbackMetric1;
 import com.google.gerrit.metrics.Counter1;
 import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.ProjectEvent;
 import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.logging.Metadata;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.MultiSiteMetrics;
@@ -39,11 +42,15 @@
       "multi_site/subscriber/subscriber_replication_status/sec_behind";
   private static final String REPLICATION_LAG_MSEC =
       "multi_site/subscriber/subscriber_replication_status/msec_behind";
+  private static final String REPLICATION_LAG_MSEC_PROJECT =
+      "multi_site/subscriber/subscriber_replication_status/msec_behind/per_project";
 
   private final Counter1<String> subscriberSuccessCounter;
   private final Counter1<String> subscriberFailureCounter;
   private final ReplicationStatus replicationStatus;
   private static final Pattern isValidMetricNamePattern = Pattern.compile("[a-zA-Z0-9_-]");
+  private static final Field<String> PROJECT_NAME =
+      Field.ofString("project_name", Metadata.Builder::projectName).build();
 
   @Inject
   public SubscriberMetrics(MetricMaker metricMaker, ReplicationStatus replicationStatus) {
@@ -75,6 +82,16 @@
             .setGauge()
             .setUnit(Description.Units.MILLISECONDS),
         replicationStatus::getMaxLagMillis);
+
+    CallbackMetric1<String, Long> metrics =
+        metricMaker.newCallbackMetric(
+            SubscriberMetrics.REPLICATION_LAG_MSEC_PROJECT,
+            Long.class,
+            new Description("Per-project replication lag (msec)")
+                .setGauge()
+                .setUnit(Description.Units.MILLISECONDS),
+            PROJECT_NAME);
+    metricMaker.newTrigger(metrics, replicationStatus.replicationLagMetricPerProject(metrics));
   }
 
   /**
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/CallbackMetricMaker.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/CallbackMetricMaker.java
index daf1cfd..a197c91 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/CallbackMetricMaker.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/CallbackMetricMaker.java
@@ -1,9 +1,9 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
-import com.google.common.base.Supplier;
-import com.google.gerrit.extensions.registration.RegistrationHandle;
+import com.google.gerrit.metrics.Counter1;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.metrics.Field;
 import org.junit.Ignore;
 
 @Ignore
@@ -15,14 +15,9 @@
   }
 
   @Override
-  public <V> RegistrationHandle newCallbackMetric(
-      String name, Class<V> valueClass, Description desc, Supplier<V> trigger) {
+  public <F1> Counter1<F1> newCounter(String name, Description desc, Field<F1> field1) {
     callbackMetricCounter += 1;
-    return new RegistrationHandle() {
-
-      @Override
-      public void remove() {}
-    };
+    return super.newCounter(name, desc, field1);
   }
 
   public void resetCallbackMetricCounter() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
index d49108c..7828062 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
@@ -15,8 +15,11 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.common.cache.Cache;
@@ -24,6 +27,8 @@
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.metrics.CallbackMetric1;
+import com.google.gerrit.metrics.DisabledMetricMaker;
 import com.google.gerrit.server.project.ProjectCache;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
@@ -43,9 +48,9 @@
   @Mock private ProjectVersionLogger verLogger;
   @Mock private ProjectCache projectCache;
   @Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
+  @Mock private CallbackMetric1<String, Long> perProjectReplicationLagMetricCallback;
   private ReplicationStatus objectUnderTest;
   private Cache<String, Long> replicationStatusCache;
-  private CallbackMetricMaker callbackMetricMaker = new CallbackMetricMaker();
 
   @Before
   public void setup() throws Exception {
@@ -53,7 +58,6 @@
         .thenReturn(
             ImmutableSortedSet.of(Project.nameKey("projectA"), Project.nameKey("projectB")));
     replicationStatusCache = CacheBuilder.newBuilder().build();
-    callbackMetricMaker.resetCallbackMetricCounter();
     objectUnderTest =
         new ReplicationStatus(
             replicationStatusCache,
@@ -62,7 +66,7 @@
             projectCache,
             Executors.newScheduledThreadPool(1),
             new Configuration(new Config(), new Config()),
-            callbackMetricMaker);
+            new DisabledMetricMaker());
   }
 
   @Test
@@ -174,10 +178,12 @@
         .thenReturn(Optional.of(projectRemoteVersion));
 
     objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+    objectUnderTest.replicationLagMetricPerProject(perProjectReplicationLagMetricCallback).run();
 
     assertThat(replicationStatusCache.getIfPresent(projectName))
         .isEqualTo(projectRemoteVersion - projectLocalVersion);
-    assertThat(callbackMetricMaker.getCallbackMetricCounter()).isEqualTo(1);
+    verify(perProjectReplicationLagMetricCallback)
+        .set(eq(projectName), eq(projectRemoteVersion - projectLocalVersion));
   }
 
   @Test
@@ -189,7 +195,8 @@
 
     objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
 
-    assertThat(callbackMetricMaker.getCallbackMetricCounter()).isEqualTo(0);
+    assertThat(replicationStatusCache.getIfPresent(projectName)).isNull();
+    verify(perProjectReplicationLagMetricCallback, never()).set(any(), any());
   }
 
   @SuppressWarnings("unchecked")