blob: e7c9d358e501d298479101ac3b933cec6b277247 [file] [log] [blame]
// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.eventseiffel.parsing;
import static com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType.SCC;
import static com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType.SCS;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
import com.google.gerrit.exceptions.NoSuchEntityException;
import com.google.gerrit.extensions.common.AccountInfo;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.eventseiffel.EiffelEventHub;
import com.googlesource.gerrit.plugins.eventseiffel.cache.EiffelEventIdLookupException;
import com.googlesource.gerrit.plugins.eventseiffel.config.EventsFilter;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.CompositionDefinedEventKey;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.EventKey;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.SourceChangeEventKey;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEvent;
import com.googlesource.gerrit.plugins.eventseiffel.mapping.EiffelEventMapper;
import com.googlesource.gerrit.plugins.eventseiffel.parsing.CommitsWalker.EventCreate;
import com.googlesource.gerrit.plugins.eventseiffel.parsing.CommitsWalker.ScsWalker;
import com.googlesource.gerrit.plugins.eventseiffel.parsing.CommitsWalker.UnprocessedCommitsWalker;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.errors.IncorrectObjectTypeException;
import org.eclipse.jgit.errors.MissingObjectException;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.revwalk.RevWalkUtils;
/**
 * Creates and pushes missing Eiffel events to the Eiffel event queue.
 *
 * <p>The events handled by this class are SCC and SCS events for commits, and CD and ARTC events
 * for tags. Existing event ids are looked up through the {@link EiffelEventHub}, and newly mapped
 * events are pushed to the same hub.
 */
public class EiffelEventParserImpl implements EiffelEventParser {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
/* Maximum number of attempts pushToHub makes before rethrowing an InterruptedException. */
private static final int NBR_RETRIES = 3;
/* Used both to look up ids of already known events and to push newly created events. */
private final EiffelEventHub eventHub;
/* Opens the Git repositories that are inspected for refs, tags and commits. */
private final GitRepositoryManager repoManager;
/* Creates the walkers that yield the commits for which events should be created. */
private final CommitsWalker.Factory walkerFactory;
/* Maps commits and tags to EiffelEvent instances. */
private final EiffelEventMapper mapper;
/* Provides the filter consulted for blocked refs and blocked lightweight tags. */
private final Provider<EventsFilter> eventsFilter;
/**
 * Injection constructor that stores the collaborators of the parser.
 *
 * @param eventQueue hub used for id lookups and for pushing events.
 * @param repoManager opens the Git repositories.
 * @param mapper maps commits and tags to Eiffel events.
 * @param walkerFactory creates the commit walkers.
 * @param eventsFilter provider of the currently active events filter.
 */
@Inject
public EiffelEventParserImpl(
    EiffelEventHub eventQueue,
    GitRepositoryManager repoManager,
    EiffelEventMapper mapper,
    CommitsWalker.Factory walkerFactory,
    Provider<EventsFilter> eventsFilter) {
  this.repoManager = repoManager;
  this.mapper = mapper;
  this.walkerFactory = walkerFactory;
  this.eventsFilter = eventsFilter;
  this.eventHub = eventQueue;
}
/* (non-Javadoc)
 * @see com.googlesource.gerrit.plugins.eventseiffel.parsing.EiffelEventParser#createAndScheduleSccFromPatchsetCreation(PatchsetCreationData)
 *
 * Creates the SCC event for a newly created patch set. Nothing is pushed when the hub already
 * knows an id for the commit. When the hub knows an id for every parent commit, the event is
 * mapped directly from the patch-set data; otherwise creation is delegated to
 * createAndScheduleMissingSccs.
 */
@Override
public void createAndScheduleSccFromPatchsetCreation(PatchsetCreationData data)
    throws EventParsingException {
  SourceChangeEventKey scc =
      SourceChangeEventKey.sccKey(data.project, data.branch, data.commitId);
  try {
    if (eventHub.getExistingId(scc).isPresent()) {
      logger.atWarning().log(
          "Event %s already pushed for %d/%d", scc, data.changeNumber, data.patchsetNumber);
      return;
    }

    List<UUID> parentUuids = Lists.newArrayList();
    for (String parentId : data.parentCommitIds) {
      eventHub.getExistingId(scc.copy(parentId)).ifPresent(parentUuids::add);
    }

    boolean allParentsKnown = parentUuids.size() == data.parentCommitIds.size();
    if (!allParentsKnown) {
      createAndScheduleMissingSccs(scc);
      return;
    }
    /* Eiffel events have been scheduled or published for all parents. */
    pushToHub(mapper.toScc(data, parentUuids));
  } catch (InterruptedException
      | EiffelEventIdLookupException
      | NoSuchEntityException
      | ConfigInvalidException
      | IOException e) {
    throw new EventParsingException(
        e,
        "Event creation failed for: %s, %s, %s to SCC.",
        data.project,
        data.branch,
        data.commitId);
  }
}
/* (non-Javadoc)
 * @see com.googlesource.gerrit.plugins.eventseiffel.parsing.EiffelEventParser#createAndScheduleSccFromBranch(java.lang.String, java.lang.String)
 *
 * Resolves the tip of the branch and creates SCC events starting from that commit. Does nothing
 * when the tip cannot be resolved.
 */
@Override
public void createAndScheduleSccFromBranch(String repoName, String branchRef)
    throws EventParsingException {
  ObjectId tip = getTipOf(repoName, branchRef);
  if (tip != null) {
    createAndScheduleSccFromCommit(repoName, branchRef, tip.getName());
  }
}
/* (non-Javadoc)
 * @see com.googlesource.gerrit.plugins.eventseiffel.parsing.EiffelEventParser#createAndScheduleSccFromCommit(java.lang.String, java.lang.String, java.lang.String)
 *
 * Builds the SCC key for the commit and delegates to createAndScheduleMissingSccs. Any failure
 * is rethrown as an EventParsingException that names the key.
 */
@Override
public void createAndScheduleSccFromCommit(String repoName, String branchRef, String commit)
    throws EventParsingException {
  final SourceChangeEventKey sccKey = SourceChangeEventKey.sccKey(repoName, branchRef, commit);
  try {
    createAndScheduleMissingSccs(sccKey);
  } catch (InterruptedException
      | ConfigInvalidException
      | NoSuchEntityException
      | EiffelEventIdLookupException
      | IOException cause) {
    throw new EventParsingException(cause, "Event creation failed for: %s", sccKey);
  }
}
/*
 * Fills gaps among the SCS events of a branch, starting from the current tip of the branch.
 * Does nothing when the tip cannot be resolved.
 */
@Override
public void fillGapsForScsFromBranch(String repoName, String branchRef)
    throws EventParsingException {
  ObjectId tip = getTipOf(repoName, branchRef);
  if (tip != null) {
    fillGapsForSc(SourceChangeEventKey.scsKey(repoName, branchRef, tip.getName()));
  }
}
/*
 * Fills gaps among the SCC events of a branch, starting from the current tip of the branch.
 * Does nothing when the tip cannot be resolved.
 */
@Override
public void fillGapsForSccFromBranch(String repoName, String branchRef)
    throws EventParsingException {
  ObjectId tip = getTipOf(repoName, branchRef);
  if (tip != null) {
    fillGapsForSccFromCommit(repoName, branchRef, tip.getName());
  }
}
/* Fills gaps among the SCC events, starting from the given commit on the given branch. */
@Override
public void fillGapsForSccFromCommit(String repoName, String branchRef, String commit)
    throws EventParsingException {
  fillGapsForSc(SourceChangeEventKey.sccKey(repoName, branchRef, commit));
}
/*
 * Publishes the events that are missing for the commits yielded by a child walker created for
 * key. The walker is closed when processing ends. Any failure is rethrown as an
 * EventParsingException that names the key.
 *
 * The method is private, so it carries no @VisibleForTesting annotation: that annotation marks
 * members whose visibility has been widened for tests, which is not the case here.
 */
private void fillGapsForSc(SourceChangeEventKey key) throws EventParsingException {
  logger.atFine().log("Start publishing missing events starting from: %s", key);
  try (CommitsWalker commitFinder = walkerFactory.childWalker(key)) {
    fillGapsForSc(key, commitFinder);
  } catch (IOException
      | EiffelEventIdLookupException
      | NoSuchEntityException
      | ConfigInvalidException
      | InterruptedException e) {
    throw new EventParsingException(e, "Event creation failed for: %s", key);
  }
  logger.atFine().log("Done publishing missing events for starting from: %s", key);
}
/* (non-Javadoc)
 * @see com.googlesource.gerrit.plugins.eventseiffel.parsing.EiffelEventParser#createAndScheduleMissingScssFromBranch(java.lang.String, java.lang.String)
 *
 * Resolves the tip of the branch and creates the missing SCS events for it, without any
 * transaction end, submitter or submit time. Does nothing when the tip cannot be resolved.
 */
@Override
public void createAndScheduleMissingScssFromBranch(String repoName, String branchRef)
    throws EventParsingException {
  ObjectId tip = getTipOf(repoName, branchRef);
  if (tip != null) {
    createAndScheduleMissingScss(
        SourceChangeEventKey.scsKey(repoName, branchRef, tip.getName()), null, null, null);
  }
}
/* (non-Javadoc)
 * @see com.googlesource.gerrit.plugins.eventseiffel.parsing.EiffelEventParser#createAndScheduleMissingScss(com.googlesource.gerrit.plugins.eventseiffel.eiffel.SourceChangeEventKey, java.lang.String, com.google.gerrit.extensions.common.AccountInfo, java.lang.Long)
 *
 * Creates and pushes the SCS events yielded by an SCS walker for the given key. If the hub has
 * no SCC event for the same commit, the missing SCC events are created first. Every SCS event
 * links to the SCC event of its own commit, so a missing SCC id aborts the processing.
 */
@Override
public void createAndScheduleMissingScss(
    SourceChangeEventKey scs,
    String commitSha1TransactionEnd,
    AccountInfo submitter,
    Long submittedAt)
    throws EventParsingException {
  /* Tracks the key currently being processed so that failures name the offending commit. */
  SourceChangeEventKey currentScs = scs;
  SourceChangeEventKey scc = scs.copy(SCC);
  try {
    try (ScsWalker scsFinder =
        walkerFactory.scsWalker(scs, commitSha1TransactionEnd, submitter, submittedAt)) {
      boolean sccExists = eventHub.getExistingId(scc).isPresent();
      if (!sccExists) {
        /* One or several SCC events are missing, create them first */
        try (UnprocessedCommitsWalker sccFinder = scsFinder.sccWalker()) {
          createAndScheduleMissingSccs(scc, sccFinder);
        }
      }

      if (scsFinder.hasNext()) {
        logger.atFine().log("Start publishing events for: %s", scs);
      } else {
        logger.atFine().log("All events were already published for %s", scs);
      }

      while (scsFinder.hasNext()) {
        EventCreate job = scsFinder.next();
        currentScs = job.key;
        SourceChangeEventKey jobScc = job.key.copy(SCC);
        UUID sccId =
            eventHub
                .getExistingId(jobScc)
                .orElseThrow(
                    () ->
                        new EiffelEventIdLookupException(
                            "Unable to find SCC event id: %s", jobScc));
        pushToHub(
            mapper.toScs(
                job.commit,
                job.key.repo(),
                job.key.branch(),
                job.submitter,
                job.submittedAt,
                getParentUuids(job.key, job.commit),
                sccId));
      }
    }
    logger.atFine().log("Done publishing events for: %s", scs);
  } catch (NoSuchEntityException
      | ConfigInvalidException
      | InterruptedException
      | EiffelEventIdLookupException
      | IOException e) {
    throw new EventParsingException(e, "Failed to create Eiffel event(s) for %s.", currentScs);
  }
}
/* (non-Javadoc)
 * @see com.googlesource.gerrit.plugins.eventseiffel.parsing.EiffelEventParser#createAndScheduleArtc(java.lang.String, java.lang.String, java.lang.Long, boolean)
 *
 * Creates the CD event for the tag and then the ARTC event that refers to it. Unless force is
 * set, nothing is done when the hub already knows a CD id for the tag. The ARTC event is only
 * pushed when a CD id exists afterwards and differs from the one known beforehand.
 */
@Override
public void createAndScheduleArtc(
    String repoName, String tagName, Long creationTime, boolean force)
    throws EventParsingException {
  try {
    CompositionDefinedEventKey cd =
        CompositionDefinedEventKey.create(mapper.tagCompositionName(repoName), tagName);
    Optional<UUID> oldCdId = eventHub.getExistingId(cd);
    if (oldCdId.isPresent() && !force) {
      /* Artc event has already been created */
      logger.atFine().log("Event Artc has already been created for: %s, %s", repoName, tagName);
      return;
    }

    createAndScheduleCd(repoName, tagName, creationTime, force);
    Optional<UUID> cdId = eventHub.getExistingId(cd);
    if (cdId.isEmpty() || cdId.equals(oldCdId)) {
      /* No new CD event was created, so there is nothing for an Artc event to refer to. */
      return;
    }

    pushToHub(mapper.toArtc(repoName, tagName, creationTime, cdId.get()), force);
    if (oldCdId.isPresent()) {
      logger.atInfo().log("Event Artc has been forcibly created for: %s, %s", repoName, tagName);
    } else {
      logger.atFine().log("Event Artc has been created for: %s, %s", repoName, tagName);
    }
  } catch (InterruptedException | EiffelEventIdLookupException e) {
    throw new EventParsingException(
        e, "Event creation failed for: %s, %s to Artc", repoName, tagName);
  }
}
/*
 * Creates and pushes the CD event for a tag. The CD event refers to the SCS event of the commit
 * that the tag points to, and that SCS event is located in three steps:
 *
 *   1. Look for an SCS event for the commit on refs/heads/master.
 *   2. Otherwise ask the hub for an SCS event for the commit on any other branch. The lookup is
 *      attempted at most twice, with a 10 second wait in between.
 *   3. Otherwise pick the first non-blocked branch that the commit is reachable from and create
 *      the missing SCS events for that branch.
 *
 * Returns without creating anything when peelTag yields no object id (blocked lightweight tag).
 * Throws EventParsingException when no SCS event can be found or created.
 */
private void createAndScheduleCd(
    String projectName, String tagName, Long creationTime, boolean force)
    throws EventParsingException {
  Optional<UUID> scsId = Optional.empty();
  List<Ref> refs = null;
  try {
    EventsFilter filter = eventsFilter.get();
    ObjectId objectId = peelTag(projectName, tagName, filter.blockLightWeightTags());
    if (objectId == null) {
      return;
    }
    String commitId = objectId.getName();
    /* Check if an event for commit~master has been created. */
    SourceChangeEventKey scs =
        SourceChangeEventKey.scsKey(projectName, RefNames.REFS_HEADS + "master", commitId);
    scsId = eventHub.getExistingId(scs);
    if (scsId.isEmpty()) {
      /* No event created for commit~master. Check if event is created for any
      of the other branches. */
      Retryer<Optional<UUID>> retryer =
          RetryerBuilder.<Optional<UUID>>newBuilder()
              .retryIfResult(Optional::isEmpty)
              .withWaitStrategy(WaitStrategies.fixedWait(10, TimeUnit.SECONDS))
              .withStopStrategy(StopStrategies.stopAfterAttempt(2))
              .build();
      try (Repository repo = repoManager.openRepository(Project.nameKey(projectName))) {
        refs =
            repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_HEADS).stream()
                .collect(Collectors.toList());
      } catch (IOException e) {
        throw new EventParsingException(
            e, "Unable to get branches for: %s:%s", projectName, tagName);
      }
      List<String> branches =
          refs.stream()
              .map(Ref::getName)
              .filter(branch -> !branch.equals(RefNames.REFS_HEADS + "master"))
              .collect(Collectors.toList());
      try {
        scsId = retryer.call(() -> findSourceChangeEventKey(projectName, commitId, branches));
      } catch (RetryException | ExecutionException e) {
        /* Best effort: a failed lookup falls through to creating the SCS events below. */
        logger.atWarning().withCause(e).log(
            "Failed to find SCS for %s in %s for one of the branches (%s) when trying to create CD for tag %s",
            commitId, projectName, String.join(", ", branches), tagName);
      }
    }
    if (scsId.isEmpty()) {
      /* No event has been created for the commit. Find any non-blocked branch that
      commit is merged into and create SCS events for that branch. Reaching this point implies
      that the previous step ran, so refs holds the branch refs it collected. */
      List<Ref> branches;
      /* The RevWalk is a resource of its own and is released together with the repository. */
      try (Repository repo = repoManager.openRepository(Project.nameKey(projectName));
          RevWalk rw = new RevWalk(repo)) {
        refs =
            refs.stream()
                .filter(ref -> !filter.refIsBlocked(ref.getName()))
                .collect(Collectors.toList());
        branches = RevWalkUtils.findBranchesReachableFrom(rw.parseCommit(objectId), rw, refs);
      } catch (IOException e) {
        throw new EventParsingException(
            e, "Unable to get reachable branches for: %s:%s", projectName, tagName);
      }
      if (branches.isEmpty()) {
        throw new EventParsingException(
            "Could not find any unblocked branch for SCS with: %s in %s so CD could not be created for tag %s",
            commitId, projectName, tagName);
      }
      String branch = branches.get(0).getName();
      scs = SourceChangeEventKey.scsKey(projectName, branch, commitId);
      createAndScheduleMissingScss(scs, null, null, null);
      scsId = eventHub.getExistingId(scs);
    }
    if (scsId.isEmpty()) {
      throw new EventParsingException(
          "Could not find or create SCS for %s in %s so CD could not be created for tag %s",
          commitId, projectName, tagName);
    }
    pushToHub(mapper.toCd(projectName, tagName, creationTime, scsId.get()), force);
  } catch (EiffelEventIdLookupException | InterruptedException e) {
    throw new EventParsingException(
        e,
        "Event creation failed for: %s",
        CompositionDefinedEventKey.create(mapper.tagCompositionName(projectName), tagName));
  }
}
/*
 * Resolves the object that the tag points to. If the tag ref has a peeled object id, that id is
 * returned. Otherwise the tag is treated as lightweight: the object id of the ref itself is
 * returned, unless blockLightWeightTags is set, in which case null is returned.
 *
 * Throws EventParsingException if the tag does not exist or the repository cannot be read.
 */
private ObjectId peelTag(String projectName, String tagName, boolean blockLightWeightTags)
    throws EventParsingException {
  try (Repository repo = repoManager.openRepository(Project.nameKey(projectName))) {
    Ref tagRef = repo.getRefDatabase().exactRef(Constants.R_TAGS + tagName);
    if (tagRef == null) {
      throw new EventParsingException("Cannot find tag: %s:%s", projectName, tagName);
    }
    ObjectId peeled = repo.getRefDatabase().peel(tagRef).getPeeledObjectId();
    if (peeled != null) {
      return peeled;
    }
    if (blockLightWeightTags) {
      logger.atInfo().log("Creation of CD is blocked for lightweight tags");
      return null;
    }
    return tagRef.getObjectId();
  } catch (IOException e) {
    throw new EventParsingException(e, "Unable to peel tag: %s:%s", projectName, tagName);
  }
}
/* Asks the hub for the id of an SCS event for the commit on any of the given branches. */
private Optional<UUID> findSourceChangeEventKey(
    String projectName, String commitId, List<String> branches)
    throws EiffelEventIdLookupException {
  Optional<UUID> scsId = eventHub.getScsForCommit(projectName, commitId, branches);
  return scsId;
}
/* Pushes the event to the EventHub without forcing. */
private void pushToHub(EiffelEvent toPush) throws InterruptedException {
  final boolean force = false;
  pushToHub(toPush, force);
}

/*
 * Pushes the event to the EventHub, making at most NBR_RETRIES attempts when the push is
 * interrupted. The InterruptedException is rethrown right away if the hub is no longer open, and
 * otherwise once the last attempt has failed.
 */
private void pushToHub(EiffelEvent toPush, boolean force) throws InterruptedException {
  EventKey key = EventKey.fromEvent(toPush);
  for (int attempt = 1; ; attempt++) {
    try {
      eventHub.push(toPush, force);
      logger.atFine().log("Successfully pushed %s to EventHub", key);
      return;
    } catch (InterruptedException e) {
      if (!eventHub.isOpen()) {
        logger.atInfo().log("EventHub is closed, aborting.");
        throw e;
      }
      if (attempt >= NBR_RETRIES) {
        throw e;
      }
      logger.atWarning().withCause(e).log(
          "Interrupted while pushing %s to EventHub, attempt %d/%d",
          key, attempt, NBR_RETRIES);
    }
  }
}
/*
 * Creates and pushes an event for every commit yielded by commitFinder that has no event id in
 * the hub. Before an event is created, findUuid is consulted so that a UUID which other events
 * already use to refer to the missing event can be passed on to the mapper.
 *
 * Callers are responsible for closing commitFinder.
 */
private void fillGapsForSc(SourceChangeEventKey tip, CommitsWalker commitFinder)
    throws MissingObjectException, EiffelEventIdLookupException, IOException,
        NoSuchEntityException, ConfigInvalidException, InterruptedException,
        EventParsingException {
  while (commitFinder.hasNext()) {
    CommitsWalker.EventCreate job = commitFinder.next();
    SourceChangeEventKey key = job.key;
    RevCommit commit = job.commit;
    logger.atFine().log("Processing event-creation for missing event: %s", key);
    if (eventHub.getExistingId(key).isPresent()) {
      /* Not a gap, the event already exists. */
      continue;
    }
    Optional<UUID> id = findUuid(tip, key);
    try {
      EiffelEvent event;
      if (key.type() != SCC) {
        event =
            mapper.toScs(
                commit,
                key.repo(),
                key.branch(),
                null,
                null,
                getParentUuids(key, commit),
                getSccUuid(key),
                id);
      } else {
        event = mapper.toScc(commit, key.repo(), key.branch(), getParentUuids(key, commit), id);
      }
      pushToHub(event);
    } catch (InterruptedException e) {
      logger.atSevere().log("Interrupted while pushing %s to EventHub.", key);
      throw e;
    }
  }
}
/*
 * Opens an SCC walker for the key, creates and pushes the SCC events for the commits it yields,
 * and closes the walker again.
 */
@VisibleForTesting
void createAndScheduleMissingSccs(SourceChangeEventKey scc)
    throws MissingObjectException, IncorrectObjectTypeException, IOException,
        EiffelEventIdLookupException, RepositoryNotFoundException, NoSuchEntityException,
        ConfigInvalidException, InterruptedException {
  logger.atFine().log("Start publishing events for: %s", scc);
  try (UnprocessedCommitsWalker sccWalker = walkerFactory.sccWalker(scc)) {
    createAndScheduleMissingSccs(scc, sccWalker);
  }
  logger.atFine().log("Done publishing events for: %s", scc);
}
/*
 * Creates and pushes one SCC event for every commit yielded by commitFinder. Each event links to
 * the events of the parent commits, so those must already be known to the hub.
 *
 * Callers are responsible for closing commitFinder.
 */
private void createAndScheduleMissingSccs(
    SourceChangeEventKey scc, UnprocessedCommitsWalker commitFinder)
    throws MissingObjectException, EiffelEventIdLookupException, IOException,
        NoSuchEntityException, ConfigInvalidException, InterruptedException {
  if (commitFinder.hasNext()) {
    logger.atFine().log("Start publishing events for: %s", scc);
  } else {
    logger.atFine().log("All events were already published for %s", scc);
  }
  while (commitFinder.hasNext()) {
    EventCreate job = commitFinder.next();
    SourceChangeEventKey jobKey = job.key;
    RevCommit commit = job.commit;
    logger.atFine().log("Processing event-creation for: %s", jobKey);
    try {
      pushToHub(
          mapper.toScc(commit, jobKey.repo(), jobKey.branch(), getParentUuids(jobKey, commit)));
    } catch (InterruptedException e) {
      logger.atSevere().log("Interrupted while pushing %s to EventHub.", jobKey);
      throw e;
    }
  }
}
/*
 * Returns the object id that the given ref points to. Returns null, after logging, when the ref
 * does not exist or the repository cannot be read.
 */
private ObjectId getTipOf(String repoName, String branch) {
  try (Repository repo = repoManager.openRepository(Project.nameKey(repoName))) {
    Ref branchRef = repo.exactRef(branch);
    if (branchRef != null) {
      return branchRef.getTarget().getObjectId();
    }
    logger.atWarning().log("Could not find ref: %s in project: %s", branch, repoName);
    return null;
  } catch (IOException ioe) {
    logger.atSevere().withCause(ioe).log("Unable to identify tip of (%s:%s).", repoName, branch);
    return null;
  }
}
/*
 * Looks up the event id of every parent of the commit, using the same event type, repository
 * and branch as key. Throws NoSuchEntityException as soon as the id of a parent is unknown.
 */
private List<UUID> getParentUuids(SourceChangeEventKey key, RevCommit commit)
    throws NoSuchEntityException, EiffelEventIdLookupException {
  List<UUID> parentIds = Lists.newArrayList();
  for (RevCommit parent : commit.getParents()) {
    Optional<UUID> parentId = eventHub.getExistingId(key.copy(parent.getName()));
    if (parentId.isEmpty()) {
      exceptionForMissingParent(key, parent);
    } else {
      parentIds.add(parentId.get());
    }
  }
  return parentIds;
}
/*
 * Returns the id of the SCC event for the same commit as key. Throws NoSuchEntityException when
 * the hub does not know such an event.
 */
private UUID getSccUuid(SourceChangeEventKey key)
    throws NoSuchEntityException, EiffelEventIdLookupException {
  return eventHub
      .getExistingId(key.copy(SCC))
      .orElseThrow(
          () ->
              new NoSuchEntityException(
                  String.format(
                      "Unable to lookup SCC (%s) event UUID for %s even though it should exist.",
                      key.commit(), key)));
}
/*
 * Attempts to determine which UUID other events use when they link to THIS event.
 *
 * For an SCC key the link from the SCS event of the same commit is consulted first. Otherwise
 * the parent links of the events of the child commits are intersected until exactly one
 * candidate remains.
 *
 * Returns the UUID that must be reused, or an empty Optional when no existing event refers to
 * this one, in which case a new UUID can be generated. Throws EventParsingException when the
 * children cannot be walked, or when the links of the children do not single out one UUID.
 */
private Optional<UUID> findUuid(SourceChangeEventKey tip, SourceChangeEventKey key)
    throws EventParsingException, EiffelEventIdLookupException {
  Optional<UUID> id = getUuidFromScsChangeLink(key);
  if (id.isPresent()) {
    return id;
  }
  Set<UUID> uuids = null;
  /* Check which UUID the children of this event(commit) use to point to THIS event. */
  try (CommitsWalker commitFinder = walkerFactory.childWalker(tip, key)) {
    while (commitFinder.hasNext()) {
      SourceChangeEventKey childKey = commitFinder.next().key;
      Optional<List<UUID>> ids = eventHub.getParentLinks(childKey);
      if (ids.isEmpty()) {
        logger.atFine().log("Eiffel-event is missing for child: %s", childKey);
        continue;
      }
      if (uuids == null) {
        uuids = new HashSet<>(ids.get());
      } else {
        /* Do intersection between the set of parents for each child to determine the UUID for THIS event. */
        uuids.retainAll(ids.get());
      }
      if (uuids.size() == 1) { // Found one UUID that all the processed children points to.
        return Optional.of(uuids.iterator().next());
      }
    }
  } catch (IOException e) {
    throw new EventParsingException(e, "Unable to get reachable UUID for children of: %s", key);
  }
  if (uuids == null) {
    /* Occurs if the events of the child commits are missing or if the commit of the event does
    not have a child. In this situation we can generate a new UUID. */
    logger.atFine().log("Could not find an event that reference: %s", key);
    return Optional.empty();
  }
  if (uuids.isEmpty()) {
    /* The children have events, but their parent links have no UUID in common. A new UUID would
    not be referenced by any of them, so the creation of the event is prevented here as well. */
    throw new EventParsingException(
        "Found no UUID for %s that is referenced by all of its children", key);
  }
  /* If we found several potential UUID:s we can not generate a new UUID so we need to throw an
  exception to prevent the creation of the event. */
  String potentialUUID = uuids.stream().map(UUID::toString).collect(Collectors.joining(", "));
  throw new EventParsingException(
      "Found several potential UUID for %s potential UUID: %s", key, potentialUUID);
}
/*
 * For an SCC key, returns the SCC link recorded for the SCS event of the same commit, as
 * reported by the hub. Returns an empty Optional for any other event type.
 */
private Optional<UUID> getUuidFromScsChangeLink(SourceChangeEventKey key)
    throws EiffelEventIdLookupException {
  if (!key.type().equals(SCC)) {
    return Optional.empty();
  }
  return eventHub.getSccEventLink(key.copy(SCS));
}
/* Always throws: reports that the event id of a parent commit could not be looked up. */
private void exceptionForMissingParent(SourceChangeEventKey key, RevCommit parent)
    throws NoSuchEntityException {
  String message =
      String.format(
          "Unable to lookup parent (%s) event UUID for %s even though it should exist.",
          parent.abbreviate(7).name(), key);
  throw new NoSuchEntityException(message);
}
}