package uno.anahata.satgyara.concurrent;

import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:uno/anahata/satgyara/concurrent/ParallelFifoProcessor.class */
public abstract class ParallelFifoProcessor<I, O> extends AbstractProcessorThread<I, O> {
    private static final Logger log = LoggerFactory.getLogger(ParallelFifoProcessor.class);
    private final BlockingQueue<ParallelFifoProcessor<I, O>.SequenceMapping> workersIn;
    private final List<ParallelFifoProcessor<I, O>.SequenceMapping> awaitingOut;
    private final List<ParallelFifoProcessor<I, O>.ParallelFifoProcessorWorker> workers;
    private volatile long inSeqNo;
    private volatile long outSeqNo;
    private int threadCount;

    /* loaded from: input_file:uno/anahata/satgyara/concurrent/ParallelFifoProcessor$ParallelFifoProcessorWorker.class */
    private class ParallelFifoProcessorWorker extends AbstractProcessorThread<ParallelFifoProcessor<I, O>.SequenceMapping, ParallelFifoProcessor<I, O>.SequenceMapping> {
        public ParallelFifoProcessorWorker(int i) {
            super(ParallelFifoProcessor.this.workersIn, (BlockingQueue) null);
            setName(ParallelFifoProcessor.this.getName() + "|w#" + i);
            this.tps.setPrintStats(false);
        }

        @Override // uno.anahata.satgyara.concurrent.AbstractProcessorThread
        public ParallelFifoProcessor<I, O>.SequenceMapping process(ParallelFifoProcessor<I, O>.SequenceMapping sequenceMapping) throws Exception {
            ((SequenceMapping) sequenceMapping).output = ParallelFifoProcessor.this.process(((SequenceMapping) sequenceMapping).input);
            return sequenceMapping;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // uno.anahata.satgyara.concurrent.AbstractProcessorThread
        public void doOutput(ParallelFifoProcessor<I, O>.SequenceMapping sequenceMapping) throws Exception {
            synchronized (ParallelFifoProcessor.this.awaitingOut) {
                if (((SequenceMapping) sequenceMapping).seq == ParallelFifoProcessor.this.outSeqNo) {
                    ParallelFifoProcessor.this.addToSendQueue(sequenceMapping);
                } else {
                    ParallelFifoProcessor.this.awaitingOut.add(sequenceMapping);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // uno.anahata.satgyara.concurrent.AbstractProcessorThread
        public void onExit() throws Exception {
            super.onExit();
            ParallelFifoProcessor.this.interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:uno/anahata/satgyara/concurrent/ParallelFifoProcessor$SequenceMapping.class */
    public class SequenceMapping {
        private long seq;
        private I input;
        private O output;

        public SequenceMapping(long j, I i) {
            this.seq = j;
            this.input = i;
        }

        public String toString() {
            long j = this.seq;
            I i = this.input;
            O o = this.output;
            return "SequenceMapping{seq=" + j + ", input=" + j + ", output=" + i + "}";
        }
    }

    public ParallelFifoProcessor(int i, int i2, int i3) {
        super(i, i2);
        this.awaitingOut = new ArrayList();
        this.workers = new ArrayList();
        this.inSeqNo = 0L;
        this.outSeqNo = 0L;
        this.threadCount = i3;
        this.workersIn = new LinkedBlockingQueue(i);
    }

    public int getThreadCount() {
        return this.threadCount;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // uno.anahata.satgyara.concurrent.AbstractProcessorThread
    public void onStartup() {
        log.debug("onStartup() ");
        for (int i = 0; i < this.threadCount; i++) {
            ParallelFifoProcessor<I, O>.ParallelFifoProcessorWorker parallelFifoProcessorWorker = new ParallelFifoProcessorWorker(i);
            log.trace(getName() + " onStartup() launcing worker " + parallelFifoProcessorWorker);
            this.workers.add(parallelFifoProcessorWorker);
            parallelFifoProcessorWorker.start();
        }
        log.debug("onStartup() completed");
    }

    @Override // uno.anahata.satgyara.concurrent.AbstractProcessorThread
    public void onExit() {
        log.debug("Interrupting {} workers ", Integer.valueOf(this.workers.size()));
        for (ParallelFifoProcessor<I, O>.ParallelFifoProcessorWorker parallelFifoProcessorWorker : this.workers) {
            log.debug("Interrupting {} " + parallelFifoProcessorWorker);
            parallelFifoProcessorWorker.interrupt();
            log.debug("Interrupted {} " + parallelFifoProcessorWorker);
        }
        log.debug("{} workers interrupted " + this.workers.size());
    }

    public int getAwaitingOutSize() {
        return this.awaitingOut.size();
    }

    @Override // uno.anahata.satgyara.concurrent.AbstractProcessorThread
    protected void doLoop() throws Exception {
        I input = getInput();
        this.tps.startTick();
        synchronized (this.workersIn) {
            long j = this.inSeqNo;
            incInSeqNo();
            this.workersIn.put(new SequenceMapping(j, input));
        }
        this.tps.endTick();
    }

    private long incInSeqNo() {
        long j;
        synchronized (this.workersIn) {
            if (this.inSeqNo == Long.MAX_VALUE) {
                this.inSeqNo = Long.MIN_VALUE;
            } else {
                this.inSeqNo++;
            }
            j = this.inSeqNo;
        }
        return j;
    }

    private long incOutSeqNo() {
        long j;
        synchronized (this.awaitingOut) {
            if (this.outSeqNo == Long.MAX_VALUE) {
                this.outSeqNo = Long.MIN_VALUE;
            } else {
                this.outSeqNo++;
            }
            j = this.outSeqNo;
        }
        return j;
    }

    private void addToSendQueue(ParallelFifoProcessor<I, O>.SequenceMapping sequenceMapping) throws Exception {
        synchronized (this.awaitingOut) {
            this.awaitingOut.remove(sequenceMapping);
            incOutSeqNo();
            doOutput(((SequenceMapping) sequenceMapping).output);
            Iterator<ParallelFifoProcessor<I, O>.SequenceMapping> it = this.awaitingOut.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                ParallelFifoProcessor<I, O>.SequenceMapping next = it.next();
                if (((SequenceMapping) next).seq == this.outSeqNo) {
                    addToSendQueue(next);
                    break;
                }
            }
        }
    }

    public static void main(String[] strArr) {
        final Random random = new Random();
        ParallelFifoProcessor<Integer, String> parallelFifoProcessor = new ParallelFifoProcessor<Integer, String>(2, 0, 2) { // from class: uno.anahata.satgyara.concurrent.ParallelFifoProcessor.1
            @Override // uno.anahata.satgyara.concurrent.AbstractProcessorThread
            public String process(Integer num) throws Exception {
                int nextInt = random.nextInt(200);
                Thread.sleep(nextInt);
                String str = num + "-" + nextInt;
                System.out.println("processed " + str);
                return str;
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // uno.anahata.satgyara.concurrent.AbstractProcessorThread
            public void doOutput(String str) throws Exception {
                System.out.println(getName() + " Got " + str + " awaiting out= " + getAwaitingOutSize());
            }
        };
        parallelFifoProcessor.start();
        int i = 0;
        while (true) {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                i++;
                parallelFifoProcessor.getInQueue().put(Integer.valueOf(i));
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                PrintStream printStream = System.out;
                parallelFifoProcessor.getInQueue();
                printStream.println("Added " + i + " to inQueue in " + currentTimeMillis2 + " inQueue: " + printStream);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
