/*
 * Decompiled with CFR 0.152.
 */
package discord4j.voice;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iwebpp.crypto.TweetNaclFast;
import discord4j.common.LogUtil;
import discord4j.common.ResettableInterval;
import discord4j.common.close.CloseException;
import discord4j.common.close.CloseStatus;
import discord4j.common.close.DisconnectBehavior;
import discord4j.common.retry.ReconnectContext;
import discord4j.common.retry.ReconnectOptions;
import discord4j.common.util.Snowflake;
import discord4j.voice.AudioProvider;
import discord4j.voice.AudioReceiver;
import discord4j.voice.PacketTransformer;
import discord4j.voice.VoiceChannelRetrieveTask;
import discord4j.voice.VoiceConnection;
import discord4j.voice.VoiceDisconnectTask;
import discord4j.voice.VoiceGatewayEvent;
import discord4j.voice.VoiceGatewayOptions;
import discord4j.voice.VoiceReactorResources;
import discord4j.voice.VoiceReceiveTaskFactory;
import discord4j.voice.VoiceSendTaskFactory;
import discord4j.voice.VoiceServerOptions;
import discord4j.voice.VoiceServerUpdateTask;
import discord4j.voice.VoiceSocket;
import discord4j.voice.VoiceWebsocketHandler;
import discord4j.voice.json.Heartbeat;
import discord4j.voice.json.Hello;
import discord4j.voice.json.Identify;
import discord4j.voice.json.Ready;
import discord4j.voice.json.Resume;
import discord4j.voice.json.Resumed;
import discord4j.voice.json.SelectProtocol;
import discord4j.voice.json.SentSpeaking;
import discord4j.voice.json.SessionDescription;
import discord4j.voice.json.VoiceGatewayPayload;
import discord4j.voice.retry.VoiceGatewayException;
import discord4j.voice.retry.VoiceGatewayReconnectException;
import discord4j.voice.retry.VoiceGatewayRetrySpec;
import discord4j.voice.retry.VoiceServerUpdateReconnectException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.ReplayProcessor;
import reactor.function.TupleUtils;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.WebsocketClientSpec;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

public class DefaultVoiceGatewayClient {
    private static final Logger log = Loggers.getLogger(DefaultVoiceGatewayClient.class);
    private static final Logger senderLog = Loggers.getLogger("discord4j.voice.protocol.sender");
    private static final Logger receiverLog = Loggers.getLogger("discord4j.voice.protocol.receiver");
    private final EmitterProcessor<ByteBuf> receiver = EmitterProcessor.create(false);
    private final EmitterProcessor<VoiceGatewayPayload<?>> outbound = EmitterProcessor.create(false);
    private final EmitterProcessor<VoiceGatewayEvent> events = EmitterProcessor.create(false);
    private final FluxSink<ByteBuf> receiverSink;
    private final FluxSink<VoiceGatewayPayload<?>> outboundSink;
    private final FluxSink<VoiceGatewayEvent> eventSink;
    private final Snowflake guildId;
    private final Snowflake selfId;
    private final Function<VoiceGatewayPayload<?>, Mono<ByteBuf>> payloadWriter;
    private final Function<ByteBuf, Mono<? super VoiceGatewayPayload<?>>> payloadReader;
    private final VoiceReactorResources reactorResources;
    private final ReconnectOptions reconnectOptions;
    private final ReconnectContext reconnectContext;
    private final AudioProvider audioProvider;
    private final AudioReceiver audioReceiver;
    private final VoiceSendTaskFactory sendTaskFactory;
    private final VoiceReceiveTaskFactory receiveTaskFactory;
    private final VoiceDisconnectTask disconnectTask;
    private final VoiceServerUpdateTask serverUpdateTask;
    private final VoiceChannelRetrieveTask channelRetrieveTask;
    private final Duration ipDiscoveryTimeout;
    private final RetrySpec ipDiscoveryRetrySpec;
    private final VoiceSocket voiceSocket;
    private final ResettableInterval heartbeat;
    private final Disposable.Swap cleanup = Disposables.swap();
    private final ReplayProcessor<VoiceConnection.State> state;
    private final FluxSink<VoiceConnection.State> stateChanges;
    private final AtomicReference<VoiceServerOptions> serverOptions = new AtomicReference();
    private final AtomicReference<String> session = new AtomicReference();
    private volatile int ssrc;
    private volatile MonoProcessor<CloseStatus> disconnectNotifier;
    private volatile Context currentContext;
    private volatile VoiceWebsocketHandler sessionHandler;

    public DefaultVoiceGatewayClient(VoiceGatewayOptions options) {
        this.guildId = options.getGuildId();
        this.selfId = options.getSelfId();
        ObjectMapper mapper = Objects.requireNonNull(options.getJacksonResources()).getObjectMapper();
        this.payloadWriter = payload -> Mono.fromCallable(() -> Unpooled.wrappedBuffer(mapper.writeValueAsBytes(payload)));
        this.payloadReader = buf -> Mono.fromCallable(() -> {
            VoiceGatewayPayload payload = (VoiceGatewayPayload)mapper.readValue((InputStream)new ByteBufInputStream((ByteBuf)buf), new TypeReference<VoiceGatewayPayload<?>>(){});
            return payload;
        });
        this.reactorResources = Objects.requireNonNull(options.getReactorResources());
        this.reconnectOptions = Objects.requireNonNull(options.getReconnectOptions());
        this.reconnectContext = new ReconnectContext(this.reconnectOptions.getFirstBackoff(), this.reconnectOptions.getMaxBackoffInterval());
        this.audioProvider = Objects.requireNonNull(options.getAudioProvider());
        this.audioReceiver = Objects.requireNonNull(options.getAudioReceiver());
        this.sendTaskFactory = Objects.requireNonNull(options.getSendTaskFactory());
        this.receiveTaskFactory = Objects.requireNonNull(options.getReceiveTaskFactory());
        this.disconnectTask = Objects.requireNonNull(options.getDisconnectTask());
        this.serverUpdateTask = Objects.requireNonNull(options.getServerUpdateTask());
        this.channelRetrieveTask = Objects.requireNonNull(options.getChannelRetrieveTask());
        this.ipDiscoveryTimeout = Objects.requireNonNull(options.getIpDiscoveryTimeout());
        this.ipDiscoveryRetrySpec = Objects.requireNonNull(options.getIpDiscoveryRetrySpec());
        this.voiceSocket = new VoiceSocket(this.reactorResources.getUdpClient());
        this.heartbeat = new ResettableInterval(this.reactorResources.getTimerTaskScheduler());
        this.state = ReplayProcessor.cacheLastOrDefault(VoiceConnection.State.CONNECTING);
        this.stateChanges = this.state.sink(FluxSink.OverflowStrategy.LATEST);
        this.receiverSink = this.receiver.sink(FluxSink.OverflowStrategy.BUFFER);
        this.outboundSink = this.outbound.sink(FluxSink.OverflowStrategy.ERROR);
        this.eventSink = this.events.sink(FluxSink.OverflowStrategy.LATEST);
    }

    public Mono<VoiceConnection> start(VoiceServerOptions voiceServerOptions, String session) {
        return Mono.create(sink -> sink.onRequest(d -> {
            Disposable connect = this.connect(voiceServerOptions, session, (MonoSink<VoiceConnection>)sink).subscriberContext(sink.currentContext()).subscribe(null, t -> log.debug(LogUtil.format(sink.currentContext(), "Voice gateway error: {}"), t.toString()), () -> log.debug(LogUtil.format(sink.currentContext(), "Voice gateway completed")));
            sink.onCancel(connect);
        }));
    }

    private Mono<Void> connect(VoiceServerOptions vso, String sessionId, MonoSink<VoiceConnection> voiceConnectionSink) {
        return Mono.deferWithContext(context -> {
            this.serverOptions.compareAndSet(null, vso);
            this.session.compareAndSet(null, sessionId);
            this.disconnectNotifier = MonoProcessor.create();
            this.currentContext = context;
            Flux<ByteBuf> outFlux = this.outbound.flatMap(this.payloadWriter).doOnNext(buf -> this.logPayload(senderLog, (Context)context, (ByteBuf)buf));
            this.sessionHandler = new VoiceWebsocketHandler(this.receiverSink, outFlux, (Context)context);
            Mono<VoiceConnection.State> onOpen = this.state.next().doOnNext(s -> {
                if (s == VoiceConnection.State.RESUMING) {
                    log.info(LogUtil.format(context, "Attempting to resume"));
                    this.outboundSink.next(new Resume(this.guildId.asString(), this.session.get(), this.serverOptions.get().getToken()));
                } else {
                    this.stateChanges.next(VoiceConnection.State.CONNECTING);
                    log.info(LogUtil.format(context, "Identifying"));
                    this.outboundSink.next(new Identify(this.guildId.asString(), this.selfId.asString(), this.session.get(), this.serverOptions.get().getToken()));
                }
            });
            Disposable.Composite innerCleanup = Disposables.composite();
            Mono<Void> receiverFuture = this.receiver.doOnNext(buf -> this.logPayload(receiverLog, (Context)context, (ByteBuf)buf)).flatMap(this.payloadReader).doOnNext(payload -> {
                if (payload instanceof Hello) {
                    Hello hello = (Hello)payload;
                    Duration interval = Duration.ofMillis(((Hello.Data)hello.getData()).getHeartbeatInterval());
                    this.heartbeat.start(interval, interval);
                } else if (payload instanceof Ready) {
                    log.info(LogUtil.format(context, "Waiting for session description"));
                    Ready ready = (Ready)payload;
                    this.ssrc = ((Ready.Data)ready.getData()).getSsrc();
                    this.cleanup.update(innerCleanup);
                    innerCleanup.add(Mono.defer(() -> this.voiceSocket.setup(((Ready.Data)ready.getData()).getIp(), ((Ready.Data)ready.getData()).getPort())).zipWith(this.voiceSocket.performIpDiscovery(((Ready.Data)ready.getData()).getSsrc())).timeout(this.ipDiscoveryTimeout).retryWhen(this.ipDiscoveryRetrySpec).subscriberContext((Context)context).onErrorMap(t -> new VoiceGatewayException((Context)context, "UDP socket setup error", (Throwable)t)).subscribe(TupleUtils.consumer((connection, address) -> {
                        innerCleanup.add((Disposable)connection);
                        String hostName = address.getHostName();
                        int port = address.getPort();
                        this.outboundSink.next(new SelectProtocol("udp", hostName, port, "xsalsa20_poly1305"));
                    }), t -> {
                        voiceConnectionSink.error((Throwable)t);
                        this.sessionHandler.close(DisconnectBehavior.stop(t));
                    }, () -> log.debug(LogUtil.format(context, "Voice socket setup complete"))));
                } else if (payload instanceof SessionDescription) {
                    log.info(LogUtil.format(context, "Receiving events"));
                    this.stateChanges.next(VoiceConnection.State.CONNECTED);
                    this.reconnectContext.reset();
                    SessionDescription sessionDescription = (SessionDescription)payload;
                    byte[] secretKey = ((SessionDescription.Data)sessionDescription.getData()).getSecretKey();
                    TweetNaclFast.SecretBox boxer = new TweetNaclFast.SecretBox(secretKey);
                    PacketTransformer transformer = new PacketTransformer(this.ssrc, boxer);
                    Consumer<Boolean> speakingSender = speaking -> this.outboundSink.next(new SentSpeaking((boolean)speaking, 0, this.ssrc));
                    innerCleanup.add(() -> log.debug(LogUtil.format(context, "Disposing voice tasks")));
                    innerCleanup.add(this.sendTaskFactory.create(this.reactorResources.getSendTaskScheduler(), speakingSender, this.voiceSocket::send, this.audioProvider, transformer));
                    innerCleanup.add(this.receiveTaskFactory.create(this.reactorResources.getReceiveTaskScheduler(), this.voiceSocket.getInbound(), transformer, this.audioReceiver));
                    innerCleanup.add(this.serverUpdateTask.onVoiceServerUpdate(this.guildId).subscribe(newValue -> {
                        VoiceServerOptions current = this.serverOptions.get();
                        if (!current.getEndpoint().equals(newValue.getEndpoint())) {
                            log.debug(LogUtil.format(context, "Voice server endpoint change: {}"), current.getEndpoint(), newValue.getEndpoint());
                            this.serverOptions.set((VoiceServerOptions)newValue);
                            this.sessionHandler.close(DisconnectBehavior.retryAbruptly(new VoiceServerUpdateReconnectException((Context)context)));
                        }
                    }));
                    voiceConnectionSink.success(this.acquireConnection());
                } else if (payload instanceof Resumed) {
                    log.info(LogUtil.format(context, "Resumed"));
                    this.stateChanges.next(VoiceConnection.State.CONNECTED);
                    this.reconnectContext.reset();
                }
                this.eventSink.next((VoiceGatewayEvent)payload);
            }).then();
            Mono<Void> heartbeatHandler = this.heartbeat.ticks().map(Heartbeat::new).doOnNext(this.outboundSink::next).then();
            Mono<Void> httpFuture = ((HttpClient.WebsocketSender)this.reactorResources.getHttpClient().headers(headers -> headers.add((CharSequence)HttpHeaderNames.USER_AGENT, (Object)"DiscordBot(https://discord4j.com, 3)")).observe(this.getObserver((Context)context)).websocket(((WebsocketClientSpec.Builder)WebsocketClientSpec.builder().maxFramePayloadLength(Integer.MAX_VALUE)).build()).uri(this.serverOptions.get().getEndpoint() + "?v=4")).handle((in, out) -> onOpen.then(this.sessionHandler.handle((WebsocketInbound)in, (WebsocketOutbound)out))).subscriberContext(LogUtil.clearContext()).flatMap(t2 -> this.handleClose((DisconnectBehavior)t2.getT1(), (CloseStatus)t2.getT2())).then();
            return Mono.zip(httpFuture, receiverFuture, heartbeatHandler).doOnError(t -> log.error(LogUtil.format(context, "{}"), t.toString())).doOnTerminate(this.heartbeat::stop).doOnCancel(() -> this.sessionHandler.close()).then();
        }).subscriberContext(ctx -> ctx.put("discord4j.guild", this.guildId.asString())).retryWhen(this.retryFactory()).then(Mono.defer(() -> this.disconnectNotifier.then())).doOnSubscribe(s -> {
            if (this.disconnectNotifier != null) {
                throw new IllegalStateException("connect can only be subscribed once");
            }
        });
    }

    private ConnectionObserver getObserver(Context context) {
        return (connection, newState) -> log.debug(LogUtil.format(context, "{} {}"), newState, connection);
    }

    private VoiceConnection acquireConnection() {
        return new VoiceConnection(){

            @Override
            public Flux<VoiceGatewayEvent> events() {
                return DefaultVoiceGatewayClient.this.events;
            }

            @Override
            public Flux<VoiceConnection.State> stateEvents() {
                return DefaultVoiceGatewayClient.this.state;
            }

            @Override
            public Mono<Void> disconnect() {
                return this.onConnectOrDisconnect().flatMap(s -> s.equals((Object)VoiceConnection.State.CONNECTED) ? DefaultVoiceGatewayClient.this.stop() : Mono.empty()).then();
            }

            @Override
            public Snowflake getGuildId() {
                return DefaultVoiceGatewayClient.this.guildId;
            }

            @Override
            public Mono<Snowflake> getChannelId() {
                return this.onConnectOrDisconnect().flatMap(s -> s.equals((Object)VoiceConnection.State.CONNECTED) ? DefaultVoiceGatewayClient.this.channelRetrieveTask.onRequest() : Mono.empty());
            }

            @Override
            public Mono<Void> reconnect() {
                return this.reconnect(VoiceGatewayReconnectException::new);
            }

            @Override
            public Mono<Void> reconnect(Function<Context, Throwable> errorCause) {
                return this.onConnectOrDisconnect().flatMap(s -> s.equals((Object)VoiceConnection.State.CONNECTED) ? Mono.fromRunnable(() -> DefaultVoiceGatewayClient.this.sessionHandler.close(DisconnectBehavior.retryAbruptly((Throwable)errorCause.apply(DefaultVoiceGatewayClient.this.currentContext)))).then(this.stateEvents().filter(ss -> ss.equals((Object)VoiceConnection.State.CONNECTED)).next()) : Mono.error(new IllegalStateException("Voice connection has already disconnected"))).then();
            }
        };
    }

    public Mono<Void> stop() {
        return Mono.defer(() -> {
            if (this.sessionHandler == null || this.disconnectNotifier == null) {
                return Mono.error(new IllegalStateException("Gateway client is not active!"));
            }
            if (!this.disconnectNotifier.isTerminated()) {
                this.sessionHandler.close(DisconnectBehavior.stop(null));
            }
            return this.disconnectNotifier.then();
        });
    }

    private void logPayload(Logger logger, Context context, ByteBuf buf) {
        logger.trace(LogUtil.format(context, buf.toString(StandardCharsets.UTF_8).replaceAll("(\"token\": ?\")([A-Za-z0-9._-]*)(\")", "$1hunter2$3")));
    }

    private Retry retryFactory() {
        return VoiceGatewayRetrySpec.create(this.reconnectOptions, this.reconnectContext).doBeforeRetry(retry -> {
            this.stateChanges.next(retry.nextState());
            long attempt = retry.iteration();
            Duration backoff = retry.nextBackoff();
            log.debug(LogUtil.format(this.getContextFromException(retry.failure()), "{} in {} (attempts: {})"), new Object[]{retry.nextState(), backoff, attempt});
        });
    }

    private Context getContextFromException(Throwable t) {
        if (t instanceof CloseException) {
            return ((CloseException)t).getContext();
        }
        if (t instanceof VoiceGatewayException) {
            return ((VoiceGatewayException)t).getContext();
        }
        return Context.empty();
    }

    private Mono<CloseStatus> handleClose(DisconnectBehavior sourceBehavior, CloseStatus closeStatus) {
        return Mono.deferWithContext(ctx -> {
            DisconnectBehavior behavior = VoiceGatewayRetrySpec.NON_RETRYABLE_STATUS_CODES.contains(closeStatus.getCode()) ? DisconnectBehavior.stop(sourceBehavior.getCause()) : sourceBehavior;
            log.debug(LogUtil.format(ctx, "Closing and {} with status {}"), behavior, closeStatus);
            this.heartbeat.stop();
            if (behavior.getAction() == DisconnectBehavior.Action.STOP) {
                this.cleanup.dispose();
            }
            switch (behavior.getAction()) {
                case STOP_ABRUPTLY: 
                case STOP: {
                    if (behavior.getCause() != null) {
                        return Mono.just(new CloseException(closeStatus, (Context)ctx, behavior.getCause())).flatMap(ex -> {
                            this.stateChanges.next(VoiceConnection.State.DISCONNECTED);
                            this.disconnectNotifier.onError((Throwable)ex);
                            Mono<CloseStatus> thenMono = closeStatus.getCode() == 4014 ? Mono.just(closeStatus) : Mono.error(ex);
                            return this.disconnectTask.onDisconnect(this.guildId).then(thenMono);
                        });
                    }
                    return Mono.just(closeStatus).flatMap(status -> {
                        this.stateChanges.next(VoiceConnection.State.DISCONNECTED);
                        this.disconnectNotifier.onNext(closeStatus);
                        return this.disconnectTask.onDisconnect(this.guildId).thenReturn(closeStatus);
                    });
                }
            }
            return Mono.error(new CloseException(closeStatus, (Context)ctx, behavior.getCause()));
        });
    }
}

