package org.elasticsearch.xpack.ml.job.process;

import java.lang.Runnable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.SuppressForbidden;

/* loaded from: input_file:org/elasticsearch/xpack/ml/job/process/AbstractProcessWorkerExecutorService.class */
public abstract class AbstractProcessWorkerExecutorService<T extends Runnable> extends AbstractExecutorService {
    private static final Logger logger;
    protected final ThreadContext contextHolder;
    protected final String processName;
    protected final BlockingQueue<T> queue;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final CountDownLatch awaitTermination = new CountDownLatch(1);
    private final AtomicReference<Exception> error = new AtomicReference<>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicBoolean shouldShutdownAfterCompletingWork = new AtomicBoolean(false);

    @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
    public AbstractProcessWorkerExecutorService(ThreadContext threadContext, String str, int i, Function<Integer, BlockingQueue<T>> function) {
        this.contextHolder = (ThreadContext) Objects.requireNonNull(threadContext);
        this.processName = (String) Objects.requireNonNull(str);
        this.queue = function.apply(Integer.valueOf(i));
    }

    public int queueSize() {
        return this.queue.size();
    }

    public void shutdownNowWithError(Exception exc) {
        this.error.set(exc);
        shutdownNow();
    }

    @Override // java.util.concurrent.ExecutorService
    public void shutdown() {
        this.shouldShutdownAfterCompletingWork.set(true);
    }

    @Override // java.util.concurrent.ExecutorService
    public List<Runnable> shutdownNow() {
        this.running.set(false);
        return new ArrayList(this.queue);
    }

    @Override // java.util.concurrent.ExecutorService
    public boolean isShutdown() {
        return !this.running.get() || this.shouldShutdownAfterCompletingWork.get();
    }

    @Override // java.util.concurrent.ExecutorService
    public boolean isTerminated() {
        return this.awaitTermination.getCount() == 0;
    }

    @Override // java.util.concurrent.ExecutorService
    public boolean awaitTermination(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.awaitTermination.await(j, timeUnit);
    }

    public void start() {
        while (this.running.get()) {
            try {
                T poll = this.queue.poll(500L, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    try {
                        poll.run();
                    } catch (Exception e) {
                        logger.error(() -> {
                            return "error handling process [" + this.processName + "] operation";
                        }, e);
                    }
                    EsExecutors.rethrowErrors(ThreadContext.unwrap(poll));
                } else if (this.shouldShutdownAfterCompletingWork.get()) {
                    this.running.set(false);
                }
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                this.awaitTermination.countDown();
            }
        }
        notifyQueueRunnables();
    }

    public synchronized void notifyQueueRunnables() {
        if (!$assertionsDisabled && !isShutdown()) {
            throw new AssertionError("Queue runnables should only be drained and notified after the worker is shutdown");
        }
        if (this.queue.isEmpty()) {
            return;
        }
        logger.warn(Strings.format("[%s] notifying [%d] queued requests that have not been processed before shutdown", new Object[]{this.processName, Integer.valueOf(this.queue.size())}));
        ArrayList<AbstractRunnable> arrayList = new ArrayList();
        this.queue.drainTo(arrayList);
        String str = "unable to process as " + this.processName + " worker service has shutdown";
        Exception exc = this.error.get();
        for (AbstractRunnable abstractRunnable : arrayList) {
            if (abstractRunnable instanceof AbstractRunnable) {
                AbstractRunnable abstractRunnable2 = abstractRunnable;
                if (exc != null) {
                    abstractRunnable2.onFailure(exc);
                } else {
                    abstractRunnable2.onRejection(new EsRejectedExecutionException(str, true));
                }
            }
        }
    }

    static {
        $assertionsDisabled = !AbstractProcessWorkerExecutorService.class.desiredAssertionStatus();
        logger = LogManager.getLogger(AbstractProcessWorkerExecutorService.class);
    }
}
