/*
 * Decompiled with CFR 0.152.
 */
package discord4j.gateway;

import discord4j.common.GitProperties;
import discord4j.common.LogUtil;
import discord4j.common.ResettableInterval;
import discord4j.common.close.CloseException;
import discord4j.common.close.CloseStatus;
import discord4j.common.close.DisconnectBehavior;
import discord4j.common.operator.RateLimitOperator;
import discord4j.common.retry.ReconnectContext;
import discord4j.common.retry.ReconnectOptions;
import discord4j.common.sinks.EmissionStrategy;
import discord4j.discordjson.json.gateway.Dispatch;
import discord4j.discordjson.json.gateway.Heartbeat;
import discord4j.discordjson.json.gateway.Hello;
import discord4j.discordjson.json.gateway.Identify;
import discord4j.discordjson.json.gateway.ImmutableHeartbeat;
import discord4j.discordjson.json.gateway.ImmutableIdentify;
import discord4j.discordjson.json.gateway.ImmutableIdentifyProperties;
import discord4j.discordjson.json.gateway.ImmutableResume;
import discord4j.discordjson.json.gateway.InvalidSession;
import discord4j.discordjson.json.gateway.Opcode;
import discord4j.discordjson.json.gateway.PayloadData;
import discord4j.discordjson.json.gateway.Ready;
import discord4j.discordjson.json.gateway.Resumed;
import discord4j.discordjson.possible.Possible;
import discord4j.gateway.GatewayClient;
import discord4j.gateway.GatewayConnection;
import discord4j.gateway.GatewayObserver;
import discord4j.gateway.GatewayOptions;
import discord4j.gateway.GatewayReactorResources;
import discord4j.gateway.GatewayWebsocketHandler;
import discord4j.gateway.IdentifyOptions;
import discord4j.gateway.PayloadHandler;
import discord4j.gateway.SessionInfo;
import discord4j.gateway.json.GatewayPayload;
import discord4j.gateway.limiter.PayloadTransformer;
import discord4j.gateway.payload.PayloadReader;
import discord4j.gateway.payload.PayloadWriter;
import discord4j.gateway.retry.GatewayException;
import discord4j.gateway.retry.GatewayRetrySpec;
import discord4j.gateway.retry.GatewayStateChange;
import discord4j.gateway.retry.InvalidSessionException;
import discord4j.gateway.retry.ReconnectException;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.util.IllegalReferenceCountException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.function.TupleUtils;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.WebsocketClientSpec;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import reactor.util.retry.Retry;

public class DefaultGatewayClient
implements GatewayClient {
    private static final Logger log = Loggers.getLogger(DefaultGatewayClient.class);
    /** Logger that receives a trace of every outbound payload. */
    private static final Logger senderLog = Loggers.getLogger("discord4j.gateway.protocol.sender");
    /** Logger that receives a trace of every inbound payload. */
    private static final Logger receiverLog = Loggers.getLogger("discord4j.gateway.protocol.receiver");
    /** System property that overrides the outbound rate limiter capacity. */
    private static final String OUTBOUND_CAPACITY_PROPERTY = "discord4j.gateway.outbound.capacity";

    // Collaborators and settings taken from GatewayOptions in the constructor.
    private final GatewayReactorResources reactorResources;
    private final PayloadReader payloadReader;
    private final PayloadWriter payloadWriter;
    private final ReconnectOptions reconnectOptions;
    private final ReconnectContext reconnectContext;
    private final IdentifyOptions identifyOptions;
    private final String token;
    private final GatewayObserver observer;
    private final PayloadTransformer identifyLimiter;
    private final ResettableInterval heartbeatEmitter;
    /** Number of heartbeat ticks without an ACK that is tolerated; never negative. */
    private final int maxMissedHeartbeatAck;
    /** When true, inbound buffers are not retained before being decoded. */
    private final boolean unpooled;
    private final EmissionStrategy emissionStrategy;
    /** Payload handlers keyed by opcode; populated once in the constructor. */
    private final Map<Opcode<?>, PayloadHandler<?>> handlerMap = new HashMap<>();
    private final HttpClient httpClient;

    // Sinks connecting the websocket session with the rest of the client.
    /** Raw inbound buffers, fed by the websocket handler. */
    private final Sinks.Many<ByteBuf> receiver;
    /** Raw outbound buffers submitted through sendBuffer. */
    private final Sinks.Many<ByteBuf> sender;
    /** Dispatch events and gateway state changes exposed through dispatch(). */
    private final Sinks.Many<Dispatch> dispatch;
    /** Outbound payloads, including IDENTIFY and RESUME. */
    private final Sinks.Many<GatewayPayload<?>> outbound;
    /** Heartbeat payloads produced on every heartbeat tick. */
    private final Sinks.Many<GatewayPayload<Heartbeat>> heartbeats;
    /** Replays the latest connection state to every subscriber. */
    private final Sinks.Many<GatewayConnection.State> state;

    // Session bookkeeping.
    /** Last sequence number seen on an inbound payload. */
    private final AtomicInteger sequence = new AtomicInteger(0);
    /** Session id taken from the last Ready event; empty string when there is none. */
    private final AtomicReference<String> sessionId = new AtomicReference<>("");
    /** Resume URL taken from the last Ready event; null when there is none. */
    private final AtomicReference<String> resumeUrl = new AtomicReference<>();
    /** System.nanoTime() of the last heartbeat sent; 0 when unset. */
    private final AtomicLong lastSent = new AtomicLong(0L);
    /** System.nanoTime() of the last heartbeat ACK; 0 when unset. */
    private final AtomicLong lastAck = new AtomicLong(0L);
    /** Heartbeat ticks that found the previous heartbeat still unacknowledged. */
    private final AtomicInteger missedAck = new AtomicInteger(0);
    /** Nanoseconds between the last heartbeat sent and its ACK. */
    private volatile long responseTime = 0L;

    // Per-connection-attempt state, reassigned every time execute() (re)connects.
    private volatile Sinks.One<CloseStatus> disconnectNotifier;
    private volatile GatewayWebsocketHandler sessionHandler;
    private volatile ContextView currentContext;

    /**
     * Creates a gateway client configured from the given options.
     *
     * <p>Registers the opcode handlers, builds the HTTP client and the internal sinks, and
     * chooses the initial connection state depending on whether a session to resume was
     * supplied through the identify options.
     *
     * @param options the source of every collaborator and setting used by this client
     */
    public DefaultGatewayClient(GatewayOptions options) {
        // Collaborators and settings.
        token = Objects.requireNonNull(options.getToken());
        reactorResources = Objects.requireNonNull(options.getReactorResources());
        payloadReader = Objects.requireNonNull(options.getPayloadReader());
        payloadWriter = Objects.requireNonNull(options.getPayloadWriter());
        reconnectOptions = options.getReconnectOptions();
        reconnectContext = new ReconnectContext(
                reconnectOptions.getFirstBackoff(),
                reconnectOptions.getMaxBackoffInterval());
        identifyOptions = Objects.requireNonNull(options.getIdentifyOptions());
        observer = options.getInitialObserver();
        identifyLimiter = Objects.requireNonNull(options.getIdentifyLimiter());
        // Negative values are clamped to zero.
        maxMissedHeartbeatAck = Math.max(0, options.getMaxMissedHeartbeatAck());
        unpooled = options.isUnpooled();
        emissionStrategy = options.getEmissionStrategy();

        // One handler per opcode this client reacts to.
        addHandler(Opcode.DISPATCH, this::handleDispatch);
        addHandler(Opcode.HEARTBEAT, this::handleHeartbeat);
        addHandler(Opcode.RECONNECT, this::handleReconnect);
        addHandler(Opcode.INVALID_SESSION, this::handleInvalidSession);
        addHandler(Opcode.HELLO, this::handleHello);
        addHandler(Opcode.HEARTBEAT_ACK, this::handleHeartbeatAck);

        // initHttpClient reads reactorResources and observer, so it must run after them.
        httpClient = initHttpClient();

        receiver = newEmitterSink();
        sender = newEmitterSink();
        dispatch = newEmitterSink();
        outbound = newEmitterSink();
        heartbeats = newEmitterSink();
        heartbeatEmitter = new ResettableInterval(reactorResources.getTimerTaskScheduler());

        // Start in a resuming state only when a previous session was provided.
        final GatewayConnection.State initialState;
        SessionInfo previousSession = identifyOptions.getResumeSession().orElse(null);
        if (previousSession == null) {
            initialState = GatewayConnection.State.START_IDENTIFYING;
        } else {
            sequence.set(previousSession.getSequence());
            sessionId.set(previousSession.getId());
            initialState = GatewayConnection.State.START_RESUMING;
        }
        state = Sinks.many().replay().latestOrDefault(initialState);
    }

    /**
     * Registers the handler invoked for payloads carrying the given opcode, replacing any
     * handler previously registered for it.
     *
     * @param op the opcode to react to
     * @param handler the handler for payloads with that opcode
     */
    private <T extends PayloadData> void addHandler(Opcode<T> op, PayloadHandler<T> handler) {
        handlerMap.put(op, handler);
    }

    /**
     * Creates a multicast sink that buffers up to {@code Queues.SMALL_BUFFER_SIZE} elements
     * on backpressure, with the auto-cancel flag disabled.
     *
     * @return a new sink
     */
    private static <T> Sinks.Many<T> newEmitterSink() {
        final boolean autoCancel = false;
        return Sinks.many()
                .multicast()
                .onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, autoCancel);
    }

    /**
     * Connects to the gateway and runs the session until it is closed.
     *
     * <p>The whole pipeline is built inside {@code Mono.deferContextual}, so each retry triggered by
     * {@link #retryFactory()} rebuilds it and resets the per-connection state. After the retried
     * sequence completes, the returned Mono waits for the disconnect notifier to terminate.
     *
     * @param gatewayUrl the gateway URL to connect to; when a resume URL is stored, only the query
     * part of this URL is reused (see {@code buildGatewayUrl})
     * @return a Mono completing once the session has ended and the disconnect notifier terminated
     * @throws IllegalStateException on subscription, if the disconnect notifier was already created
     * by an earlier subscription
     */
    @Override
    public Mono<Void> execute(String gatewayUrl) {
        return Mono.deferContextual(context -> {
            // Fresh per-attempt state: context, close notifier and heartbeat bookkeeping.
            this.currentContext = context;
            this.disconnectNotifier = Sinks.one();
            this.lastAck.set(0L);
            this.lastSent.set(0L);
            this.missedAck.set(0);
            // Completed when a HEARTBEAT_ACK is received (see receiverFuture); gates IDENTIFY payloads.
            Sinks.Empty ping = Sinks.empty();
            // Completes once the state sink emits CONNECTED; gates regular outbound payloads.
            Mono<Void> onConnected = this.state.asFlux().filter(s2 -> s2 == GatewayConnection.State.CONNECTED).next().then();
            // Heartbeats are written as soon as they are emitted.
            Flux heartbeatFlux = this.heartbeats.asFlux().flatMap(payload -> Flux.from(this.payloadWriter.write((GatewayPayload<?>)payload)));
            // IDENTIFY payloads wait for the ping signal and then go through the identify limiter.
            Flux<ByteBuf> identifyFlux = this.outbound.asFlux().filter(payload -> Opcode.IDENTIFY.equals(payload.getOp())).delayUntil(__ -> ping.asMono()).flatMap(payload -> Flux.from(this.payloadWriter.write((GatewayPayload<?>)payload))).transform(this.identifyLimiter);
            // RESUME payloads are written without any gating.
            Flux resumeFlux = this.outbound.asFlux().filter(payload -> Opcode.RESUME.equals(payload.getOp())).flatMap(payload -> Flux.from(this.payloadWriter.write((GatewayPayload<?>)payload)));
            // Every other payload waits for CONNECTED, is merged with the raw buffers from the
            // sender sink and passes a rate limiter configured with a 60 second period.
            Flux payloadFlux = this.outbound.asFlux().filter(DefaultGatewayClient::isNotStartupPayload).delayUntil(__ -> onConnected).flatMap(payload -> Flux.from(this.payloadWriter.write((GatewayPayload<?>)payload))).transform(buf -> Flux.merge(buf, this.sender.asFlux())).transform(new RateLimitOperator(this.outboundLimiterCapacity(), Duration.ofSeconds(60L), this.reactorResources.getTimerTaskScheduler(), this.reactorResources.getPayloadSenderScheduler()));
            // Everything sent over the websocket: traced on the sender logger, released if discarded.
            Flux<ByteBuf> outFlux = Flux.merge(heartbeatFlux, identifyFlux, resumeFlux, payloadFlux).doOnNext(buf -> this.logPayload(senderLog, (ContextView)context, (ByteBuf)buf)).doOnDiscard(ByteBuf.class, DefaultGatewayClient::safeRelease);
            this.sessionHandler = new GatewayWebsocketHandler(this.receiver, outFlux, (ContextView)context);
            // On every Ready or Resumed dispatch, pair it with the current state and mark the client connected.
            Mono<Void> readyHandler = this.dispatch.asFlux().filter(DefaultGatewayClient::isReadyOrResumed).zipWith(this.state.asFlux().next().repeat()).doOnNext(TupleUtils.consumer((event, currentState) -> {
                ConnectionObserver.State observerState;
                // A START_* state means this is the first connection; any other state is a reconnection.
                if (currentState == GatewayConnection.State.START_IDENTIFYING || currentState == GatewayConnection.State.START_RESUMING) {
                    log.info(LogUtil.format(context, "Connected to Gateway"));
                    this.emissionStrategy.emitNext(this.dispatch, GatewayStateChange.connected());
                    observerState = GatewayObserver.CONNECTED;
                } else {
                    log.info(LogUtil.format(context, "Reconnected to Gateway"));
                    this.emissionStrategy.emitNext(this.dispatch, GatewayStateChange.retrySucceeded(this.reconnectContext.getAttempts()));
                    observerState = GatewayObserver.RETRY_SUCCEEDED;
                }
                this.reconnectContext.reset();
                this.state.emitNext(GatewayConnection.State.CONNECTED, Sinks.EmitFailureHandler.FAIL_FAST);
                this.notifyObserver(observerState);
            })).then();
            // Inbound path: retain (unless unpooled), trace, decode, signal ping on ACK,
            // record the sequence and dispatch to the handler registered for the opcode.
            Mono<Void> receiverFuture = this.receiver.asFlux().map(buf -> this.unpooled ? buf : buf.retain()).doOnNext(buf -> this.logPayload(receiverLog, (ContextView)context, (ByteBuf)buf)).flatMap(this.payloadReader::read).doOnDiscard(ByteBuf.class, DefaultGatewayClient::safeRelease).doOnNext(payload -> {
                if (Opcode.HEARTBEAT_ACK.equals(payload.getOp())) {
                    ping.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
                }
            }).map(this::updateSequence).flatMap(this::handlePayload).then();
            // Watches the outbound sink: completion closes the session, and a RECONNECT
            // payload placed on it makes the session fail so that it is retried.
            Mono<Void> senderFuture = this.outbound.asFlux().doOnComplete(this.sessionHandler::close).doOnNext(payload -> {
                if (Opcode.RECONNECT.equals(payload.getOp())) {
                    this.sessionHandler.error(new GatewayException((ContextView)context, "Reconnecting due to user action"));
                }
            }).then();
            // Heartbeat ticker: either emits a heartbeat or fails the session when too many ACKs were missed.
            Mono<Void> heartbeatHandler = this.heartbeatEmitter.ticks().flatMap(t -> {
                long now = System.nanoTime();
                // Seed lastAck on the first tick so that the delay below is measured from now.
                this.lastAck.compareAndSet(0L, now);
                long delay = now - this.lastAck.get();
                // lastSent newer than lastAck means the previous heartbeat has not been acknowledged yet.
                if (this.lastSent.get() - this.lastAck.get() > 0L && this.missedAck.incrementAndGet() > this.maxMissedHeartbeatAck) {
                    log.warn(LogUtil.format(context, "Missing heartbeat ACK for {} (tick: {}, seq: {})"), Duration.ofNanos(delay), t, this.sequence.get());
                    this.sessionHandler.error(new GatewayException((ContextView)context, "Reconnecting due to zombie or failed connection"));
                    return Mono.empty();
                }
                log.debug(LogUtil.format(context, "Sending heartbeat {} after last ACK"), Duration.ofNanos(delay));
                this.lastSent.set(now);
                return Mono.just(GatewayPayload.heartbeat(ImmutableHeartbeat.of(this.sequence.get())));
            }).doOnNext(tick -> this.emissionStrategy.emitNext(this.heartbeats, tick)).then();
            // Opens the websocket (frame payload length capped at Integer.MAX_VALUE), runs the
            // session handler and passes its resulting behavior/status pair to handleClose.
            Mono<Void> httpFuture = ((HttpClient.WebsocketSender)this.httpClient.websocket(((WebsocketClientSpec.Builder)WebsocketClientSpec.builder().maxFramePayloadLength(Integer.MAX_VALUE)).build()).uri(this.buildGatewayUrl(gatewayUrl))).handle(this.sessionHandler::handle).contextWrite(LogUtil.clearContext()).flatMap(t2 -> this.handleClose((DisconnectBehavior)t2.getT1(), (CloseStatus)t2.getT2())).then();
            // Run all parts together; the log level of an error depends on its type.
            return Mono.zip(httpFuture, readyHandler, receiverFuture, senderFuture, heartbeatHandler).doOnError(t -> {
                if (t instanceof ReconnectException) {
                    log.info(LogUtil.format(context, "{}"), t.getMessage());
                } else if (t instanceof CloseException || t instanceof GatewayException) {
                    log.warn(LogUtil.format(context, "{}"), t.toString());
                } else {
                    log.error(LogUtil.format(context, "Gateway client error"), (Throwable)t);
                }
            }).doOnTerminate(this.heartbeatEmitter::stop).doOnCancel(() -> this.sessionHandler.close()).then();
        // Tag the context with the shard index, retry according to retryFactory(), then wait for the
        // close notifier. The doOnSubscribe guard rejects a subscription made after the notifier exists.
        }).contextWrite(ctx -> ctx.put("discord4j.shard", this.identifyOptions.getShardInfo().getIndex())).retryWhen(this.retryFactory()).then(Mono.defer(() -> this.disconnectNotifier.asMono().then())).doOnSubscribe(s2 -> {
            if (this.disconnectNotifier != null) {
                throw new IllegalStateException("execute can only be subscribed once");
            }
        });
    }

    /**
     * Picks the URL to connect to. When a resume URL is stored, it is combined with the raw
     * query string of the identify URL; otherwise the identify URL is returned unchanged.
     *
     * @param identifyGatewayUrl the URL used for a fresh identify
     * @return the URL to open the websocket against
     */
    private String buildGatewayUrl(String identifyGatewayUrl) {
        QueryStringDecoder decoder = new QueryStringDecoder(identifyGatewayUrl);
        String storedResumeUrl = resumeUrl.get();
        if (storedResumeUrl == null) {
            return identifyGatewayUrl;
        }
        return storedResumeUrl + '?' + decoder.rawQuery();
    }

    /**
     * Derives the HTTP client used for the websocket from the configured reactor resources,
     * adding the User-Agent header and, unless the observer is the no-op listener, a
     * connection observer that forwards state changes to {@code notifyObserver}.
     *
     * @return the configured HTTP client
     */
    private HttpClient initHttpClient() {
        HttpClient withUserAgent = reactorResources.getHttpClient()
                .headers(h -> h.add(HttpHeaderNames.USER_AGENT, initUserAgent()));
        boolean hasObserver = observer != GatewayObserver.NOOP_LISTENER;
        return hasObserver
                ? withUserAgent.observe((connection, newState) -> notifyObserver(newState))
                : withUserAgent;
    }

    /**
     * Builds the User-Agent value from the git properties, falling back to version
     * {@code "3"} and URL {@code "https://discord4j.com"} when they are not set.
     *
     * @return a string of the form {@code DiscordBot(url, version)}
     */
    private String initUserAgent() {
        Properties gitProperties = GitProperties.getProperties();
        String version = gitProperties.getProperty("git.build.version", "3");
        String url = gitProperties.getProperty("application.url", "https://discord4j.com");
        return new StringBuilder("DiscordBot(")
                .append(url)
                .append(", ")
                .append(version)
                .append(")")
                .toString();
    }

    /**
     * Forwards a state change to the configured observer, passing this client as its source.
     *
     * @param state the state to report
     */
    private void notifyObserver(ConnectionObserver.State state) {
        observer.onStateChange(state, this);
    }

    /** Matches a JSON {@code "token"} entry so its value can be masked before logging. */
    private static final java.util.regex.Pattern TOKEN_PATTERN =
            java.util.regex.Pattern.compile("(\"token\": ?\")([A-Za-z0-9._-]*)(\")");

    /**
     * Traces a payload on the given logger with any token value masked.
     *
     * <p>The buffer is decoded only when trace logging is enabled. The pattern is compiled once
     * instead of on every call, which is what {@code String.replaceAll} would do.
     *
     * @param logger the logger to write to
     * @param context the context used to format the log message
     * @param buf the payload, decoded as UTF-8
     */
    private void logPayload(Logger logger, ContextView context, ByteBuf buf) {
        if (!logger.isTraceEnabled()) {
            return;
        }
        String raw = buf.toString(StandardCharsets.UTF_8);
        String redacted = TOKEN_PATTERN.matcher(raw).replaceAll("$1hunter2$3");
        logger.trace(LogUtil.format(context, redacted));
    }

    /**
     * Tells whether a payload is something other than IDENTIFY or RESUME.
     *
     * @param payload the payload to check
     * @return {@code false} for IDENTIFY and RESUME payloads, {@code true} otherwise
     */
    private static boolean isNotStartupPayload(GatewayPayload<?> payload) {
        boolean startup = Opcode.IDENTIFY.equals(payload.getOp())
                || Opcode.RESUME.equals(payload.getOp());
        return !startup;
    }

    /**
     * Tells whether a dispatch event is a {@code Ready} or a {@code Resumed}.
     *
     * @param d the dispatch event, must not be null
     * @return {@code true} if the event's class is assignable to either type
     */
    private static boolean isReadyOrResumed(Dispatch d) {
        Class<?> eventType = d.getClass();
        if (Ready.class.isAssignableFrom(eventType)) {
            return true;
        }
        return Resumed.class.isAssignableFrom(eventType);
    }

    /**
     * Records the sequence number carried by a payload, if any, and reports the change to
     * the observer.
     *
     * @param payload the inbound payload
     * @return the same payload, so this method can be used as a mapping step
     */
    private GatewayPayload<?> updateSequence(GatewayPayload<?> payload) {
        if (payload.getSequence() == null) {
            // Nothing to record for payloads without a sequence number.
            return payload;
        }
        sequence.set(payload.getSequence());
        notifyObserver(GatewayObserver.SEQUENCE);
        return payload;
    }

    /**
     * Routes an inbound payload to the handler registered for its opcode.
     *
     * <p>Payloads without a registered handler are logged at warn level and ignored. The handler
     * runs lazily on subscription, and the returned Mono carries a checkpoint describing the
     * payload's opcode, sequence and type.
     *
     * @param payload the decoded inbound payload
     * @return a Mono completing when the handler has finished, or empty when there is no handler
     */
    private <T extends PayloadData> Mono<Void> handlePayload(GatewayPayload<T> payload) {
        // addHandler only accepts a PayloadHandler<T> for an Opcode<T>, so the handler stored
        // under this payload's opcode has a matching payload type.
        @SuppressWarnings("unchecked")
        PayloadHandler<T> handler = (PayloadHandler<T>) this.handlerMap.get(payload.getOp());
        if (handler == null) {
            log.warn(LogUtil.format(this.currentContext, "Handler not found from: {}"), payload);
            return Mono.empty();
        }
        return Mono.defer(() -> handler.handle(payload)).checkpoint("Dispatch handled for OP " + payload.getOp().getRawOp() + " seq " + payload.getSequence() + " type " + payload.getType());
    }

    /**
     * Handles a DISPATCH payload: a {@code Ready} event updates the stored session id and
     * resume URL, and any non-null event is published on the dispatch sink.
     *
     * @param payload the dispatch payload
     * @return an empty Mono
     */
    private Mono<Void> handleDispatch(GatewayPayload<Dispatch> payload) {
        Dispatch event = payload.getData();
        if (event == null) {
            return Mono.empty();
        }
        if (event instanceof Ready) {
            Ready ready = (Ready) event;
            sessionId.set(ready.sessionId());
            resumeUrl.set(ready.resumeGatewayUrl());
        }
        emissionStrategy.emitNext(dispatch, event);
        return Mono.empty();
    }

    /**
     * Handles a HEARTBEAT payload by queueing a heartbeat with the current sequence on the
     * outbound sink.
     *
     * @param payload the received heartbeat payload (not inspected)
     * @return an empty Mono
     */
    private Mono<Void> handleHeartbeat(GatewayPayload<Heartbeat> payload) {
        log.debug(LogUtil.format(currentContext, "Received heartbeat"));
        GatewayPayload<?> reply = GatewayPayload.heartbeat(ImmutableHeartbeat.of(sequence.get()));
        emissionStrategy.emitNext(outbound, reply);
        return Mono.empty();
    }

    /**
     * Handles a RECONNECT payload by failing the current session with a
     * {@code ReconnectException}.
     *
     * @param payload the received payload (not inspected)
     * @return an empty Mono
     */
    private Mono<Void> handleReconnect(GatewayPayload<?> payload) {
        ReconnectException cause =
                new ReconnectException(currentContext, "Reconnecting due to reconnect packet received");
        sessionHandler.error(cause);
        return Mono.empty();
    }

    /**
     * Handles an INVALID_SESSION payload. A resumable session queues a RESUME payload;
     * otherwise the stored resume URL is cleared and the session is failed with an
     * {@code InvalidSessionException}.
     *
     * @param payload the invalid session payload
     * @return an empty Mono
     */
    private Mono<Void> handleInvalidSession(GatewayPayload<InvalidSession> payload) {
        if (!payload.getData().resumable()) {
            resumeUrl.set(null);
            sessionHandler.error(new InvalidSessionException(currentContext,
                    "Reconnecting due to non-resumable session invalidation"));
            return Mono.empty();
        }
        GatewayPayload<?> resume =
                GatewayPayload.resume(ImmutableResume.of(token, sessionId.get(), sequence.get()));
        emissionStrategy.emitNext(outbound, resume);
        return Mono.empty();
    }

    /**
     * Handles a HELLO payload: starts the heartbeat ticker with the announced interval and
     * no initial delay, then resumes or identifies depending on the current state.
     *
     * @param payload the hello payload carrying the heartbeat interval in milliseconds
     * @return a Mono completing once the resume or identify payload has been queued
     */
    private Mono<Void> handleHello(GatewayPayload<Hello> payload) {
        long intervalMillis = payload.getData().heartbeatInterval();
        heartbeatEmitter.start(Duration.ZERO, Duration.ofMillis(intervalMillis));
        return state.asFlux()
                .next()
                .doOnNext(current -> {
                    boolean resuming = current == GatewayConnection.State.START_RESUMING
                            || current == GatewayConnection.State.RESUMING;
                    if (resuming) {
                        doResume(payload);
                    } else {
                        doIdentify(payload);
                    }
                })
                .then();
    }

    /**
     * Queues a RESUME payload built from the token, the stored session id and the current
     * sequence.
     *
     * @param payload the hello payload that triggered the resume (not inspected)
     */
    private void doResume(GatewayPayload<Hello> payload) {
        log.debug(LogUtil.format(currentContext, "Resuming Gateway session from {}"), sequence.get());
        GatewayPayload<?> resume =
                GatewayPayload.resume(ImmutableResume.of(token, sessionId.get(), sequence.get()));
        emissionStrategy.emitNext(outbound, resume);
    }

    /**
     * Queues an IDENTIFY payload built from the token and the identify options.
     *
     * <p>Intents and initial presence are sent only when configured; compression is disabled.
     *
     * @param payload the hello payload that triggered the identify (not inspected)
     */
    private void doIdentify(GatewayPayload<Hello> payload) {
        ImmutableIdentifyProperties props = ImmutableIdentifyProperties.of(System.getProperty("os.name"), "Discord4J", "Discord4J");
        ImmutableIdentify identify = Identify.builder()
                .token(this.token)
                .intents(this.identifyOptions.getIntents().map(set -> Possible.of(set.getRawValue())).orElse(Possible.absent()))
                .properties(props)
                .compress(false)
                .largeThreshold(this.identifyOptions.getLargeThreshold())
                .shard(this.identifyOptions.getShardInfo().asArray())
                .presence(this.identifyOptions.getInitialStatus().map(Possible::of).orElse(Possible.absent()))
                .build();
        // The message has no placeholder, so no argument is passed to the logger.
        log.debug(LogUtil.format(this.currentContext, "Identifying to Gateway"));
        this.emissionStrategy.emitNext(this.outbound, GatewayPayload.identify(identify));
    }

    /**
     * Handles a HEARTBEAT_ACK payload: stamps the ACK time, recomputes the response time
     * relative to the last heartbeat sent and clears the missed-ACK counter.
     *
     * @param context the received payload (not inspected)
     * @return an empty Mono
     */
    private Mono<Void> handleHeartbeatAck(GatewayPayload<?> context) {
        long ackTime = lastAck.updateAndGet(previous -> System.nanoTime());
        responseTime = ackTime - lastSent.get();
        missedAck.set(0);
        Duration elapsed = getResponseTime();
        log.debug(LogUtil.format(currentContext, "Heartbeat acknowledged after {}"), elapsed);
        return Mono.empty();
    }

    /**
     * Builds the retry policy applied to the connection pipeline in {@code execute}.
     *
     * <p>Before every retry the next connection state is pushed to the state sink, and a
     * {@code GatewayStateChange} is emitted on the dispatch sink together with the matching
     * observer notification.
     *
     * @return the retry specification, created from the reconnect options and context
     */
    private Retry retryFactory() {
        return GatewayRetrySpec.create(this.reconnectOptions, this.reconnectContext).doBeforeRetry(retry -> {
            // Publish the upcoming state first so that state subscribers see it before the events below.
            this.state.emitNext(retry.nextState(), Sinks.EmitFailureHandler.FAIL_FAST);
            long attempt = retry.iteration();
            Duration backoff = retry.nextBackoff();
            log.debug(LogUtil.format(this.getContextFromException(retry.failure()), "{} in {} (attempts: {})"), new Object[]{retry.nextState(), backoff, attempt});
            if (retry.iteration() == 1L) {
                // First attempt of a retry cycle: announce that retrying has started.
                // NOTE(review): RESUMING is paired with retryStarted/RETRY_STARTED and every other state
                // with retryStartedResume/RETRY_RESUME_STARTED; the names suggest the opposite pairing —
                // confirm against the GatewayStateChange and GatewayObserver definitions.
                if (retry.nextState() == GatewayConnection.State.RESUMING) {
                    this.emissionStrategy.emitNext(this.dispatch, GatewayStateChange.retryStarted(backoff));
                    this.notifyObserver(GatewayObserver.RETRY_STARTED);
                } else {
                    this.emissionStrategy.emitNext(this.dispatch, GatewayStateChange.retryStartedResume(backoff));
                    this.notifyObserver(GatewayObserver.RETRY_RESUME_STARTED);
                }
            } else {
                // Later attempts: the previous attempt (attempt - 1) is reported as failed.
                this.emissionStrategy.emitNext(this.dispatch, GatewayStateChange.retryFailed(attempt - 1L, backoff));
                this.notifyObserver(GatewayObserver.RETRY_FAILED);
            }
            // A RECONNECTING next state is additionally reported as a session invalidation.
            if (retry.nextState() == GatewayConnection.State.RECONNECTING) {
                this.emissionStrategy.emitNext(this.dispatch, GatewayStateChange.sessionInvalidated());
            }
        });
    }

    /**
     * Extracts the context carried by a {@code CloseException} or {@code GatewayException}.
     *
     * @param t the failure to inspect
     * @return the context attached to the exception, or an empty context for any other type
     */
    private ContextView getContextFromException(Throwable t) {
        return t instanceof CloseException
                ? ((CloseException) t).getContext()
                : t instanceof GatewayException
                        ? ((GatewayException) t).getContext()
                        : Context.empty();
    }

    /**
     * Reacts to the end of a websocket session.
     *
     * <p>For the {@code STOP} and {@code STOP_ABRUPTLY} actions the client is moved to
     * {@code DISCONNECTED} and the disconnect notifier is terminated, with an error when the
     * behavior carries a cause and with the close status otherwise. For any other action the
     * returned Mono fails with a {@code CloseException}, which is what the retry policy sees.
     *
     * @param sourceBehavior the behavior reported by the session handler
     * @param closeStatus the status the session closed with
     * @return a Mono emitting the close status on a clean stop, or failing with a {@code CloseException}
     */
    private Mono<CloseStatus> handleClose(DisconnectBehavior sourceBehavior, CloseStatus closeStatus) {
        return Mono.deferContextual(ctx -> {
            // A non-retryable close code overrides the requested behavior with a stop, keeping its cause.
            DisconnectBehavior behavior = GatewayRetrySpec.NON_RETRYABLE_STATUS_CODES.contains(closeStatus.getCode()) ? DisconnectBehavior.stop(sourceBehavior.getCause()) : sourceBehavior;
            log.debug(LogUtil.format(ctx, "Closing and {} with status {}"), behavior, closeStatus);
            this.state.emitNext(GatewayConnection.State.DISCONNECTING, Sinks.EmitFailureHandler.FAIL_FAST);
            this.heartbeatEmitter.stop();
            if (behavior.getAction() == DisconnectBehavior.Action.STOP_ABRUPTLY) {
                // Sequence and session id are left untouched in this branch.
                this.emissionStrategy.emitNext(this.dispatch, GatewayStateChange.disconnectedResume());
                this.notifyObserver(GatewayObserver.DISCONNECTED_RESUME);
            } else if (behavior.getAction() == DisconnectBehavior.Action.STOP) {
                // A plain stop also resets the sequence and the session id.
                this.emissionStrategy.emitNext(this.dispatch, GatewayStateChange.disconnected(sourceBehavior, closeStatus));
                this.sequence.set(0);
                this.sessionId.set("");
                this.notifyObserver(GatewayObserver.DISCONNECTED);
            }
            switch (behavior.getAction()) {
                case STOP_ABRUPTLY: 
                case STOP: {
                    // Both stop variants reset reconnect and heartbeat bookkeeping and end in DISCONNECTED.
                    this.reconnectContext.clear();
                    this.responseTime = 0L;
                    this.lastSent.set(0L);
                    this.lastAck.set(0L);
                    this.state.emitNext(GatewayConnection.State.DISCONNECTED, Sinks.EmitFailureHandler.FAIL_FAST);
                    if (behavior.getCause() != null) {
                        // Stop with a cause: fail both the disconnect notifier and the returned Mono.
                        return Mono.just(new CloseException(closeStatus, (ContextView)ctx, behavior.getCause())).flatMap(ex -> {
                            this.disconnectNotifier.emitError((Throwable)ex, Sinks.EmitFailureHandler.FAIL_FAST);
                            return Mono.error(ex);
                        });
                    }
                    // Stop without a cause: complete the notifier with the close status.
                    return Mono.just(closeStatus).doOnNext(status -> this.disconnectNotifier.emitValue(closeStatus, Sinks.EmitFailureHandler.FAIL_FAST));
                }
            }
            // Any other action: signal an error without touching the disconnect notifier.
            return Mono.error(new CloseException(closeStatus, (ContextView)ctx, behavior.getCause()));
        });
    }

    /**
     * Closes the current session.
     *
     * @param allowResume {@code true} to close with the "stop abruptly" behavior,
     * {@code false} to close with the plain "stop" behavior
     * @return a Mono backed by the disconnect notifier, or an error Mono with an
     * {@code IllegalStateException} when no session handler or notifier exists yet
     */
    @Override
    public Mono<CloseStatus> close(boolean allowResume) {
        return Mono.defer(() -> {
            boolean active = this.sessionHandler != null && this.disconnectNotifier != null;
            if (!active) {
                return Mono.error(new IllegalStateException("Gateway client is not active!"));
            }
            DisconnectBehavior behavior = allowResume
                    ? DisconnectBehavior.stopAbruptly(null)
                    : DisconnectBehavior.stop(null);
            this.sessionHandler.close(behavior);
            return this.disconnectNotifier.asMono();
        });
    }

    /**
     * Exposes the dispatch sink as a Flux.
     *
     * @return a Flux view of the dispatch sink
     */
    @Override
    public Flux<Dispatch> dispatch() {
        Flux<Dispatch> events = dispatch.asFlux();
        return events;
    }

    /**
     * Exposes inbound payloads decoded with the configured payload reader.
     *
     * @return a Flux of decoded inbound payloads
     */
    @Override
    public Flux<GatewayPayload<?>> receiver() {
        Function<ByteBuf, Publisher<? extends GatewayPayload<?>>> decoder = this.payloadReader::read;
        return this.receiver(decoder);
    }

    /**
     * Exposes inbound buffers transformed by the given mapper. Each buffer is handed to the
     * mapper as a retained duplicate, and buffers that are discarded are released.
     *
     * @param mapper converts a raw buffer into a publisher of results
     * @return a Flux of the mapper's results
     */
    @Override
    public <T> Flux<T> receiver(Function<ByteBuf, Publisher<? extends T>> mapper) {
        Flux<ByteBuf> duplicates = this.receiver.asFlux()
                .map(ByteBuf::retainedDuplicate)
                .doOnDiscard(ByteBuf.class, DefaultGatewayClient::safeRelease);
        return duplicates.flatMap(mapper);
    }

    /**
     * Releases a buffer if it still has a positive reference count. An
     * {@code IllegalReferenceCountException} raised by the release is not propagated; it is
     * logged only when debug logging is enabled.
     *
     * @param buf the buffer to release
     */
    private static void safeRelease(ByteBuf buf) {
        if (buf.refCnt() <= 0) {
            return;
        }
        try {
            buf.release();
        } catch (IllegalReferenceCountException e) {
            if (log.isDebugEnabled()) {
                log.debug("", e);
            }
        }
    }

    /**
     * Exposes the sink used to queue outbound payloads.
     *
     * @return the outbound payload sink
     */
    @Override
    public Sinks.Many<GatewayPayload<?>> sender() {
        return outbound;
    }

    /**
     * Queues every buffer produced by the publisher on the raw sender sink.
     *
     * @param publisher the source of raw buffers to send
     * @return a Mono completing when the publisher completes
     */
    @Override
    public Mono<Void> sendBuffer(Publisher<ByteBuf> publisher) {
        Flux<ByteBuf> buffers = Flux.from(publisher);
        return buffers
                .doOnNext(buf -> emissionStrategy.emitNext(sender, buf))
                .then();
    }

    /**
     * Returns the shard count from the identify options' shard info.
     *
     * @return the configured shard count
     */
    @Override
    public int getShardCount() {
        return identifyOptions.getShardInfo().getCount();
    }

    /**
     * Returns the currently stored session id.
     *
     * @return the session id; an empty string when none is stored
     */
    @Override
    public String getSessionId() {
        return sessionId.get();
    }

    /**
     * Returns the last recorded sequence number.
     *
     * @return the current sequence
     */
    @Override
    public int getSequence() {
        return sequence.get();
    }

    /**
     * Exposes the connection state sink as a Flux.
     *
     * @return a Flux view of the state sink
     */
    @Override
    public Flux<GatewayConnection.State> stateEvents() {
        Flux<GatewayConnection.State> states = state.asFlux();
        return states;
    }

    /**
     * Checks whether the first state emitted by the state sink is {@code CONNECTED}.
     *
     * @return a Mono emitting {@code true} when that state is {@code CONNECTED}, {@code false} otherwise
     */
    @Override
    public Mono<Boolean> isConnected() {
        Mono<GatewayConnection.State> current = state.asFlux().next();
        return current
                .filter(s -> s == GatewayConnection.State.CONNECTED)
                .hasElement()
                .defaultIfEmpty(false);
    }

    /**
     * Returns the time between the last heartbeat sent and its acknowledgement.
     *
     * @return the stored response time in nanoseconds, as a Duration
     */
    @Override
    public Duration getResponseTime() {
        long nanos = responseTime;
        return Duration.ofNanos(nanos);
    }

    /**
     * Capacity used when no valid override is configured.
     * NOTE(review): presumably chosen to stay below Discord's outbound payload limit — confirm.
     */
    private static final int DEFAULT_OUTBOUND_CAPACITY = 115;

    /**
     * Resolves the capacity of the outbound rate limiter.
     *
     * <p>The system property {@code discord4j.gateway.outbound.capacity} overrides the default
     * when it holds a positive integer; surrounding whitespace is ignored. Any other value is
     * logged as invalid and the default is used, so the limiter is never built with a zero or
     * negative capacity.
     *
     * @return the configured capacity, or the default when there is no valid override
     */
    private int outboundLimiterCapacity() {
        String capacityValue = System.getProperty(OUTBOUND_CAPACITY_PROPERTY);
        if (capacityValue == null) {
            return DEFAULT_OUTBOUND_CAPACITY;
        }
        try {
            int capacity = Integer.parseInt(capacityValue.trim());
            if (capacity > 0) {
                log.info("Overriding default outbound limiter capacity: {}", capacity);
                return capacity;
            }
        } catch (NumberFormatException ignored) {
            // Not a number: reported through the warning below.
        }
        log.warn("Invalid custom outbound limiter capacity: {}", capacityValue);
        return DEFAULT_OUTBOUND_CAPACITY;
    }
}

