Merge branch 'stable-3.5' into stable-3.6

* stable-3.5:
  Consume events-broker from source
  Bump up events-broker version to 3.5.1

Change-Id: I2c12e60802bdb4cf7a008d91972a34cc0ff8d212
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
index fb3daaf..b8f4d18 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -21,6 +21,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.google.pubsub.v1.PubsubMessage;
@@ -39,6 +41,7 @@
 
   private final EventDeserializer eventsDeserializer;
   private final PubSubSubscriberMetrics subscriberMetrics;
+  private final OneOffRequestContext oneOffRequestContext;
   private final String topic;
   private final Consumer<Event> messageProcessor;
   private final SubscriberProvider subscriberProvider;
@@ -51,10 +54,12 @@
       SubscriberProvider subscriberProvider,
       PubSubConfiguration config,
       PubSubSubscriberMetrics subscriberMetrics,
+      OneOffRequestContext oneOffRequestContext,
       @Assisted String topic,
       @Assisted Consumer<Event> messageProcessor) {
     this.eventsDeserializer = eventsDeserializer;
     this.subscriberMetrics = subscriberMetrics;
+    this.oneOffRequestContext = oneOffRequestContext;
     this.topic = topic;
     this.messageProcessor = messageProcessor;
     this.subscriberProvider = subscriberProvider;
@@ -99,7 +104,7 @@
   @VisibleForTesting
   MessageReceiver getMessageReceiver() {
     return (PubsubMessage message, AckReplyConsumer consumer) -> {
-      try {
+      try (ManualRequestContext ctx = oneOffRequestContext.open()) {
         Event event = eventsDeserializer.deserialize(message.getData().toStringUtf8());
         messageProcessor.accept(event);
         subscriberMetrics.incrementSucceedToConsumeMessage();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
index c144d3b..d9d9fac 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -26,6 +26,7 @@
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.protobuf.ByteString;
 import com.google.pubsub.v1.PubsubMessage;
@@ -43,6 +44,7 @@
   @Mock PubSubConfiguration confMock;
   @Mock SubscriberProvider subscriberProviderMock;
   @Mock PubSubSubscriberMetrics pubSubSubscriberMetricsMock;
+  @Mock OneOffRequestContext oneOffRequestContext;
   @Mock AckReplyConsumer ackReplyConsumerMock;
   @Mock Consumer<Event> succeedingConsumer;
   @Captor ArgumentCaptor<Event> eventMessageCaptor;
@@ -123,6 +125,7 @@
             subscriberProviderMock,
             confMock,
             pubSubSubscriberMetricsMock,
+            oneOffRequestContext,
             TOPIC,
             consumer)
         .getMessageReceiver();