/*
 * Decompiled with CFR 0.152.
 */
package reactor.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelOperations;
import reactor.util.Logger;
import reactor.util.Loggers;

final class FluxReceive
extends Flux<Object>
implements Subscription,
Disposable {
    // Queue-size threshold; drainReceiver keeps auto-read enabled after an emission
    // while fewer than this many messages remain buffered.
    static final int QUEUE_LOW_LIMIT = 32;
    // Channel obtained from the parent operations in the constructor.
    final Channel channel;
    // Owning ChannelOperations; notified on cancel, uncaught errors and OOM termination.
    final ChannelOperations<?, ?> parent;
    // Event loop of the channel; subscribe/request/cancel hop onto it when called from elsewhere.
    final EventLoop eventLoop;
    // Current subscriber; null until startReceiver accepts one, and reset to null on
    // termination or unsubscription.
    CoreSubscriber<? super Object> receiver;
    // Set by drainReceiver once the demand equals Long.MAX_VALUE; onInboundNext then
    // hands messages straight to the receiver instead of queueing them.
    boolean receiverFastpath;
    // Outstanding demand accumulated by request(n) and decremented by drainReceiver.
    long receiverDemand;
    // Buffer of inbound messages; created lazily by onInboundNext, so it may be null.
    Queue<Object> receiverQueue;
    // True while channel auto-read is switched off (the constructor switches it off).
    boolean needRead = true;
    // Set to true by onInboundComplete / onInboundError.
    volatile boolean inboundDone;
    // Error recorded by onInboundError (or by a failed release in drainReceiver); null otherwise.
    Throwable inboundError;
    // Cancellation hook installed by the constructor; swapped for CANCELLED by cancelReceiver.
    volatile Disposable receiverCancel;
    // 0 until the first subscriber claims the flux via ONCE.compareAndSet(this, 0, 1).
    volatile int once;
    static final AtomicIntegerFieldUpdater<FluxReceive> ONCE = AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "once");
    // Re-entrance / missed-work counter used by drainReceiver.
    // NOTE(review): plain (non-volatile) int — presumably only touched on the event loop; confirm.
    int wip;
    static final AtomicReferenceFieldUpdater<FluxReceive, Disposable> CANCEL = AtomicReferenceFieldUpdater.newUpdater(FluxReceive.class, Disposable.class, "receiverCancel");
    // No-op sentinel marking the cancelled state (compared by identity in isCancelled).
    static final Disposable CANCELLED = () -> {};
    static final Logger log = Loggers.getLogger(FluxReceive.class);

    FluxReceive(ChannelOperations<?, ?> parent) {
        this.parent = parent;
        this.channel = parent.channel();
        this.eventLoop = this.channel.eventLoop();
        this.channel.config().setAutoRead(false);
        CANCEL.lazySet(this, () -> {
            if (this.eventLoop.inEventLoop()) {
                this.unsubscribeReceiver();
            } else {
                this.eventLoop.execute(this::unsubscribeReceiver);
            }
        });
    }

    /**
     * Cancels the receiver and then triggers a drain on the event loop
     * (inline when already on it, otherwise scheduled).
     */
    @Override
    public void cancel() {
        this.cancelReceiver();
        if (!this.eventLoop.inEventLoop()) {
            this.eventLoop.execute(this::drainReceiver);
            return;
        }
        this.drainReceiver();
    }

    /**
     * @return the number of messages currently buffered, or 0 when the queue
     *         has not been created yet
     */
    final long getPending() {
        if (this.receiverQueue == null) {
            return 0L;
        }
        return (long)this.receiverQueue.size();
    }

    /**
     * @return {@code true} when the cancellation hook has been replaced by the
     *         {@link #CANCELLED} sentinel
     */
    final boolean isCancelled() {
        return CANCELLED == this.receiverCancel;
    }

    /**
     * Disposes this flux by delegating to {@link #cancel()}.
     */
    @Override
    public void dispose() {
        this.cancel();
    }

    /**
     * @return {@code true} once the inbound is done and no buffered message
     *         remains (queue absent or empty)
     */
    @Override
    public boolean isDisposed() {
        if (!this.inboundDone) {
            return false;
        }
        return this.receiverQueue == null || this.receiverQueue.isEmpty();
    }

    /**
     * Adds {@code n} to the outstanding demand (capped addition) and drains.
     * The update always happens on the event loop: inline when the caller is
     * already on it, otherwise as a scheduled task. Invalid amounts rejected by
     * {@code Operators.validate} are ignored.
     *
     * @param n the additional demand
     */
    @Override
    public void request(long n) {
        if (!Operators.validate(n)) {
            return;
        }
        if (!this.eventLoop.inEventLoop()) {
            this.eventLoop.execute(() -> {
                this.receiverDemand = Operators.addCap(this.receiverDemand, n);
                this.drainReceiver();
            });
            return;
        }
        this.receiverDemand = Operators.addCap(this.receiverDemand, n);
        this.drainReceiver();
    }

    /**
     * Registers the subscriber via {@link #startReceiver(CoreSubscriber)},
     * executed on the event loop (inline when already on it).
     *
     * @param s the subscriber to attach
     */
    @Override
    public void subscribe(CoreSubscriber<? super Object> s) {
        if (!this.eventLoop.inEventLoop()) {
            this.eventLoop.execute(() -> this.startReceiver(s));
            return;
        }
        this.startReceiver(s);
    }

    /**
     * Attaches {@code s} as the single receiver.
     * <ul>
     *   <li>If the inbound is already done with nothing buffered, {@code s} is
     *       immediately errored (when an inbound error is recorded) or completed,
     *       whether or not it is the first subscriber.</li>
     *   <li>Otherwise the first subscriber becomes the receiver and gets
     *       {@code onSubscribe(this)}.</li>
     *   <li>Any later subscriber is rejected with an {@link IllegalStateException}.</li>
     * </ul>
     *
     * @param s the subscriber to attach
     */
    final void startReceiver(CoreSubscriber<? super Object> s) {
        // Only the first caller wins the 0 -> 1 transition.
        boolean firstSubscriber = this.once == 0 && ONCE.compareAndSet(this, 0, 1);
        if (firstSubscriber && log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(this.channel, "{}: subscribing inbound receiver"), this);
        }

        // Terminated and fully drained: replay the terminal signal.
        if (this.inboundDone && this.getPending() == 0L) {
            if (this.inboundError != null) {
                Operators.error(s, this.inboundError);
            } else {
                Operators.complete(s);
            }
            return;
        }

        if (!firstSubscriber) {
            Operators.error(s, new IllegalStateException("Only one connection receive subscriber allowed."));
            return;
        }
        this.receiver = s;
        s.onSubscribe(this);
    }

    /**
     * Atomically swaps the cancellation hook for {@link #CANCELLED} and disposes
     * the previous hook.
     *
     * @return {@code true} if this call performed the cancellation,
     *         {@code false} if it was already cancelled
     */
    final boolean cancelReceiver() {
        Disposable current = this.receiverCancel;
        if (current == CANCELLED) {
            return false;
        }
        current = CANCEL.getAndSet(this, CANCELLED);
        if (current == CANCELLED) {
            // Lost the race against a concurrent cancel.
            return false;
        }
        current.dispose();
        return true;
    }

    /**
     * Polls every remaining element out of {@code q} and releases it, logging
     * each dropped frame at debug level. Does nothing for a {@code null} queue.
     *
     * @param q the queue to empty, may be {@code null}
     */
    final void cleanQueue(@Nullable Queue<Object> q) {
        if (q == null) {
            return;
        }
        for (Object frame = q.poll(); frame != null; frame = q.poll()) {
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(this.channel, "{}: dropping frame {}"), this, frame);
            }
            ReferenceCountUtil.release(frame);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    final void drainReceiver() {
        if (this.wip++ != 0) {
            return;
        }
        missed = 1;
        do lbl-1000:
        // 3 sources

        {
            block21: {
                q = this.receiverQueue;
                a = this.receiver;
                d = this.inboundDone;
                if (a != null) break block21;
                if (this.isCancelled()) {
                    this.cleanQueue(q);
                    return;
                }
                if (d && this.getPending() == 0L) {
                    ex = this.inboundError;
                    if (ex != null) {
                        this.parent.listener.onUncaughtException(this.parent, ex);
                    }
                    return;
                }
                if ((missed = (this.wip -= missed)) != 0) ** GOTO lbl-1000
                break;
            }
            r = this.receiverDemand;
            for (e = 0L; e != r; ++e) {
                if (this.isCancelled()) {
                    this.cleanQueue(q);
                    this.terminateReceiver(q, a);
                    return;
                }
                d = this.inboundDone;
                v = q != null ? q.poll() : null;
                v0 = empty = v == null;
                if (d && empty) {
                    this.terminateReceiver(q, a);
                    return;
                }
                if (empty) break;
                try {
                    a.onNext(v);
                    continue;
                }
                finally {
                    try {
                        ReferenceCountUtil.release(v);
                    }
                    catch (Throwable t) {
                        this.inboundError = t;
                        this.cleanQueue(q);
                        this.terminateReceiver(q, a);
                    }
                }
            }
            if (this.isCancelled()) {
                this.cleanQueue(q);
                this.terminateReceiver(q, a);
                return;
            }
            if (this.inboundDone && (q == null || q.isEmpty())) {
                this.terminateReceiver(q, a);
                return;
            }
            if (r == 0x7FFFFFFFFFFFFFFFL) {
                this.receiverFastpath = true;
                if (this.needRead) {
                    this.needRead = false;
                    this.channel.config().setAutoRead(true);
                }
                if ((missed = (this.wip -= missed)) == 0) break;
            }
            if ((this.receiverDemand -= e) > 0L || e > 0L && q.size() < 32) {
                if (!this.needRead) continue;
                this.needRead = false;
                this.channel.config().setAutoRead(true);
                continue;
            }
            if (this.needRead) continue;
            this.needRead = true;
            this.channel.config().setAutoRead(false);
        } while ((missed = (this.wip -= missed)) != 0);
    }

    /**
     * Handles one inbound message.
     * <ul>
     *   <li>After termination or cancellation the message is released and dropped.</li>
     *   <li>On the fast path (flag set and a receiver present) it is passed
     *       directly to the receiver and released afterwards.</li>
     *   <li>Otherwise it is appended to the (lazily created) queue and a drain
     *       is triggered.</li>
     * </ul>
     *
     * @param msg the inbound message
     */
    final void onInboundNext(Object msg) {
        boolean terminated = this.inboundDone || this.isCancelled();
        if (terminated) {
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(this.channel, "{}: dropping frame {}"), this, msg);
            }
            ReferenceCountUtil.release(msg);
            return;
        }

        boolean direct = this.receiverFastpath && this.receiver != null;
        if (!direct) {
            Queue<Object> queue = this.receiverQueue;
            if (queue == null) {
                queue = new ArrayDeque<Object>();
                this.receiverQueue = queue;
            }
            // Leak-tracking hint, only recorded when debug logging is on.
            if (log.isDebugEnabled()) {
                if (msg instanceof ByteBuf) {
                    ((ByteBuf)msg).touch(ReactorNetty.format(this.channel, "Buffered ByteBuf in Inbound Flux Queue"));
                } else if (msg instanceof ByteBufHolder) {
                    ((ByteBufHolder)msg).touch(ReactorNetty.format(this.channel, "Buffered ByteBufHolder in Inbound Flux Queue"));
                }
            }
            queue.offer(msg);
            this.drainReceiver();
            return;
        }

        try {
            if (log.isDebugEnabled()) {
                if (msg instanceof ByteBuf) {
                    ((ByteBuf)msg).touch(ReactorNetty.format(this.channel, "Unbounded receiver, bypass inbound buffer queue"));
                } else if (msg instanceof ByteBufHolder) {
                    ((ByteBufHolder)msg).touch(ReactorNetty.format(this.channel, "Unbounded receiver, bypass inbound buffer queue"));
                }
            }
            this.receiver.onNext(msg);
        }
        finally {
            // Released even if the receiver throws.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Marks the inbound as done (once). On the fast path the receiver, if any,
     * is completed directly; otherwise completion is left to the drain loop.
     */
    final void onInboundComplete() {
        if (this.inboundDone) {
            return;
        }
        this.inboundDone = true;
        if (!this.receiverFastpath) {
            this.drainReceiver();
            return;
        }
        CoreSubscriber<? super Object> target = this.receiver;
        if (target != null) {
            target.onComplete();
        }
    }

    /*
     * Unable to fully structure code
     */
    final void onInboundError(Throwable err) {
        if (this.isCancelled() || this.inboundDone) {
            if (FluxReceive.log.isDebugEnabled()) {
                if (AbortedException.isConnectionReset(err)) {
                    FluxReceive.log.debug(ReactorNetty.format(this.channel, "Connection reset has been observed post termination"), err);
                } else {
                    FluxReceive.log.warn(ReactorNetty.format(this.channel, "An exception has been observed post termination"), err);
                }
            } else if (FluxReceive.log.isWarnEnabled() && !AbortedException.isConnectionReset(err)) {
                FluxReceive.log.warn(ReactorNetty.format(this.channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), new Object[]{err.toString()});
            }
            return;
        }
        receiver = this.receiver;
        this.inboundDone = true;
        if (this.channel.isActive()) {
            this.parent.markPersistent(false);
        }
        if (err instanceof OutOfMemoryError) {
            this.inboundError = this.parent.wrapInboundError(err);
            try {
                if (receiver == null) ** GOTO lbl26
                receiver.onError(this.inboundError);
            }
            finally {
                this.parent.terminate();
            }
        } else {
            this.inboundError = err instanceof ClosedChannelException != false ? this.parent.wrapInboundError(err) : err;
        }
lbl26:
        // 3 sources

        if (this.receiverFastpath && receiver != null) {
            receiver.onError(this.inboundError);
        } else {
            this.drainReceiver();
        }
    }

    /**
     * Clears the queue (if any), detaches the current receiver and sends the
     * terminal signal to {@code a}: {@code onError} when an inbound error is
     * recorded, {@code onComplete} otherwise.
     *
     * @param q the queue to clear, may be {@code null}
     * @param a the subscriber to terminate
     */
    final void terminateReceiver(@Nullable Queue<?> q, CoreSubscriber<?> a) {
        if (q != null) {
            q.clear();
        }
        final Throwable failure = this.inboundError;
        // Detach before signalling.
        this.receiver = null;
        if (failure == null) {
            a.onComplete();
            return;
        }
        a.onError(failure);
    }

    /**
     * Resets the demand, detaches the receiver and, when the flux is in the
     * cancelled state, notifies the parent through {@code onInboundCancel()}.
     */
    final void unsubscribeReceiver() {
        this.receiverDemand = 0L;
        this.receiver = null;
        if (!this.isCancelled()) {
            return;
        }
        this.parent.onInboundCancel();
    }

    /**
     * @return a diagnostic string with the pending count, cancellation flag,
     *         done flag and recorded inbound error
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("FluxReceive{pending=");
        text.append(this.getPending());
        text.append(", cancelled=").append(this.isCancelled());
        text.append(", inboundDone=").append(this.inboundDone);
        text.append(", inboundError=").append(this.inboundError);
        text.append('}');
        return text.toString();
    }
}

