/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.ad.task;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.Version;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.ADTaskProfileRunner;
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskAction;
import org.opensearch.ad.model.ADTaskProfile;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskCancellationState;
import org.opensearch.ad.transport.ADBatchAnomalyResultAction;
import org.opensearch.ad.transport.ADBatchAnomalyResultRequest;
import org.opensearch.ad.transport.ADCancelTaskAction;
import org.opensearch.ad.transport.ADCancelTaskRequest;
import org.opensearch.ad.transport.ADStatsNodesAction;
import org.opensearch.ad.transport.ForwardADTaskAction;
import org.opensearch.ad.transport.ForwardADTaskRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.cluster.HashRing;
import org.opensearch.timeseries.common.exception.DuplicateTaskException;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.LimitExceededException;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.common.exception.TaskCancelledException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.function.BiCheckedFunction;
import org.opensearch.timeseries.function.ExecutorFunction;
import org.opensearch.timeseries.function.ResponseTransformer;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.DateRange;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.EntityTaskProfile;
import org.opensearch.timeseries.model.TaskState;
import org.opensearch.timeseries.model.TaskType;
import org.opensearch.timeseries.model.TimeSeriesTask;
import org.opensearch.timeseries.stats.InternalStatNames;
import org.opensearch.timeseries.task.TaskManager;
import org.opensearch.timeseries.transport.JobResponse;
import org.opensearch.timeseries.transport.StatsNodeResponse;
import org.opensearch.timeseries.transport.StatsRequest;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.timeseries.util.ExceptionUtil;
import org.opensearch.timeseries.util.ParseUtils;
import org.opensearch.timeseries.util.RestHandlerUtils;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.client.Client;

public class ADTaskManager
extends TaskManager<ADTaskCacheManager, ADTaskType, ADTask, ADIndex, ADIndexManagement> {
    public static final String AD_TASK_LEAD_NODE_MODEL_ID = "ad_task_lead_node_model_id";
    public static final String AD_TASK_MAINTAINENCE_NODE_MODEL_ID = "ad_task_maintainence_node_model_id";
    public static final int HC_BATCH_TASK_CACHE_TIMEOUT_IN_MILLIS = 600000;
    public final Logger logger = LogManager.getLogger(this.getClass());
    static final String STATE_INDEX_NOT_EXIST_MSG = "State index does not exist.";
    private final Set<String> retryableErrors = ImmutableSet.of((Object)ADCommonMessages.EXCEED_HISTORICAL_ANALYSIS_LIMIT, (Object)ADCommonMessages.NO_ELIGIBLE_NODE_TO_RUN_DETECTOR);
    private final DiscoveryNodeFilterer nodeFilter;
    private final HashRing hashRing;
    private volatile Integer pieceIntervalSeconds;
    private volatile TransportRequestOptions transportRequestOptions;
    private final Semaphore checkingTaskSlot;
    private volatile Integer maxAdBatchTaskPerNode;
    private volatile Integer maxRunningEntitiesPerDetector;
    private final Semaphore scaleEntityTaskLane;
    private static final int SCALE_ENTITY_TASK_LANE_INTERVAL_IN_MILLIS = 10000;
    private final ADTaskProfileRunner taskProfileRunner;

    public ADTaskManager(Settings settings, ClusterService clusterService, Client client, NamedXContentRegistry xContentRegistry, ADIndexManagement detectionIndices, DiscoveryNodeFilterer nodeFilter, HashRing hashRing, ADTaskCacheManager adTaskCacheManager, ThreadPool threadPool, NodeStateManager nodeStateManager, ADTaskProfileRunner taskProfileRunner) {
        super(adTaskCacheManager, clusterService, client, ".opendistro-anomaly-detection-state", ADTaskType.REALTIME_TASK_TYPES, ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES, Collections.emptyList(), detectionIndices, nodeStateManager, AnalysisType.AD, xContentRegistry, "detector_id", AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR, settings, threadPool, ".opendistro-anomaly-results*", "ad-batch-task-threadpool", AnomalyDetectorSettings.DELETE_AD_RESULT_WHEN_DELETE_DETECTOR, TaskState.STOPPED);
        this.nodeFilter = nodeFilter;
        this.hashRing = hashRing;
        this.pieceIntervalSeconds = (Integer)AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS, it -> {
            this.pieceIntervalSeconds = it;
        });
        this.maxAdBatchTaskPerNode = (Integer)AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE, it -> {
            this.maxAdBatchTaskPerNode = it;
        });
        this.maxRunningEntitiesPerDetector = (Integer)AnomalyDetectorSettings.MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS, it -> {
            this.maxRunningEntitiesPerDetector = it;
        });
        this.transportRequestOptions = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG).withTimeout((TimeValue)AnomalyDetectorSettings.AD_REQUEST_TIMEOUT.get(settings)).build();
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.AD_REQUEST_TIMEOUT, it -> {
            this.transportRequestOptions = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG).withTimeout(it).build();
        });
        this.checkingTaskSlot = new Semaphore(1);
        this.scaleEntityTaskLane = new Semaphore(1);
        this.taskProfileRunner = taskProfileRunner;
    }

    @Override
    public void startHistorical(Config config, DateRange detectionDateRange, User user, TransportService transportService, ActionListener<JobResponse> listener) {
        ForwardADTaskRequest forwardADTaskRequest = new ForwardADTaskRequest((AnomalyDetector)config, detectionDateRange, user, ADTaskAction.APPLY_FOR_TASK_SLOTS);
        this.forwardRequestToLeadNode(forwardADTaskRequest, transportService, listener);
    }

    public void forwardScaleTaskSlotRequestToLeadNode(ADTask adTask, TransportService transportService, ActionListener<JobResponse> listener) {
        this.forwardRequestToLeadNode(new ForwardADTaskRequest(adTask, ADTaskAction.CHECK_AVAILABLE_TASK_SLOTS), transportService, listener);
    }

    public void forwardRequestToLeadNode(ForwardADTaskRequest forwardADTaskRequest, TransportService transportService, ActionListener<JobResponse> listener) {
        this.hashRing.buildAndGetOwningNodeWithSameLocalVersion(AD_TASK_LEAD_NODE_MODEL_ID, node -> {
            if (!node.isPresent()) {
                listener.onFailure((Exception)new ResourceNotFoundException("Can't find AD task lead node"));
                return;
            }
            transportService.sendRequest((DiscoveryNode)node.get(), ForwardADTaskAction.NAME, (TransportRequest)forwardADTaskRequest, this.transportRequestOptions, (TransportResponseHandler)new ActionListenerResponseHandler(listener, JobResponse::new));
        }, listener);
    }

    public void startHistoricalAnalysis(AnomalyDetector detector, DateRange detectionDateRange, User user, int availableTaskSlots, TransportService transportService, ActionListener<JobResponse> listener) {
        String detectorId = detector.getId();
        this.hashRing.buildAndGetOwningNodeWithSameLocalVersion(detectorId, owningNode -> {
            if (!owningNode.isPresent()) {
                this.logger.debug("Can't find eligible node to run as AD task's coordinating node");
                listener.onFailure((Exception)new OpenSearchStatusException("No eligible node to run detector", RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                return;
            }
            this.logger.debug("coordinating node is : {} for detector: {}", (Object)((DiscoveryNode)owningNode.get()).getId(), (Object)detectorId);
            this.forwardDetectRequestToCoordinatingNode(detector, detectionDateRange, user, availableTaskSlots, ADTaskAction.START, transportService, (DiscoveryNode)owningNode.get(), listener);
        }, listener);
    }

    protected void forwardDetectRequestToCoordinatingNode(AnomalyDetector detector, DateRange detectionDateRange, User user, Integer availableTaskSlots, ADTaskAction adTaskAction, TransportService transportService, DiscoveryNode node, ActionListener<JobResponse> listener) {
        Version adVersion = this.hashRing.getVersion(node.getId());
        transportService.sendRequest(node, ForwardADTaskAction.NAME, (TransportRequest)new ForwardADTaskRequest(detector, detectionDateRange, user, adTaskAction, availableTaskSlots, adVersion), this.transportRequestOptions, (TransportResponseHandler)new ActionListenerResponseHandler(listener, JobResponse::new));
    }

    protected void forwardADTaskToCoordinatingNode(ADTask adTask, ADTaskAction adTaskAction, TransportService transportService, ActionListener<JobResponse> listener) {
        this.logger.debug("Forward AD task to coordinating node, task id: {}, action: {}", (Object)adTask.getTaskId(), (Object)adTaskAction.name());
        transportService.sendRequest(this.getCoordinatingNode(adTask), ForwardADTaskAction.NAME, (TransportRequest)new ForwardADTaskRequest(adTask, adTaskAction), this.transportRequestOptions, (TransportResponseHandler)new ActionListenerResponseHandler(listener, JobResponse::new));
    }

    protected void forwardStaleRunningEntitiesToCoordinatingNode(ADTask adTask, ADTaskAction adTaskAction, TransportService transportService, List<String> staleRunningEntity, ActionListener<JobResponse> listener) {
        transportService.sendRequest(this.getCoordinatingNode(adTask), ForwardADTaskAction.NAME, (TransportRequest)new ForwardADTaskRequest(adTask, adTaskAction, staleRunningEntity), this.transportRequestOptions, (TransportResponseHandler)new ActionListenerResponseHandler(listener, JobResponse::new));
    }

    public void checkTaskSlots(ADTask adTask, AnomalyDetector detector, DateRange detectionDateRange, User user, ADTaskAction afterCheckAction, TransportService transportService, ActionListener<JobResponse> listener) {
        String detectorId = detector.getId();
        this.logger.debug("Start checking task slots for detector: {}, task action: {}", (Object)detectorId, (Object)afterCheckAction);
        if (!this.checkingTaskSlot.tryAcquire()) {
            this.logger.info("Can't acquire checking task slot semaphore for detector {}", (Object)detectorId);
            listener.onFailure((Exception)new OpenSearchStatusException("Too many historical analysis requests in short time. Please retry later.", RestStatus.FORBIDDEN, new Object[0]));
            return;
        }
        ActionListener wrappedActionListener = ActionListener.runAfter(listener, () -> {
            this.checkingTaskSlot.release(1);
            this.logger.debug("Release checking task slot semaphore on lead node for detector {}", (Object)detectorId);
        });
        this.hashRing.getNodesWithSameLocalVersion(nodes -> {
            int maxAdTaskSlots = ((DiscoveryNode[])nodes).length * this.maxAdBatchTaskPerNode;
            StatsRequest adStatsRequest = new StatsRequest((DiscoveryNode)nodes);
            adStatsRequest.addAll((Set<String>)ImmutableSet.of((Object)InternalStatNames.AD_USED_BATCH_TASK_SLOT_COUNT.getName(), (Object)InternalStatNames.AD_DETECTOR_ASSIGNED_BATCH_TASK_SLOT_COUNT.getName()));
            this.client.execute((ActionType)ADStatsNodesAction.INSTANCE, (ActionRequest)adStatsRequest, ActionListener.wrap(adStatsResponse -> {
                int totalUsedTaskSlots = 0;
                int totalAssignedTaskSlots = 0;
                for (StatsNodeResponse response : adStatsResponse.getNodes()) {
                    totalUsedTaskSlots += ((Integer)response.getStatsMap().get(InternalStatNames.AD_USED_BATCH_TASK_SLOT_COUNT.getName())).intValue();
                    totalAssignedTaskSlots += ((Integer)response.getStatsMap().get(InternalStatNames.AD_DETECTOR_ASSIGNED_BATCH_TASK_SLOT_COUNT.getName())).intValue();
                }
                this.logger.info("Current total used task slots is {}, total detector assigned task slots is {} when start historical analysis for detector {}", (Object)totalUsedTaskSlots, (Object)totalAssignedTaskSlots, (Object)detectorId);
                int currentUsedTaskSlots = Math.max(totalUsedTaskSlots, totalAssignedTaskSlots);
                if (currentUsedTaskSlots >= maxAdTaskSlots) {
                    wrappedActionListener.onFailure((Exception)new OpenSearchStatusException("No available task slot", RestStatus.BAD_REQUEST, new Object[0]));
                    return;
                }
                int availableAdTaskSlots = maxAdTaskSlots - currentUsedTaskSlots;
                this.logger.info("Current available task slots is {} for historical analysis of detector {}", (Object)availableAdTaskSlots, (Object)detectorId);
                if (ADTaskAction.SCALE_ENTITY_TASK_SLOTS == afterCheckAction) {
                    this.forwardToCoordinatingNode(adTask, detector, detectionDateRange, user, afterCheckAction, transportService, (ActionListener<JobResponse>)wrappedActionListener, availableAdTaskSlots);
                    return;
                }
                int approvedTaskSlots = detector.isHighCardinality() ? Math.min(this.maxRunningEntitiesPerDetector, availableAdTaskSlots) : 1;
                this.forwardToCoordinatingNode(adTask, detector, detectionDateRange, user, afterCheckAction, transportService, (ActionListener<JobResponse>)wrappedActionListener, approvedTaskSlots);
            }, exception -> {
                this.logger.error("Failed to get node's task stats for detector " + detectorId, (Throwable)exception);
                wrappedActionListener.onFailure(exception);
            }));
        }, wrappedActionListener);
    }

    private void forwardToCoordinatingNode(ADTask adTask, AnomalyDetector detector, DateRange detectionDateRange, User user, ADTaskAction targetActionOfTaskSlotChecking, TransportService transportService, ActionListener<JobResponse> wrappedActionListener, int approvedTaskSlots) {
        switch (targetActionOfTaskSlotChecking) {
            case START: {
                this.logger.info("Will assign {} task slots to run historical analysis for detector {}", (Object)approvedTaskSlots, (Object)detector.getId());
                this.startHistoricalAnalysis(detector, detectionDateRange, user, approvedTaskSlots, transportService, wrappedActionListener);
                break;
            }
            case SCALE_ENTITY_TASK_SLOTS: {
                this.logger.info("There are {} task slots available now to scale historical analysis task lane for detector {}", (Object)approvedTaskSlots, (Object)adTask.getConfigId());
                this.scaleTaskLaneOnCoordinatingNode(adTask, approvedTaskSlots, transportService, wrappedActionListener);
                break;
            }
            default: {
                wrappedActionListener.onFailure((Exception)new TimeSeriesException("Unknown task action " + String.valueOf((Object)targetActionOfTaskSlotChecking)));
            }
        }
    }

    protected void scaleTaskLaneOnCoordinatingNode(ADTask adTask, int approvedTaskSlot, TransportService transportService, ActionListener<JobResponse> listener) {
        transportService.sendRequest(this.getCoordinatingNode(adTask), ForwardADTaskAction.NAME, (TransportRequest)new ForwardADTaskRequest(adTask, approvedTaskSlot, ADTaskAction.SCALE_ENTITY_TASK_SLOTS), this.transportRequestOptions, (TransportResponseHandler)new ActionListenerResponseHandler(listener, JobResponse::new));
    }

    private DiscoveryNode getCoordinatingNode(ADTask adTask) {
        try {
            DiscoveryNode[] eligibleDataNodes;
            String coordinatingNodeId = adTask.getCoordinatingNode();
            Version coordinatingNodeVersion = this.hashRing.getVersion(coordinatingNodeId);
            Version localNodeVersion = this.hashRing.getVersion(this.clusterService.localNode().getId());
            if (!this.isSameVersion(coordinatingNodeVersion, localNodeVersion)) {
                throw new ResourceNotFoundException(adTask.getConfigId(), "AD task coordinating node has different version than local node");
            }
            for (DiscoveryNode node : eligibleDataNodes = this.nodeFilter.getEligibleDataNodes()) {
                String nodeId = node.getId();
                if (nodeId == null || !nodeId.equals(coordinatingNodeId)) continue;
                return node;
            }
            throw new ResourceNotFoundException(adTask.getConfigId(), "AD task coordinating node not found");
        }
        catch (Exception e) {
            this.logger.error("Error locating coordinating node", (Throwable)e);
            throw new ResourceNotFoundException(adTask.getConfigId(), "AD task coordinating node not found");
        }
    }

    private boolean isSameVersion(Version version1, Version version2) {
        return version1 == null && version2 == null || version1 != null && version2 != null && version1.compareTo(version2) == 0;
    }

    @Override
    protected TaskType getTaskType(Config config, DateRange detectionDateRange, boolean runOnce) {
        if (detectionDateRange == null) {
            return config.isHighCardinality() ? ADTaskType.REALTIME_HC_DETECTOR : ADTaskType.REALTIME_SINGLE_ENTITY;
        }
        return config.isHighCardinality() ? ADTaskType.HISTORICAL_HC_DETECTOR : ADTaskType.HISTORICAL_SINGLE_ENTITY;
    }

    protected <T> void resetHistoricalConfigTaskState(List<TimeSeriesTask> runningHistoricalTasks, ExecutorFunction function, TransportService transportService, ActionListener<T> listener) {
        if (ParseUtils.isNullOrEmpty(runningHistoricalTasks)) {
            function.execute();
            return;
        }
        ADTask adTask = (ADTask)runningHistoricalTasks.get(0);
        if (!this.lastUpdateTimeOfHistoricalTaskExpired(adTask)) {
            function.execute();
            return;
        }
        String taskId = adTask.getTaskId();
        AnomalyDetector detector = adTask.getDetector();
        this.taskProfileRunner.getTaskProfile(adTask, (ActionListener<ADTaskProfile>)ActionListener.wrap(taskProfile -> {
            boolean taskStopped = this.isTaskStopped(taskId, detector, (ADTaskProfile)taskProfile);
            if (taskStopped) {
                this.logger.debug("Reset task state as stopped, task id: {}", (Object)adTask.getTaskId());
                if (taskProfile.getTaskId() == null && detector.isHighCardinality() && !ParseUtils.isNullOrEmpty(taskProfile.getEntityTaskProfiles())) {
                    this.stopHistoricalAnalysis(adTask.getConfigId(), Optional.of(adTask), null, (ActionListener<JobResponse>)ActionListener.wrap(r -> {
                        this.logger.debug("Restop detector successfully");
                        this.resetTaskStateAsStopped(adTask, function, transportService, listener);
                    }, e -> {
                        this.logger.error("Failed to restop detector ", (Throwable)e);
                        listener.onFailure(e);
                    }));
                } else {
                    this.resetTaskStateAsStopped(adTask, function, transportService, listener);
                }
            } else {
                function.execute();
                if (ADTaskType.HISTORICAL_HC_DETECTOR.name().equals(adTask.getTaskType()) && !ParseUtils.isNullOrEmpty(taskProfile.getRunningEntities()) && this.hcBatchTaskExpired(taskProfile.getLatestHCTaskRunTime())) {
                    ArrayList<String> runningTasksInCoordinatingNodeCache = new ArrayList<String>(taskProfile.getRunningEntities());
                    ArrayList runningTasksOnWorkerNode = new ArrayList();
                    if (taskProfile.getEntityTaskProfiles() != null && taskProfile.getEntityTaskProfiles().size() > 0) {
                        taskProfile.getEntityTaskProfiles().forEach(entryTask -> runningTasksOnWorkerNode.add(this.convertEntityToString(entryTask.getEntity(), detector)));
                    }
                    if (runningTasksInCoordinatingNodeCache.size() > runningTasksOnWorkerNode.size()) {
                        runningTasksInCoordinatingNodeCache.removeAll(runningTasksOnWorkerNode);
                        this.forwardStaleRunningEntitiesToCoordinatingNode(adTask, ADTaskAction.CLEAN_STALE_RUNNING_ENTITIES, transportService, runningTasksInCoordinatingNodeCache, (ActionListener<JobResponse>)ActionListener.wrap(res -> this.logger.debug("Forwarded task to clean stale running entity, task id {}", (Object)taskId), ex -> this.logger.error("Failed to forward clean stale running entity for task " + taskId, (Throwable)ex)));
                    }
                }
            }
        }, e -> {
            this.logger.error("Failed to get AD task profile for task " + adTask.getTaskId(), (Throwable)e);
            function.execute();
        }));
    }

    private boolean isTaskStopped(String taskId, AnomalyDetector detector, ADTaskProfile taskProfile) {
        String detectorId = detector.getId();
        if (taskProfile == null || !Objects.equals(taskId, taskProfile.getTaskId())) {
            this.logger.debug("AD task not found for task {} detector {}", (Object)taskId, (Object)detectorId);
            return true;
        }
        if (!detector.isHighCardinality() && taskProfile.getNodeId() == null) {
            this.logger.debug("AD task not running for single entity detector {}, task {}", (Object)detectorId, (Object)taskId);
            return true;
        }
        if (detector.isHighCardinality() && taskProfile.getTotalEntitiesInited() && ParseUtils.isNullOrEmpty(taskProfile.getRunningEntities()) && ParseUtils.isNullOrEmpty(taskProfile.getEntityTaskProfiles()) && this.hcBatchTaskExpired(taskProfile.getLatestHCTaskRunTime())) {
            this.logger.debug("AD task not running for HC detector {}, task {}", (Object)detectorId, (Object)taskId);
            return true;
        }
        return false;
    }

    public boolean hcBatchTaskExpired(Long latestHCTaskRunTime) {
        if (latestHCTaskRunTime == null) {
            return true;
        }
        return latestHCTaskRunTime + 600000L < Instant.now().toEpochMilli();
    }

    public void stopHistoricalAnalysis(String detectorId, Optional<ADTask> adTask, User user, ActionListener<JobResponse> listener) {
        if (!adTask.isPresent()) {
            listener.onFailure((Exception)new ResourceNotFoundException(detectorId, "Detector not started"));
            return;
        }
        if (adTask.get().isDone()) {
            listener.onFailure((Exception)new ResourceNotFoundException(detectorId, "No running task found"));
            return;
        }
        String taskId = adTask.get().getTaskId();
        DiscoveryNode[] dataNodes = this.hashRing.getNodesWithSameLocalVersion();
        String userName = user == null ? null : user.getName();
        ADCancelTaskRequest cancelTaskRequest = new ADCancelTaskRequest(detectorId, taskId, userName, dataNodes);
        this.client.execute((ActionType)ADCancelTaskAction.INSTANCE, (ActionRequest)cancelTaskRequest, ActionListener.wrap(response -> listener.onResponse((Object)new JobResponse(taskId)), e -> {
            this.logger.error("Failed to cancel AD task " + taskId + ", detector id: " + detectorId, (Throwable)e);
            listener.onFailure(e);
        }));
    }

    private boolean lastUpdateTimeOfHistoricalTaskExpired(TimeSeriesTask adTask) {
        int waitingTime = Math.max(2 * this.pieceIntervalSeconds, 10);
        return adTask.getLastUpdateTime().plus((long)waitingTime, ChronoUnit.SECONDS).isBefore(Instant.now());
    }

    @Override
    protected boolean isHistoricalHCTask(TimeSeriesTask task) {
        return ADTaskType.HISTORICAL_HC_DETECTOR.name().equals(task.getTaskType());
    }

    @Override
    public <T> void cleanConfigCache(TimeSeriesTask adTask, TransportService transportService, ExecutorFunction function, ActionListener<T> listener) {
        String coordinatingNode = adTask.getCoordinatingNode();
        String detectorId = adTask.getConfigId();
        String taskId = adTask.getTaskId();
        try {
            this.forwardADTaskToCoordinatingNode((ADTask)adTask, ADTaskAction.CLEAN_CACHE, transportService, (ActionListener<JobResponse>)ActionListener.wrap(r -> function.execute(), e -> {
                this.logger.error("Failed to clear detector cache on coordinating node " + coordinatingNode, (Throwable)e);
                listener.onFailure(e);
            }));
        }
        catch (ResourceNotFoundException e2) {
            this.logger.warn("Task coordinating node left cluster or has different software version, taskId: {}, detectorId: {}, coordinatingNode: {}", (Object)taskId, (Object)detectorId, (Object)coordinatingNode);
            function.execute();
        }
        catch (Exception e3) {
            this.logger.error("Failed to forward clean cache event for detector " + detectorId + ", task " + taskId, (Throwable)e3);
            listener.onFailure(e3);
        }
    }

    protected void cleanDetectorCache(ADTask adTask, TransportService transportService, ExecutorFunction function) {
        String detectorId = adTask.getConfigId();
        String taskId = adTask.getTaskId();
        this.cleanConfigCache(adTask, transportService, function, ActionListener.wrap(r -> this.logger.debug("Successfully cleaned cache for detector {}, task {}", (Object)detectorId, (Object)taskId), e -> this.logger.error("Failed to clean cache for detector " + detectorId + ", task " + taskId, (Throwable)e)));
    }

    @Override
    protected <T> void createNewTask(Config config, DateRange detectionDateRange, boolean runOnce, User user, String coordinatingNode, TaskState initialState, ActionListener<T> listener) {
        String userName = user == null ? null : user.getName();
        Instant now = Instant.now();
        String taskType = this.getTaskType(config, detectionDateRange, runOnce).name();
        ADTask adTask = ((ADTask.Builder)((ADTask.Builder)((ADTask.Builder)((ADTask.Builder)((ADTask.Builder)((ADTask.Builder)((ADTask.Builder)((ADTask.Builder)((ADTask.Builder)((ADTask.Builder)((ADTask.Builder)new ADTask.Builder().configId(config.getId())).detector((AnomalyDetector)config).isLatest(true)).taskType(taskType)).executionStartTime(now)).taskProgress(Float.valueOf(0.0f))).initProgress(Float.valueOf(0.0f))).state(TaskState.CREATED.name())).lastUpdateTime(now)).startedBy(userName)).coordinatingNode(coordinatingNode)).detectionDateRange(detectionDateRange).user(user)).build();
        this.createTaskDirectly(adTask, r -> this.onIndexConfigTaskResponse((IndexResponse)r, adTask, (IndexResponse response, ActionListener<T> delegatedListener) -> this.cleanOldConfigTaskDocs((IndexResponse)response, adTask, indexResponse -> new JobResponse(indexResponse.getId()), delegatedListener), listener), listener);
    }

    @Override
    protected <T> void onIndexConfigTaskResponse(IndexResponse response, ADTask adTask, BiConsumer<IndexResponse, ActionListener<T>> function, ActionListener<T> listener) {
        if (response == null || response.getResult() != DocWriteResponse.Result.CREATED) {
            String errorMsg = ExceptionUtil.getShardsFailure(response);
            listener.onFailure((Exception)new OpenSearchStatusException(errorMsg, response.status(), new Object[0]));
            return;
        }
        adTask.setTaskId(response.getId());
        ActionListener delegatedListener = ActionListener.wrap(r -> listener.onResponse(r), e -> {
            this.handleTaskException(adTask, (Exception)e);
            if (e instanceof DuplicateTaskException) {
                listener.onFailure((Exception)new OpenSearchStatusException(ADCommonMessages.DETECTOR_IS_RUNNING, RestStatus.BAD_REQUEST, new Object[0]));
            } else {
                if (adTask.isHistoricalTask()) {
                    ((ADTaskCacheManager)this.taskCacheManager).removeHistoricalTaskCache(adTask.getConfigId());
                }
                listener.onFailure(e);
            }
        });
        try {
            if (adTask.isHistoricalTask()) {
                ((ADTaskCacheManager)this.taskCacheManager).add(adTask.getConfigId(), adTask);
            }
        }
        catch (Exception e2) {
            delegatedListener.onFailure(e2);
            return;
        }
        if (function != null) {
            function.accept(response, delegatedListener);
        }
    }

    @Override
    protected <T> void runBatchResultAction(IndexResponse response, ADTask adTask, ResponseTransformer<IndexResponse, T> responseTransformer, ActionListener<T> listener) {
        this.client.execute((ActionType)ADBatchAnomalyResultAction.INSTANCE, (ActionRequest)new ADBatchAnomalyResultRequest(adTask), ActionListener.wrap(r -> {
            String remoteOrLocal = r.isRunTaskRemotely() ? "remote" : "local";
            this.logger.info("AD task {} of detector {} dispatched to {} node {}", (Object)adTask.getTaskId(), (Object)adTask.getConfigId(), (Object)remoteOrLocal, (Object)r.getNodeId());
            listener.onResponse(responseTransformer.transform(response));
        }, e -> listener.onFailure(e)));
    }

    public ADTaskCancellationState cancelLocalTaskByDetectorId(String detectorId, String detectorTaskId, String reason, String userName) {
        ADTaskCancellationState cancellationState = ((ADTaskCacheManager)this.taskCacheManager).cancelByDetectorId(detectorId, detectorTaskId, reason, userName);
        this.logger.debug("Cancelled AD task for detector: {}, state: {}, cancelled by: {}, reason: {}", (Object)detectorId, (Object)cancellationState, (Object)userName, (Object)reason);
        return cancellationState;
    }

    @Override
    public void initRealtimeTaskCacheAndCleanupStaleCache(String detectorId, Config config, TransportService transportService, ActionListener<Boolean> listener) {
        try {
            if (((ADTaskCacheManager)this.taskCacheManager).getRealtimeTaskCache(detectorId) != null) {
                listener.onResponse((Object)false);
                return;
            }
            AnomalyDetector detector = (AnomalyDetector)config;
            this.getAndExecuteOnLatestConfigLevelTask(detectorId, ADTaskType.REALTIME_TASK_TYPES, adTaskOptional -> {
                if (!adTaskOptional.isPresent()) {
                    this.logger.debug("Can't find realtime task for detector {}, init realtime task cache directly", (Object)detectorId);
                    ExecutorFunction function = () -> this.createNewTask(detector, null, false, detector.getUser(), this.clusterService.localNode().getId(), TaskState.CREATED, ActionListener.wrap(r -> {
                        this.logger.info("Recreate realtime task successfully for detector {}", (Object)detectorId);
                        ((ADTaskCacheManager)this.taskCacheManager).initRealtimeTaskCache(detectorId, detector.getIntervalInMilliseconds());
                        listener.onResponse((Object)true);
                    }, e -> {
                        this.logger.error("Failed to recreate realtime task for detector " + detectorId, (Throwable)e);
                        listener.onFailure(e);
                    }));
                    this.recreateRealtimeTask(function, listener);
                    return;
                }
                ADTask adTask = (ADTask)adTaskOptional.get();
                String localNodeId = this.clusterService.localNode().getId();
                String oldCoordinatingNode = adTask.getCoordinatingNode();
                if (oldCoordinatingNode != null && !localNodeId.equals(oldCoordinatingNode)) {
                    this.logger.warn("AD realtime job coordinating node changed from {} to this node {} for detector {}", (Object)oldCoordinatingNode, (Object)localNodeId, (Object)detectorId);
                    this.cleanConfigCache(adTask, transportService, () -> {
                        this.logger.info("Realtime task cache cleaned on old coordinating node {} for detector {}", (Object)oldCoordinatingNode, (Object)detectorId);
                        ((ADTaskCacheManager)this.taskCacheManager).initRealtimeTaskCache(detectorId, detector.getIntervalInMilliseconds());
                        listener.onResponse((Object)true);
                    }, listener);
                } else {
                    this.logger.info("Init realtime task cache for detector {}", (Object)detectorId);
                    ((ADTaskCacheManager)this.taskCacheManager).initRealtimeTaskCache(detectorId, detector.getIntervalInMilliseconds());
                    listener.onResponse((Object)true);
                }
            }, transportService, false, listener);
        }
        catch (Exception e) {
            this.logger.error("Failed to init realtime task cache for " + detectorId, (Throwable)e);
            listener.onFailure(e);
        }
    }

    private void recreateRealtimeTask(ExecutorFunction function, ActionListener<Boolean> listener) {
        if (((ADIndexManagement)this.indexManagement).doesStateIndexExist()) {
            function.execute();
        } else {
            ((ADIndexManagement)this.indexManagement).initStateIndex((ActionListener<CreateIndexResponse>)ActionListener.wrap(r -> {
                if (r.isAcknowledged()) {
                    this.logger.info("Created {} with mappings.", (Object)".opendistro-anomaly-detection-state");
                    function.execute();
                } else {
                    String error = String.format(Locale.ROOT, "Create index %S not acknowledged by OpenSearch core", ".opendistro-anomaly-detection-state");
                    this.logger.warn(error);
                    listener.onFailure((Exception)new OpenSearchStatusException(error, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                }
            }, e -> {
                if (ExceptionsHelper.unwrapCause((Throwable)e) instanceof ResourceAlreadyExistsException) {
                    function.execute();
                } else {
                    this.logger.error("Failed to init anomaly detection state index", (Throwable)e);
                    listener.onFailure(e);
                }
            }));
        }
    }

    protected void entityTaskDone(ADTask adTask, Exception exception, TransportService transportService) {
        this.entityTaskDone(adTask, exception, transportService, (ActionListener<JobResponse>)ActionListener.wrap(r -> this.logger.debug("AD task forwarded to coordinating node, task id {}", (Object)adTask.getTaskId()), e -> this.logger.error("AD task failed to forward to coordinating node " + adTask.getCoordinatingNode() + " for task " + adTask.getTaskId(), (Throwable)e)));
    }

    private void entityTaskDone(ADTask adTask, Exception exception, TransportService transportService, ActionListener<JobResponse> listener) {
        try {
            ADTaskAction action = this.getAdEntityTaskAction(adTask, exception);
            this.forwardADTaskToCoordinatingNode(adTask, action, transportService, listener);
        }
        catch (Exception e) {
            listener.onFailure(e);
        }
    }

    private ADTaskAction getAdEntityTaskAction(ADTask adTask, Exception exception) {
        ADTaskAction action = ADTaskAction.NEXT_ENTITY;
        if (exception != null) {
            adTask.setError(ExceptionUtil.getErrorMessage(exception));
            if (exception instanceof LimitExceededException && this.isRetryableError(exception.getMessage())) {
                action = ADTaskAction.PUSH_BACK_ENTITY;
            } else if (exception instanceof TaskCancelledException || exception instanceof EndRunException) {
                action = ADTaskAction.CANCEL;
            }
        }
        return action;
    }

    public boolean isRetryableError(String error) {
        if (error == null) {
            return false;
        }
        return this.retryableErrors.stream().filter(e -> error.contains((CharSequence)e)).findFirst().isPresent();
    }

    public void setHCDetectorTaskDone(ADTask adTask, TaskState state, ActionListener<JobResponse> listener) {
        String detectorId = adTask.getConfigId();
        String taskId = adTask.isHistoricalEntityTask() ? adTask.getParentTaskId() : adTask.getTaskId();
        String detectorTaskId = adTask.getConfigLevelTaskId();
        ActionListener wrappedListener = ActionListener.wrap(response -> {
            this.logger.info("Historical HC detector done with state: {}. Remove from cache, detector id:{}", (Object)state.name(), (Object)detectorId);
            ((ADTaskCacheManager)this.taskCacheManager).removeHistoricalTaskCache(detectorId);
        }, e -> {
            if (e instanceof LimitExceededException && e.getMessage().contains(ADCommonMessages.HC_DETECTOR_TASK_IS_UPDATING)) {
                this.logger.warn("HC task is updating, skip this update for task: " + taskId);
            } else {
                this.logger.error("Failed to update task: " + taskId, (Throwable)e);
            }
            ((ADTaskCacheManager)this.taskCacheManager).removeHistoricalTaskCache(detectorId);
        });
        long timeoutInMillis = 2000L;
        if (state == TaskState.FINISHED) {
            this.countEntityTasksByState(detectorTaskId, (List<TaskState>)ImmutableList.of((Object)((Object)TaskState.FINISHED)), (ActionListener<Long>)ActionListener.wrap(r -> {
                this.logger.info("number of finished entity tasks: {}, for detector {}", r, (Object)adTask.getConfigId());
                TaskState hcDetectorTaskState = r == 0L ? TaskState.FAILED : TaskState.FINISHED;
                this.threadPool.executor("ad-batch-task-threadpool").execute(() -> this.updateADHCDetectorTask(detectorId, taskId, (Map<String, Object>)ImmutableMap.of((Object)"state", (Object)hcDetectorTaskState.name(), (Object)"task_progress", (Object)1.0, (Object)"execution_end_time", (Object)Instant.now().toEpochMilli()), timeoutInMillis, (ActionListener<UpdateResponse>)wrappedListener));
            }, e -> {
                this.logger.error("Failed to get finished entity tasks", (Throwable)e);
                String errorMessage = ExceptionUtil.getErrorMessage(e);
                this.threadPool.executor("ad-batch-task-threadpool").execute(() -> this.updateADHCDetectorTask(detectorId, taskId, (Map<String, Object>)ImmutableMap.of((Object)"state", (Object)TaskState.FAILED.name(), (Object)"task_progress", (Object)1.0, (Object)"error", (Object)errorMessage, (Object)"execution_end_time", (Object)Instant.now().toEpochMilli()), timeoutInMillis, (ActionListener<UpdateResponse>)wrappedListener));
            }));
        } else {
            this.threadPool.executor("ad-batch-task-threadpool").execute(() -> this.updateADHCDetectorTask(detectorId, taskId, (Map<String, Object>)ImmutableMap.of((Object)"state", (Object)state.name(), (Object)"error", (Object)adTask.getError(), (Object)"execution_end_time", (Object)Instant.now().toEpochMilli()), timeoutInMillis, (ActionListener<UpdateResponse>)wrappedListener));
        }
        listener.onResponse((Object)new JobResponse(taskId));
    }

    public void countEntityTasksByState(String detectorTaskId, List<TaskState> taskStates, ActionListener<Long> listener) {
        BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
        queryBuilder.filter((QueryBuilder)new TermQueryBuilder("parent_task_id", detectorTaskId));
        if (taskStates != null && taskStates.size() > 0) {
            queryBuilder.filter((QueryBuilder)new TermsQueryBuilder("state", (Iterable)taskStates.stream().map(s -> s.name()).collect(Collectors.toList())));
        }
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query((QueryBuilder)queryBuilder);
        sourceBuilder.size(0);
        sourceBuilder.trackTotalHits(true);
        SearchRequest request = new SearchRequest();
        request.source(sourceBuilder);
        request.indices(new String[]{".opendistro-anomaly-detection-state"});
        this.client.search(request, ActionListener.wrap(r -> {
            TotalHits totalHits = r.getHits().getTotalHits();
            listener.onResponse((Object)totalHits.value());
        }, e -> listener.onFailure(e)));
    }

    public void updateADHCDetectorTask(String detectorId, String taskId, Map<String, Object> updatedFields) {
        this.updateADHCDetectorTask(detectorId, taskId, updatedFields, 0L, (ActionListener<UpdateResponse>)ActionListener.wrap(response -> {
            if (response == null) {
                this.logger.debug("Skip updating AD task: {}", (Object)taskId);
            } else if (response.status() == RestStatus.OK) {
                this.logger.debug("Updated AD task successfully: {}, taskId: {}", (Object)response.status(), (Object)taskId);
            } else {
                this.logger.error("Failed to update AD task {}, status: {}", (Object)taskId, (Object)response.status());
            }
        }, e -> {
            if (e instanceof LimitExceededException && e.getMessage().contains(ADCommonMessages.HC_DETECTOR_TASK_IS_UPDATING)) {
                this.logger.warn("AD HC detector task is updating, skip this update for task: " + taskId);
            } else {
                this.logger.error("Failed to update AD HC detector task: " + taskId, (Throwable)e);
            }
        }));
    }

    private void updateADHCDetectorTask(String detectorId, String taskId, Map<String, Object> updatedFields, long timeoutInMillis, ActionListener<UpdateResponse> listener) {
        try {
            if (((ADTaskCacheManager)this.taskCacheManager).tryAcquireTaskUpdatingSemaphore(detectorId, timeoutInMillis)) {
                try {
                    this.updateTask(taskId, updatedFields, (ActionListener<UpdateResponse>)ActionListener.runAfter(listener, () -> ((ADTaskCacheManager)this.taskCacheManager).releaseTaskUpdatingSemaphore(detectorId)));
                }
                catch (Exception e) {
                    this.logger.error("Failed to update detector task " + taskId, (Throwable)e);
                    ((ADTaskCacheManager)this.taskCacheManager).releaseTaskUpdatingSemaphore(detectorId);
                    listener.onFailure(e);
                }
            } else if (!((ADTaskCacheManager)this.taskCacheManager).isHCTaskCoordinatingNode(detectorId)) {
                this.logger.info("HC detector task cache does not exist, detectorId:{}, taskId:{}", (Object)detectorId, (Object)taskId);
                listener.onResponse(null);
            } else {
                this.logger.info("HC detector task is updating, detectorId:{}, taskId:{}", (Object)detectorId, (Object)taskId);
                listener.onFailure((Exception)new LimitExceededException(ADCommonMessages.HC_DETECTOR_TASK_IS_UPDATING));
            }
        }
        catch (Exception e) {
            this.logger.error("Failed to get AD HC detector task updating semaphore " + taskId, (Throwable)e);
            listener.onFailure(e);
        }
    }

    public void runNextEntityForHCADHistorical(ADTask adTask, TransportService transportService, ActionListener<JobResponse> listener) {
        String detectorId = adTask.getConfigId();
        int scaleDelta = this.scaleTaskSlots(adTask, transportService, (ActionListener<JobResponse>)ActionListener.wrap(r -> this.logger.debug("Scale up task slots done for detector {}, task {}", (Object)detectorId, (Object)adTask.getTaskId()), e -> this.logger.error("Failed to scale up task slots for task " + adTask.getTaskId(), (Throwable)e)));
        if (scaleDelta < 0) {
            this.logger.warn("Have scaled down task slots. Will not poll next entity for detector {}, task {}, task slots: {}", (Object)detectorId, (Object)adTask.getTaskId(), (Object)((ADTaskCacheManager)this.taskCacheManager).getDetectorTaskSlots(detectorId));
            listener.onResponse((Object)new JobResponse(detectorId));
            return;
        }
        this.client.execute((ActionType)ADBatchAnomalyResultAction.INSTANCE, (ActionRequest)new ADBatchAnomalyResultRequest(adTask), ActionListener.wrap(r -> {
            String remoteOrLocal = r.isRunTaskRemotely() ? "remote" : "local";
            this.logger.info("AD entity task {} of detector {} dispatched to {} node {}", (Object)adTask.getTaskId(), (Object)detectorId, (Object)remoteOrLocal, (Object)r.getNodeId());
            JobResponse anomalyDetectorJobResponse = new JobResponse(detectorId);
            listener.onResponse((Object)anomalyDetectorJobResponse);
        }, e -> listener.onFailure(e)));
    }

    protected int scaleTaskSlots(ADTask adTask, TransportService transportService, ActionListener<JobResponse> scaleUpListener) {
        String detectorId = adTask.getConfigId();
        if (!this.scaleEntityTaskLane.tryAcquire()) {
            this.logger.debug("Can't get scaleEntityTaskLane semaphore");
            return 0;
        }
        try {
            int scaleDelta = this.detectorTaskSlotScaleDelta(detectorId);
            this.logger.debug("start to scale task slots for detector {} with delta {}", (Object)detectorId, (Object)scaleDelta);
            if (((ADTaskCacheManager)this.taskCacheManager).getAvailableNewEntityTaskLanes(detectorId) <= 0 && scaleDelta > 0) {
                Instant lastScaleEntityTaskLaneTime = ((ADTaskCacheManager)this.taskCacheManager).getLastScaleEntityTaskLaneTime(detectorId);
                if (lastScaleEntityTaskLaneTime == null) {
                    this.logger.debug("lastScaleEntityTaskLaneTime is null for detector {}", (Object)detectorId);
                    this.scaleEntityTaskLane.release();
                    return 0;
                }
                boolean lastScaleTimeExpired = lastScaleEntityTaskLaneTime.plusMillis(10000L).isBefore(Instant.now());
                if (lastScaleTimeExpired) {
                    ((ADTaskCacheManager)this.taskCacheManager).refreshLastScaleEntityTaskLaneTime(detectorId);
                    this.logger.debug("Forward scale entity task lane request to lead node for detector {}", (Object)detectorId);
                    this.forwardScaleTaskSlotRequestToLeadNode(adTask, transportService, (ActionListener<JobResponse>)ActionListener.runAfter(scaleUpListener, () -> this.scaleEntityTaskLane.release()));
                } else {
                    this.logger.debug("lastScaleEntityTaskLaneTime is not expired yet: {} for detector {}", (Object)lastScaleEntityTaskLaneTime, (Object)detectorId);
                    this.scaleEntityTaskLane.release();
                }
            } else {
                if (scaleDelta < 0) {
                    int runningEntityCount = ((ADTaskCacheManager)this.taskCacheManager).getRunningEntityCount(detectorId) + ((ADTaskCacheManager)this.taskCacheManager).getTempEntityCount(detectorId);
                    int assignedTaskSlots = ((ADTaskCacheManager)this.taskCacheManager).getDetectorTaskSlots(detectorId);
                    int scaleDownDelta = Math.min(assignedTaskSlots - runningEntityCount, 0 - scaleDelta);
                    this.logger.debug("Scale down task slots, scaleDelta: {}, assignedTaskSlots: {}, runningEntityCount: {}, scaleDownDelta: {}", (Object)scaleDelta, (Object)assignedTaskSlots, (Object)runningEntityCount, (Object)scaleDownDelta);
                    ((ADTaskCacheManager)this.taskCacheManager).scaleDownHCDetectorTaskSlots(detectorId, scaleDownDelta);
                }
                this.scaleEntityTaskLane.release();
            }
            return scaleDelta;
        }
        catch (Exception e) {
            this.logger.error("Failed to forward scale entity task lane request to lead node for detector " + detectorId, (Throwable)e);
            this.scaleEntityTaskLane.release();
            return 0;
        }
    }

    public int detectorTaskSlotScaleDelta(String detectorId) {
        DiscoveryNode[] eligibleDataNodes = this.hashRing.getNodesWithSameLocalVersion();
        int unfinishedEntities = ((ADTaskCacheManager)this.taskCacheManager).getUnfinishedEntityCount(detectorId);
        int totalTaskSlots = eligibleDataNodes.length * this.maxAdBatchTaskPerNode;
        int taskLaneLimit = Math.min(unfinishedEntities, Math.min(totalTaskSlots, this.maxRunningEntitiesPerDetector));
        ((ADTaskCacheManager)this.taskCacheManager).setDetectorTaskLaneLimit(detectorId, taskLaneLimit);
        int assignedTaskSlots = ((ADTaskCacheManager)this.taskCacheManager).getDetectorTaskSlots(detectorId);
        int scaleDelta = taskLaneLimit - assignedTaskSlots;
        this.logger.debug("Calculate task slot scale delta for detector {}, totalTaskSlots: {}, maxRunningEntitiesPerDetector: {}, unfinishedEntities: {}, taskLaneLimit: {}, assignedTaskSlots: {}, scaleDelta: {}", (Object)detectorId, (Object)totalTaskSlots, (Object)this.maxRunningEntitiesPerDetector, (Object)unfinishedEntities, (Object)taskLaneLimit, (Object)assignedTaskSlots, (Object)scaleDelta);
        return scaleDelta;
    }

    public float hcDetectorProgress(String detectorId) {
        int entityCount = ((ADTaskCacheManager)this.taskCacheManager).getTopEntityCount(detectorId);
        int leftEntities = ((ADTaskCacheManager)this.taskCacheManager).getPendingEntityCount(detectorId) + ((ADTaskCacheManager)this.taskCacheManager).getRunningEntityCount(detectorId);
        return 1.0f - (float)leftEntities / (float)entityCount;
    }

    public ADTaskProfile getLocalADTaskProfilesByDetectorId(String detectorId) {
        List<String> tasksOfDetector = ((ADTaskCacheManager)this.taskCacheManager).getTasksOfDetector(detectorId);
        ADTaskProfile detectorTaskProfile = null;
        String localNodeId = this.clusterService.localNode().getId();
        if (((ADTaskCacheManager)this.taskCacheManager).isHCTaskRunning(detectorId)) {
            detectorTaskProfile = new ADTaskProfile();
            if (((ADTaskCacheManager)this.taskCacheManager).isHCTaskCoordinatingNode(detectorId)) {
                detectorTaskProfile.setNodeId(localNodeId);
                detectorTaskProfile.setTaskId(((ADTaskCacheManager)this.taskCacheManager).getDetectorTaskId(detectorId));
                detectorTaskProfile.setDetectorTaskSlots(((ADTaskCacheManager)this.taskCacheManager).getDetectorTaskSlots(detectorId));
                detectorTaskProfile.setTotalEntitiesInited(((ADTaskCacheManager)this.taskCacheManager).topEntityInited(detectorId));
                detectorTaskProfile.setTotalEntitiesCount(((ADTaskCacheManager)this.taskCacheManager).getTopEntityCount(detectorId));
                detectorTaskProfile.setPendingEntitiesCount(((ADTaskCacheManager)this.taskCacheManager).getPendingEntityCount(detectorId));
                detectorTaskProfile.setRunningEntitiesCount(((ADTaskCacheManager)this.taskCacheManager).getRunningEntityCount(detectorId));
                detectorTaskProfile.setRunningEntities(((ADTaskCacheManager)this.taskCacheManager).getRunningEntities(detectorId));
                detectorTaskProfile.setTaskType(ADTaskType.HISTORICAL_HC_DETECTOR.name());
                Instant latestHCTaskRunTime = ((ADTaskCacheManager)this.taskCacheManager).getLatestHCTaskRunTime(detectorId);
                if (latestHCTaskRunTime != null) {
                    detectorTaskProfile.setLatestHCTaskRunTime(latestHCTaskRunTime.toEpochMilli());
                }
            }
            if (tasksOfDetector.size() > 0) {
                ArrayList<EntityTaskProfile> entityTaskProfiles = new ArrayList<EntityTaskProfile>();
                tasksOfDetector.forEach(taskId -> {
                    EntityTaskProfile entityTaskProfile = new EntityTaskProfile(((ADTaskCacheManager)this.taskCacheManager).getTRcfModel((String)taskId).getForest().getTotalUpdates(), ((ADTaskCacheManager)this.taskCacheManager).isThresholdModelTrained((String)taskId), ((ADTaskCacheManager)this.taskCacheManager).getThresholdModelTrainingDataSize((String)taskId), ((ADTaskCacheManager)this.taskCacheManager).getModelSize((String)taskId), localNodeId, ((ADTaskCacheManager)this.taskCacheManager).getEntity((String)taskId), (String)taskId, ADTaskType.HISTORICAL_HC_ENTITY.name());
                    entityTaskProfiles.add(entityTaskProfile);
                });
                detectorTaskProfile.setEntityTaskProfiles(entityTaskProfiles);
            }
        } else {
            if (tasksOfDetector.size() > 1) {
                String error = "Multiple tasks are running for detector: " + detectorId + ". You can stop detector to kill all running tasks.";
                this.logger.error(error);
                throw new LimitExceededException(error);
            }
            if (tasksOfDetector.size() == 1) {
                String taskId2 = tasksOfDetector.get(0);
                detectorTaskProfile = new ADTaskProfile(((ADTaskCacheManager)this.taskCacheManager).getDetectorTaskId(detectorId), ((ADTaskCacheManager)this.taskCacheManager).getTRcfModel(taskId2).getForest().getTotalUpdates(), ((ADTaskCacheManager)this.taskCacheManager).isThresholdModelTrained(taskId2), ((ADTaskCacheManager)this.taskCacheManager).getThresholdModelTrainingDataSize(taskId2), ((ADTaskCacheManager)this.taskCacheManager).getModelSize(taskId2), localNodeId);
                detectorTaskProfile.setDetectorTaskSlots(1);
            }
        }
        this.threadPool.executor("ad-batch-task-threadpool").execute(() -> ((ADTaskCacheManager)this.taskCacheManager).cleanExpiredHCBatchTaskRunStates());
        this.logger.debug("Local AD task profile of detector {}: {}", (Object)detectorId, (Object)detectorTaskProfile);
        return detectorTaskProfile;
    }

    public synchronized void removeStaleRunningEntity(ADTask adTask, String entity, TransportService transportService, ActionListener<JobResponse> listener) {
        String detectorId = adTask.getConfigId();
        boolean removed = ((ADTaskCacheManager)this.taskCacheManager).removeRunningEntity(detectorId, entity);
        if (removed && ((ADTaskCacheManager)this.taskCacheManager).getPendingEntityCount(detectorId) > 0) {
            this.logger.debug("kick off next pending entities");
            this.runNextEntityForHCADHistorical(adTask, transportService, listener);
        } else if (!((ADTaskCacheManager)this.taskCacheManager).hasEntity(detectorId)) {
            this.setHCDetectorTaskDone(adTask, TaskState.STOPPED, listener);
        }
    }

    public String convertEntityToString(ADTask adTask) {
        if (adTask == null || !adTask.isHistoricalEntityTask()) {
            return null;
        }
        AnomalyDetector detector = adTask.getDetector();
        return this.convertEntityToString(adTask.getEntity(), detector);
    }

    public String convertEntityToString(Entity entity, AnomalyDetector detector) {
        if (detector.hasMultipleCategories()) {
            try {
                XContentBuilder builder = entity.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS);
                return BytesReference.bytes((XContentBuilder)builder).utf8ToString();
            }
            catch (IOException e) {
                String error = "Failed to parse entity into string";
                this.logger.debug(error, (Throwable)e);
                throw new TimeSeriesException(error);
            }
        }
        if (detector.isHighCardinality()) {
            String categoryField = detector.getCategoryFields().get(0);
            return entity.getAttributes().get(categoryField);
        }
        return null;
    }

    public Entity parseEntityFromString(String entityValue, ADTask adTask) {
        AnomalyDetector detector = adTask.getDetector();
        if (detector.hasMultipleCategories()) {
            try {
                XContentParser parser = XContentType.JSON.xContent().createParser(this.xContentRegistry, (DeprecationHandler)LoggingDeprecationHandler.INSTANCE, entityValue);
                XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_ARRAY, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                return Entity.parse(parser);
            }
            catch (IOException e) {
                String error = "Failed to parse string into entity";
                this.logger.debug(error, (Throwable)e);
                throw new TimeSeriesException(error);
            }
        }
        if (detector.isHighCardinality()) {
            return Entity.createSingleAttributeEntity(detector.getCategoryFields().get(0), entityValue);
        }
        throw new IllegalArgumentException("Fail to parse to Entity for single flow detector");
    }

    public int getLocalAdUsedBatchTaskSlot() {
        return ((ADTaskCacheManager)this.taskCacheManager).getTotalBatchTaskCount();
    }

    public int getLocalAdAssignedBatchTaskSlot() {
        return ((ADTaskCacheManager)this.taskCacheManager).getTotalDetectorTaskSlots();
    }

    public void maintainRunningHistoricalTasks(TransportService transportService, int size) {
        ((ADTaskCacheManager)this.taskCacheManager).cleanExpiredHCBatchTaskRunStates();
        Optional<DiscoveryNode> owningNode = this.hashRing.getOwningNodeWithHighestVersion(AD_TASK_MAINTAINENCE_NODE_MODEL_ID);
        if (!owningNode.isPresent() || !this.clusterService.localNode().getId().equals(owningNode.get().getId())) {
            return;
        }
        this.logger.info("Start to maintain running historical tasks");
        BoolQueryBuilder query = new BoolQueryBuilder();
        query.filter((QueryBuilder)new TermQueryBuilder("is_latest", true));
        query.filter((QueryBuilder)new TermsQueryBuilder("task_type", TaskType.taskTypeToString(ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES)));
        query.filter((QueryBuilder)new TermsQueryBuilder("state", TaskState.NOT_ENDED_STATES));
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query((QueryBuilder)query).sort("last_update_time", SortOrder.DESC).size(size);
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.source(sourceBuilder);
        searchRequest.indices(new String[]{".opendistro-anomaly-detection-state"});
        this.client.search(searchRequest, ActionListener.wrap(r -> {
            if (r == null || r.getHits().getTotalHits() == null || r.getHits().getTotalHits().value() == 0L) {
                return;
            }
            ConcurrentLinkedQueue<ADTask> taskQueue = new ConcurrentLinkedQueue<ADTask>();
            for (SearchHit searchHit : r.getHits()) {
                try {
                    XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, searchHit.getSourceRef());
                    try {
                        XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                        taskQueue.add(ADTask.parse(parser, searchHit.getId()));
                    }
                    finally {
                        if (parser == null) continue;
                        parser.close();
                    }
                }
                catch (Exception e) {
                    this.logger.error("Maintaining running historical task: failed to parse AD task " + searchHit.getId(), (Throwable)e);
                }
            }
            this.maintainRunningHistoricalTask(taskQueue, transportService);
        }, e -> {
            if (e instanceof IndexNotFoundException) {
                this.logger.debug(STATE_INDEX_NOT_EXIST_MSG);
            } else {
                this.logger.error("Failed to search historical tasks in maintaining job", (Throwable)e);
            }
        }));
    }

    private void maintainRunningHistoricalTask(ConcurrentLinkedQueue<ADTask> taskQueue, TransportService transportService) {
        ADTask adTask = taskQueue.poll();
        if (adTask == null) {
            return;
        }
        this.threadPool.schedule(() -> this.resetHistoricalConfigTaskState((List<TimeSeriesTask>)ImmutableList.of((Object)adTask), () -> {
            this.logger.debug("Finished maintaining running historical task {}", (Object)adTask.getTaskId());
            this.maintainRunningHistoricalTask(taskQueue, transportService);
        }, transportService, ActionListener.wrap(r -> this.logger.debug("Reset historical task state done for task {}, detector {}", (Object)adTask.getTaskId(), (Object)adTask.getConfigId()), e -> this.logger.error("Failed to reset historical task state for task " + adTask.getTaskId(), (Throwable)e))), TimeValue.timeValueSeconds((long)DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS), "ad-batch-task-threadpool");
    }

    @Override
    protected BiCheckedFunction<XContentParser, String, ADTask, IOException> getTaskParser() {
        return ADTask::parse;
    }

    @Override
    public void createRunOnceTaskAndCleanupStaleTasks(String configId, Config config, TransportService transportService, ActionListener<ADTask> listener) {
        throw new UnsupportedOperationException("AD has no run once yet");
    }

    @Override
    public List<ADTaskType> getTaskTypes(DateRange dateRange, boolean runOnce) {
        if (dateRange == null) {
            return ADTaskType.REALTIME_TASK_TYPES;
        }
        return ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES;
    }

    @Override
    protected <T> void resetLatestConfigTaskState(List<ADTask> tasks, Consumer<List<ADTask>> function, TransportService transportService, ActionListener<T> listener) {
        ArrayList<TimeSeriesTask> runningHistoricalTasks = new ArrayList<TimeSeriesTask>();
        ArrayList<TimeSeriesTask> runningRealtimeTasks = new ArrayList<TimeSeriesTask>();
        for (TimeSeriesTask timeSeriesTask : tasks) {
            if (timeSeriesTask.isHistoricalEntityTask() || timeSeriesTask.isDone()) continue;
            if (timeSeriesTask.isRealTimeTask()) {
                runningRealtimeTasks.add(timeSeriesTask);
                continue;
            }
            if (!timeSeriesTask.isHistoricalTask()) continue;
            runningHistoricalTasks.add(timeSeriesTask);
        }
        this.resetHistoricalConfigTaskState(runningHistoricalTasks, () -> this.resetRealtimeConfigTaskState(runningRealtimeTasks, () -> function.accept(tasks), transportService, listener), transportService, listener);
    }

    @Override
    protected String triageState(Boolean hasResult, String error, Long rcfTotalUpdates) {
        if (hasResult != null && hasResult.booleanValue()) {
            return TaskState.RUNNING.name();
        }
        if (rcfTotalUpdates == null || rcfTotalUpdates < 32L) {
            return TaskState.INIT.name();
        }
        return TaskState.RUNNING.name();
    }

    @Override
    protected boolean forbidOverrideChange(String configId, String newState, String oldState) {
        return TaskState.INIT.name().equals(newState) && TaskState.RUNNING.name().equals(oldState);
    }
}

