/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group.streams.assignor;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.ProcessState;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
import org.apache.kafka.coordinator.group.streams.assignor.TaskId;
import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TaskAssignor} named {@code "sticky"} that tries to keep tasks on the members that
 * owned them before, subject to per-member quotas derived from the number of tasks and members.
 *
 * <p>Assignment proceeds in this order:
 * <ol>
 *   <li>active tasks go back to their previous active owner if that member is below the
 *       active-task quota;</li>
 *   <li>remaining active tasks go to a previous standby owner if that member is below the
 *       active-task quota;</li>
 *   <li>remaining active tasks go to the process with the lowest {@link ProcessState#load()};</li>
 *   <li>if standby replicas are configured, standby tasks for stateful subtopologies are placed
 *       on previous owners first and then on the least loaded processes that do not already
 *       hold the task.</li>
 * </ol>
 *
 * <p>The working state of one {@link #assign} call is kept in an instance field, so a single
 * instance must not execute {@code assign} concurrently from several threads.
 */
public class StickyTaskAssignor implements TaskAssignor {
    private static final String STICKY_ASSIGNOR_NAME = "sticky";
    /** Key looked up in {@link GroupSpec#assignmentConfigs()} for the number of standby replicas. */
    private static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
    private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class);

    /** State of the assignment currently being computed; {@code null} outside of {@link #assign}. */
    private LocalState localState;

    @Override
    public String name() {
        return STICKY_ASSIGNOR_NAME;
    }

    @Override
    public String toString() {
        return name();
    }

    /**
     * Computes an assignment for all members of the group.
     *
     * @param groupSpec         the members, their previous assignments and the assignment configs
     * @param topologyDescriber describes the subtopologies and their partition counts
     * @return the assignment, containing one entry per member of {@code groupSpec}
     * @throws TaskAssignorException if an active task cannot be placed on any process or member
     * @throws NumberFormatException if the configured number of standby replicas is not an integer
     */
    @Override
    public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) throws TaskAssignorException {
        try {
            initialize(groupSpec, topologyDescriber);
            return doAssign(groupSpec, topologyDescriber);
        } finally {
            // Drop the per-call state even when the assignment fails, so nothing is retained.
            localState = null;
        }
    }

    /** Runs the active and (if configured) standby phases and collects the result. */
    private GroupAssignment doAssign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
        final LinkedList<TaskId> activeTasks = taskIds(topologyDescriber, true);
        assignActive(activeTasks);

        if (localState.numStandbyReplicas > 0) {
            final LinkedList<TaskId> statefulTasks = taskIds(topologyDescriber, false);
            assignStandby(statefulTasks);
        }

        return buildGroupAssignment(groupSpec.members().keySet());
    }

    /**
     * Enumerates the task ids of the topology: one task per partition index below
     * {@code maxNumInputPartitions} of each subtopology.
     *
     * @param isActive if {@code true} all subtopologies are included; otherwise only stateful ones
     */
    private LinkedList<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
        final LinkedList<TaskId> ret = new LinkedList<>();
        for (final String subtopology : topologyDescriber.subtopologies()) {
            if (isActive || topologyDescriber.isStateful(subtopology)) {
                final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
                for (int i = 0; i < numberOfPartitions; i++) {
                    ret.add(new TaskId(subtopology, i));
                }
            }
        }
        return ret;
    }

    /**
     * Builds the {@link LocalState} for one assignment run: task totals, per-member quotas,
     * one {@link ProcessState} per process id and the lookup tables of previous task owners.
     */
    private void initialize(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
        localState = new LocalState();

        // A missing entry (including an empty config map) means "no standby replicas".
        final String configuredStandbyReplicas = groupSpec.assignmentConfigs().get(NUM_STANDBY_REPLICAS_CONFIG);
        localState.numStandbyReplicas = configuredStandbyReplicas == null ? 0 : Integer.parseInt(configuredStandbyReplicas);

        localState.totalActiveTasks = 0;
        localState.totalTasks = 0;
        int statefulTasks = 0;
        for (final String subtopology : topologyDescriber.subtopologies()) {
            final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
            localState.totalTasks += numberOfPartitions;
            localState.totalActiveTasks += numberOfPartitions;
            if (topologyDescriber.isStateful(subtopology)) {
                // Every stateful task additionally gets numStandbyReplicas standby copies.
                localState.totalTasks += numberOfPartitions * localState.numStandbyReplicas;
                statefulTasks += numberOfPartitions;
            }
        }

        final int memberCount = groupSpec.members().size();
        localState.totalMembersWithActiveTaskCapacity = memberCount;
        localState.totalMembersWithTaskCapacity = memberCount;
        localState.activeTasksPerMember = computeTasksPerMember(localState.totalActiveTasks, localState.totalMembersWithActiveTaskCapacity);
        localState.totalTasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);

        localState.processIdToState = new HashMap<>(memberCount);
        localState.activeTaskToPrevMember = new HashMap<>(localState.totalActiveTasks);
        localState.standbyTaskToPrevMember = new HashMap<>(localState.numStandbyReplicas > 0 ? statefulTasks : 0);

        for (final Map.Entry<String, AssignmentMemberSpec> memberEntry : groupSpec.members().entrySet()) {
            final String memberId = memberEntry.getKey();
            final AssignmentMemberSpec memberSpec = memberEntry.getValue();
            final String processId = memberSpec.processId();
            final Member member = new Member(processId, memberId);

            // Members sharing a process id are grouped under the same ProcessState.
            localState.processIdToState.computeIfAbsent(processId, ProcessState::new).addMember(memberId);

            for (final Map.Entry<String, Set<Integer>> tasks : memberSpec.activeTasks().entrySet()) {
                for (final int partitionNo : tasks.getValue()) {
                    localState.activeTaskToPrevMember.put(new TaskId(tasks.getKey(), partitionNo), member);
                }
            }

            for (final Map.Entry<String, Set<Integer>> tasks : memberSpec.standbyTasks().entrySet()) {
                for (final int partitionNo : tasks.getValue()) {
                    localState.standbyTaskToPrevMember
                        .computeIfAbsent(new TaskId(tasks.getKey(), partitionNo), k -> new ArrayList<>(localState.numStandbyReplicas))
                        .add(member);
                }
            }
        }
    }

    /**
     * Converts the per-process bookkeeping into a {@link GroupAssignment} with an entry for
     * every given member id; members without tasks get empty maps.
     */
    private GroupAssignment buildGroupAssignment(final Set<String> members) {
        final Map<String, MemberAssignment> memberAssignments = new HashMap<>();

        final Map<String, Set<TaskId>> activeTasksAssignments = localState.processIdToState.values().stream()
            .flatMap(process -> process.assignedActiveTasksByMember().entrySet().stream())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (set1, set2) -> {
                set1.addAll(set2);
                return set1;
            }));

        final Map<String, Set<TaskId>> standbyTasksAssignments = localState.processIdToState.values().stream()
            .flatMap(process -> process.assignedStandbyTasksByMember().entrySet().stream())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (set1, set2) -> {
                set1.addAll(set2);
                return set1;
            }));

        for (final String memberId : members) {
            final Map<String, Set<Integer>> activeTasks = new HashMap<>();
            final Set<TaskId> assignedActive = activeTasksAssignments.get(memberId);
            if (assignedActive != null) {
                activeTasks.putAll(toCompactedTaskIds(assignedActive));
            }

            final Map<String, Set<Integer>> standByTasks = new HashMap<>();
            final Set<TaskId> assignedStandby = standbyTasksAssignments.get(memberId);
            if (assignedStandby != null) {
                standByTasks.putAll(toCompactedTaskIds(assignedStandby));
            }

            memberAssignments.put(memberId, new MemberAssignment(activeTasks, standByTasks, new HashMap<String, Set<Integer>>()));
        }

        return new GroupAssignment(memberAssignments);
    }

    /** Groups task ids by subtopology id, mapping each subtopology to its set of partitions. */
    private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> taskIds) {
        final Map<String, Set<Integer>> ret = new HashMap<>();
        for (final TaskId taskId : taskIds) {
            ret.computeIfAbsent(taskId.subtopologyId(), k -> new HashSet<>()).add(taskId.partition());
        }
        return ret;
    }

    /**
     * Assigns all active tasks. Tasks that are placed are removed from {@code activeTasks}.
     *
     * @throws TaskAssignorException if a task is left over and no process or member can take it
     */
    private void assignActive(final LinkedList<TaskId> activeTasks) {
        activeTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));

        // 1. Keep tasks on their previous active owner while that member is below its quota.
        for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
            final TaskId task = it.next();
            final Member prevMember = localState.activeTaskToPrevMember.get(task);
            if (tryAssignActive(prevMember, task)) {
                it.remove();
            }
        }

        // 2. Otherwise promote a previous standby owner while that member is below its quota.
        for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
            final TaskId task = it.next();
            final Member prevMember = findPrevMemberWithLeastLoad(localState.standbyTaskToPrevMember.get(task), null);
            if (tryAssignActive(prevMember, task)) {
                it.remove();
            }
        }

        // 3. Hand everything that is left to the process with the lowest load.
        activeTasks.sort(Comparator.comparing(TaskId::subtopologyId).thenComparing(TaskId::partition));
        final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
        processByLoad.addAll(localState.processIdToState.values());
        for (final TaskId task : activeTasks) {
            final ProcessState processWithLeastLoad = processByLoad.poll();
            if (processWithLeastLoad == null) {
                throw new TaskAssignorException(String.format("No process available to assign active task %s.", task));
            }
            final int newTaskCount = processWithLeastLoad.addTaskToLeastLoadedMember(task, true);
            if (newTaskCount == -1) {
                throw new TaskAssignorException(String.format("No member available to assign active task %s.", task));
            }
            maybeUpdateActiveTasksPerMember(newTaskCount);
            maybeUpdateTotalTasksPerMember(newTaskCount);
            // Re-insert so the queue reflects the process' load after the assignment.
            processByLoad.add(processWithLeastLoad);
        }
    }

    /**
     * Assigns {@code task} as an active task to {@code member} if the member is non-null and
     * still below the active-task quota.
     *
     * @return {@code true} if the task was assigned
     */
    private boolean tryAssignActive(final Member member, final TaskId task) {
        if (member == null) {
            return false;
        }
        final ProcessState processState = localState.processIdToState.get(member.processId);
        if (!hasUnfulfilledActiveTaskQuota(processState, member)) {
            return false;
        }
        final int newActiveTasks = processState.addTask(member.memberId, task, true);
        maybeUpdateActiveTasksPerMember(newActiveTasks);
        maybeUpdateTotalTasksPerMember(newActiveTasks);
        return true;
    }

    /**
     * When a member has reached the current active-task quota, removes it and its tasks from
     * the remaining totals and recomputes the quota for the other members.
     */
    private void maybeUpdateActiveTasksPerMember(final int activeTasksNo) {
        if (activeTasksNo == localState.activeTasksPerMember) {
            localState.totalMembersWithActiveTaskCapacity--;
            localState.totalActiveTasks -= activeTasksNo;
            localState.activeTasksPerMember = computeTasksPerMember(localState.totalActiveTasks, localState.totalMembersWithActiveTaskCapacity);
        }
    }

    /**
     * When a member has reached the current total-task quota, removes it and its tasks from
     * the remaining totals and recomputes the quota for the other members.
     */
    private void maybeUpdateTotalTasksPerMember(final int taskNo) {
        if (taskNo == localState.totalTasksPerMember) {
            localState.totalMembersWithTaskCapacity--;
            localState.totalTasks -= taskNo;
            localState.totalTasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
        }
    }

    /**
     * Places one standby replica of {@code taskId} on the least loaded process that does not
     * already hold the task. Processes polled while searching are put back into the queue.
     *
     * @return {@code true} if the replica was assigned
     */
    private boolean assignStandbyToMemberWithLeastLoad(final PriorityQueue<ProcessState> queue, final TaskId taskId) {
        final ProcessState processWithLeastLoad = queue.poll();
        if (processWithLeastLoad == null) {
            return false;
        }
        boolean found = false;
        if (!processWithLeastLoad.hasTask(taskId)) {
            final int newTaskCount = processWithLeastLoad.addTaskToLeastLoadedMember(taskId, false);
            if (newTaskCount != -1) {
                found = true;
                maybeUpdateTotalTasksPerMember(newTaskCount);
            }
        } else if (!queue.isEmpty()) {
            // This process already holds the task; try the next least loaded one.
            found = assignStandbyToMemberWithLeastLoad(queue, taskId);
        }
        queue.add(processWithLeastLoad);
        return found;
    }

    /**
     * Picks one of the previous owners. Starting from the first member, a later member replaces
     * the current candidate only if its process load and its own task count are both strictly
     * lower and, when {@code taskId} is given, its process does not already hold the task.
     *
     * @param members previous owners; may be {@code null} or empty
     * @param taskId  task that must not already be on the chosen member's process, or
     *                {@code null} to skip that check
     * @return the chosen member, or {@code null} if there are no members or the chosen member's
     *         process already holds {@code taskId}
     */
    private Member findPrevMemberWithLeastLoad(final ArrayList<Member> members, final TaskId taskId) {
        if (members == null || members.isEmpty()) {
            return null;
        }

        Member candidate = members.get(0);
        ProcessState candidateProcessState = localState.processIdToState.get(candidate.processId);
        double candidateProcessLoad = candidateProcessState.load();
        double candidateMemberLoad = candidateProcessState.memberToTaskCounts().get(candidate.memberId);

        for (int i = 1; i < members.size(); i++) {
            final Member member = members.get(i);
            final ProcessState processState = localState.processIdToState.get(member.processId);
            final double newProcessLoad = processState.load();
            if (newProcessLoad < candidateProcessLoad && (taskId == null || !processState.hasTask(taskId))) {
                final double newMemberLoad = processState.memberToTaskCounts().get(member.memberId);
                if (newMemberLoad < candidateMemberLoad) {
                    candidateProcessLoad = newProcessLoad;
                    candidateMemberLoad = newMemberLoad;
                    candidate = member;
                    // Keep the process state in sync with the candidate so the final check
                    // below inspects the chosen member's process, not the first member's.
                    candidateProcessState = processState;
                }
            }
        }

        if (taskId == null || !candidateProcessState.hasTask(taskId)) {
            return candidate;
        }
        return null;
    }

    /** Returns whether the member's task count is below the current active-task quota. */
    private boolean hasUnfulfilledActiveTaskQuota(final ProcessState process, final Member member) {
        return process.memberToTaskCounts().get(member.memberId) < localState.activeTasksPerMember;
    }

    /** Returns whether the member's task count is below the current total-task quota. */
    private boolean hasUnfulfilledTaskQuota(final ProcessState process, final Member member) {
        return process.memberToTaskCounts().get(member.memberId) < localState.totalTasksPerMember;
    }

    /**
     * Assigns {@code numStandbyReplicas} standby replicas for every given task. Replicas go to
     * the previous active owner or a previous standby owner where possible; the rest go to the
     * least loaded processes. A warning is logged when replicas cannot be placed.
     */
    private void assignStandby(final LinkedList<TaskId> standbyTasks) {
        final int numStandbyReplicas = localState.numStandbyReplicas;
        final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
        standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());

        for (final TaskId task : standbyTasks) {
            final Member prevActiveMember = localState.activeTaskToPrevMember.get(task);
            final ArrayList<Member> prevStandbyMembers = localState.standbyTaskToPrevMember.get(task);
            for (int i = 0; i < numStandbyReplicas; i++) {
                if (tryAssignStandby(prevActiveMember, task)) {
                    continue;
                }
                if (tryAssignStandby(findPrevMemberWithLeastLoad(prevStandbyMembers, task), task)) {
                    continue;
                }
                // No previous owner can take this replica; defer it and all later replicas of
                // this task to the least-loaded phase.
                toLeastLoaded.add(new StandbyToAssign(task, numStandbyReplicas - i));
                break;
            }
        }

        toLeastLoaded.sort(Comparator.comparing((StandbyToAssign x) -> x.taskId.subtopologyId())
            .thenComparing(x -> x.taskId.partition()).reversed());

        final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
        processByLoad.addAll(localState.processIdToState.values());
        for (final StandbyToAssign toAssign : toLeastLoaded) {
            for (int i = 0; i < toAssign.remainingReplicas; i++) {
                if (!assignStandbyToMemberWithLeastLoad(processByLoad, toAssign.taskId)) {
                    // i replicas of the deferred batch were placed, so the rest is unassigned.
                    log.warn("{} There is not enough available capacity. You should increase the number of threads and/or application instances to maintain the requested number of standby replicas.",
                        errorMessage(numStandbyReplicas, toAssign.remainingReplicas - i, toAssign.taskId));
                    break;
                }
            }
        }
    }

    /**
     * Assigns {@code task} as a standby task to {@code member} if the member is non-null, its
     * process does not already hold the task and the member is below the total-task quota.
     *
     * @return {@code true} if the task was assigned
     */
    private boolean tryAssignStandby(final Member member, final TaskId task) {
        if (member == null) {
            return false;
        }
        final ProcessState processState = localState.processIdToState.get(member.processId);
        if (processState.hasTask(task) || !hasUnfulfilledTaskQuota(processState, member)) {
            return false;
        }
        final int newTaskCount = processState.addTask(member.memberId, task, false);
        maybeUpdateTotalTasksPerMember(newTaskCount);
        return true;
    }

    /**
     * Formats the prefix of the capacity warning.
     *
     * @param numStandbyReplicas configured number of standby replicas
     * @param unassignedReplicas number of replicas of {@code task} that could not be placed
     */
    private String errorMessage(final int numStandbyReplicas, final int unassignedReplicas, final TaskId task) {
        return "Unable to assign " + unassignedReplicas + " of " + numStandbyReplicas + " standby tasks for task [" + task + "].";
    }

    /** Returns {@code numberOfTasks / numberOfMembers} rounded up, or 0 if there are no members. */
    private static int computeTasksPerMember(final int numberOfTasks, final int numberOfMembers) {
        if (numberOfMembers == 0) {
            return 0;
        }
        int tasksPerMember = numberOfTasks / numberOfMembers;
        if (numberOfTasks % numberOfMembers > 0) {
            tasksPerMember++;
        }
        return tasksPerMember;
    }

    /** Mutable working state of a single {@link #assign} invocation. */
    private static class LocalState {
        // Previous owners, keyed by task.
        Map<TaskId, Member> activeTaskToPrevMember;
        Map<TaskId, ArrayList<Member>> standbyTaskToPrevMember;
        Map<String, ProcessState> processIdToState;

        int numStandbyReplicas;
        // Remaining totals; reduced whenever a member reaches its quota.
        int totalActiveTasks;
        int totalTasks;
        int totalMembersWithActiveTaskCapacity;
        int totalMembersWithTaskCapacity;
        // Current per-member quotas, recomputed from the remaining totals.
        int activeTasksPerMember;
        int totalTasksPerMember;

        private LocalState() {
        }
    }

    /** A group member identified by its member id together with the id of its process. */
    static class Member {
        private final String processId;
        private final String memberId;

        public Member(final String processId, final String memberId) {
            this.processId = processId;
            this.memberId = memberId;
        }
    }

    /** A task whose remaining standby replicas still have to be placed on least loaded processes. */
    static class StandbyToAssign {
        private final TaskId taskId;
        private final int remainingReplicas;

        public StandbyToAssign(final TaskId taskId, final int remainingReplicas) {
            this.taskId = taskId;
            this.remainingReplicas = remainingReplicas;
        }
    }
}

