Move message logging to events broker

The message log file is currently populated in an inconsistent way when
it comes to publishing messages. Publishing of some messages, like index
events, are still logged, however anything related to the stream events
topic is not, since this functionality was moved from multi-site to
events-broker. But it looks like the logging aspect was left out at the
time.

It makes sense to move all the message logging classes to the `events-
broker`, so they are together with the logic that publishes the stream
events, and used directly. This way, the clients of the library will not
even need to worry (or care) about updating the message log, this will
happen transparently.

The logic in `StreamEventPublisher` was already dealing with updating
the metrics. Incrementing the metrics is another feature that should be
tightly coupled with the publishing of the messages, however it looks
like some changes are required to allow each client of the library to
inject their own broker metrics. This will be done in a separate change.

Bug: Issue 294904654
Change-Id: I4e27619a0dde52c29626a33d2bcfb93d5945d509
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApiModule.java b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApiModule.java
index 8bc4a44..f2b1eed 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApiModule.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApiModule.java
@@ -14,14 +14,16 @@
 
 package com.gerritforge.gerrit.eventbroker;
 
+import com.gerritforge.gerrit.eventbroker.log.Log4jMessageLogger;
+import com.gerritforge.gerrit.eventbroker.log.MessageLogger;
 import com.gerritforge.gerrit.eventbroker.metrics.BrokerMetrics;
 import com.gerritforge.gerrit.eventbroker.metrics.BrokerMetricsNoOp;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.inject.AbstractModule;
+import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Inject;
 import com.google.inject.Scopes;
 
-public class BrokerApiModule extends AbstractModule {
+public class BrokerApiModule extends LifecycleModule {
   DynamicItem<BrokerApi> currentBrokerApi;
 
   @Inject(optional = true)
@@ -42,5 +44,8 @@
         .in(Scopes.SINGLETON);
 
     bind(EventDeserializer.class).in(Scopes.SINGLETON);
+
+    listener().to(Log4jMessageLogger.class);
+    bind(MessageLogger.class).to(Log4jMessageLogger.class);
   }
 }
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/log/Log4jMessageLogger.java b/src/main/java/com/gerritforge/gerrit/eventbroker/log/Log4jMessageLogger.java
new file mode 100644
index 0000000..8a0f4f5
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/log/Log4jMessageLogger.java
@@ -0,0 +1,47 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker.log;
+
+import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
+import com.google.gerrit.server.util.PluginLogFile;
+import com.google.gerrit.server.util.SystemLog;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.log4j.PatternLayout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class Log4jMessageLogger extends PluginLogFile implements MessageLogger {
+  private static final String LOG_NAME = "message_log";
+  private final Logger msgLog;
+  private final Gson gson;
+
+  @Inject
+  public Log4jMessageLogger(
+      SystemLog systemLog, ServerInformation serverInfo, EventGsonProvider gsonProvider) {
+    super(systemLog, serverInfo, LOG_NAME, new PatternLayout("[%d{ISO8601}] [%t] %-5p : %m%n"));
+    this.msgLog = LoggerFactory.getLogger(LOG_NAME);
+    this.gson = gsonProvider.get();
+  }
+
+  @Override
+  public void log(Direction direction, String topic, Event event) {
+    msgLog.info("{} {} {}", direction, topic, gson.toJson(event));
+  }
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/log/MessageLogger.java b/src/main/java/com/gerritforge/gerrit/eventbroker/log/MessageLogger.java
new file mode 100644
index 0000000..c4a6c7c
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/log/MessageLogger.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker.log;
+
+import com.google.gerrit.server.events.Event;
+
+public interface MessageLogger {
+  public enum Direction {
+    PUBLISH,
+    CONSUME;
+  }
+
+  public void log(Direction direction, String topic, Event event);
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisher.java b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisher.java
index e1d1398..d700952 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisher.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisher.java
@@ -15,6 +15,7 @@
 package com.gerritforge.gerrit.eventbroker.publisher;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.log.MessageLogger;
 import com.gerritforge.gerrit.eventbroker.metrics.BrokerMetrics;
 import com.gerritforge.gerrit.eventbroker.publisher.executor.StreamEventPublisherExecutor;
 import com.google.common.flogger.FluentLogger;
@@ -38,6 +39,7 @@
   private final Executor executor;
   private final String instanceId;
   private final DynamicItem<BrokerMetrics> brokerMetrics;
+  private final MessageLogger msgLog;
 
   @Inject
   public StreamEventPublisher(
@@ -45,12 +47,14 @@
       StreamEventPublisherConfig config,
       @StreamEventPublisherExecutor Executor executor,
       @Nullable @GerritInstanceId String instanceId,
-      DynamicItem<BrokerMetrics> brokerMetrics) {
+      DynamicItem<BrokerMetrics> brokerMetrics,
+      MessageLogger msgLog) {
     this.brokerApiDynamicItem = brokerApi;
     this.config = config;
     this.executor = executor;
     this.instanceId = instanceId;
     this.brokerMetrics = brokerMetrics;
+    this.msgLog = msgLog;
   }
 
   @Override
@@ -77,6 +81,7 @@
           brokerApi
               .send(streamEventTopic, event)
               .get(config.getPublishingTimeoutInMillis(), TimeUnit.MILLISECONDS);
+          msgLog.log(MessageLogger.Direction.PUBLISH, streamEventTopic, event);
           brokerMetrics.get().incrementBrokerPublishedMessage();
         } catch (TimeoutException e) {
           log.atSevere().withCause(e).log(
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java
index 76ab3cd..795f962 100644
--- a/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java
+++ b/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java
@@ -20,6 +20,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.gerritforge.gerrit.eventbroker.log.MessageLogger;
 import com.gerritforge.gerrit.eventbroker.metrics.BrokerMetrics;
 import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisher;
 import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherConfig;
@@ -43,6 +44,7 @@
   @Mock private DynamicItem<BrokerMetrics> brokerMetricsDynamicItem;
   @Mock private BrokerApi brokerApi;
   @Mock private BrokerMetrics brokerMetrics;
+  @Mock private MessageLogger msgLog;
 
   private static final String STREAM_EVENTS_TOPIC = "stream-test-topic";
   private static final long PUBLISHING_TIMEOUT = 1000L;
@@ -60,7 +62,7 @@
     when(brokerApi.send(any(), any())).thenReturn(Futures.immediateFuture(true));
     objectUnderTest =
         new StreamEventPublisher(
-            brokerApiDynamicItem, config, EXECUTOR, INSTANCE_ID, brokerMetricsDynamicItem);
+            brokerApiDynamicItem, config, EXECUTOR, INSTANCE_ID, brokerMetricsDynamicItem, msgLog);
   }
 
   @Test
@@ -79,7 +81,7 @@
 
     objectUnderTest =
         new StreamEventPublisher(
-            brokerApiDynamicItem, config, EXECUTOR, null, brokerMetricsDynamicItem);
+            brokerApiDynamicItem, config, EXECUTOR, null, brokerMetricsDynamicItem, msgLog);
     objectUnderTest.onEvent(event);
     verify(brokerApi, times(1)).send(STREAM_EVENTS_TOPIC, event);
   }
@@ -91,7 +93,7 @@
 
     objectUnderTest =
         new StreamEventPublisher(
-            brokerApiDynamicItem, config, EXECUTOR, null, brokerMetricsDynamicItem);
+            brokerApiDynamicItem, config, EXECUTOR, null, brokerMetricsDynamicItem, msgLog);
     objectUnderTest.onEvent(event);
     verify(brokerApi, never()).send(STREAM_EVENTS_TOPIC, event);
   }
@@ -147,4 +149,13 @@
     objectUnderTest.onEvent(event);
     verify(brokerMetrics, times(1)).incrementBrokerFailedToPublishMessage();
   }
+
+  @Test
+  public void shouldUpdateMessageLogWhenMessageIsSuccessfullyPublished() {
+    Event event = new ProjectCreatedEvent();
+    event.instanceId = INSTANCE_ID;
+
+    objectUnderTest.onEvent(event);
+    verify(msgLog).log(MessageLogger.Direction.PUBLISH, STREAM_EVENTS_TOPIC, event);
+  }
 }