/*
 * Decompiled with CFR 0.152.
 */
package discord4j.voice;

import discord4j.common.LogUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.socket.DatagramChannel;
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.udp.UdpClient;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

/**
 * Thin wrapper around a Reactor Netty {@link UdpClient} used for voice traffic.
 *
 * <p>Datagrams received from the connection are published through {@link #getInbound()}, and
 * buffers handed to {@link #send(ByteBuf)} are written to the connection once {@link #setup(String, int)}
 * has been subscribed. Inbound buffers are retained before being published, so whoever consumes
 * them is responsible for releasing them.
 */
public class VoiceSocket {
    private static final Logger log = Loggers.getLogger(VoiceSocket.class);
    private static final Logger senderLog = Loggers.getLogger("discord4j.voice.protocol.udp.sender");
    private static final Logger receiverLog = Loggers.getLogger("discord4j.voice.protocol.udp.receiver");

    static final String PROTOCOL = "udp";
    static final String ENCRYPTION_MODE = "xsalsa20_poly1305";

    /** Size in bytes of the IP discovery request; the port is read from the last two bytes of the reply. */
    private static final int DISCOVERY_PACKET_LENGTH = 70;

    // Both processors are created with autoCancel disabled (the boolean argument).
    private final EmitterProcessor<ByteBuf> inbound = EmitterProcessor.create(false);
    private final EmitterProcessor<ByteBuf> outbound = EmitterProcessor.create(false);
    // NOTE(review): with OverflowStrategy.LATEST, retained buffers that get dropped on overflow do not
    // appear to be released anywhere in this class - confirm whether that can leak under back-pressure.
    private final FluxSink<ByteBuf> inboundSink = this.inbound.sink(FluxSink.OverflowStrategy.LATEST);
    private final UdpClient udpClient;

    /**
     * Creates a socket that will connect using the given client.
     *
     * @param udpClient the UDP client used as the template for the connection made in {@code setup}
     */
    public VoiceSocket(UdpClient udpClient) {
        this.udpClient = udpClient;
    }

    /**
     * Connects to the given host and port and wires the connection to this socket's inbound and
     * outbound streams. Nothing happens until the returned {@link Mono} is subscribed.
     *
     * @param address the remote host
     * @param port the remote port
     * @return a {@link Mono} emitting the established {@link Connection}
     */
    Mono<Connection> setup(String address, int port) {
        return Mono.deferWithContext(context -> this.udpClient
                .host(address)
                .port(port)
                .observe(this.getObserver(context))
                .doOnConnected(c -> log.debug(LogUtil.format(context, "Connected to {}"), this.address(c)))
                .doOnDisconnected(c -> log.debug(LogUtil.format(context, "Disconnected from {}"), this.address(c)))
                .handle((in, out) -> {
                    // Retain each received buffer so it stays valid after being handed to the inbound sink.
                    Mono<Void> inboundThen = in.receive()
                            .retain()
                            .doOnNext(buf -> this.logPayload(receiverLog, context, buf))
                            .doOnNext(this.inboundSink::next)
                            .then();

                    Mono<Void> outboundThen = out
                            .send(this.outbound.doOnNext(buf -> this.logPayload(senderLog, context, buf)))
                            .then();

                    in.withConnection(c -> c.onDispose(() -> log.debug(LogUtil.format(context, "Connection disposed"))));

                    // The handler completes only once both directions have completed.
                    return Mono.zip(inboundThen, outboundThen).then();
                })
                .connect());
    }

    /**
     * Picks the address to report in log messages for a connection.
     *
     * @param connection the connection to describe
     * @return for a {@link DatagramChannel}, the remote address, or the local address when the remote
     *         one is {@code null}; for any other channel, the remote address
     */
    private SocketAddress address(Connection connection) {
        Channel c = connection.channel();
        if (c instanceof DatagramChannel) {
            SocketAddress a = c.remoteAddress();
            return a != null ? a : c.localAddress();
        }
        return c.remoteAddress();
    }

    /**
     * Creates an observer that logs every connection state change at debug level.
     *
     * @param context the Reactor context passed to {@link LogUtil#format} when building the message
     * @return the logging observer
     */
    private ConnectionObserver getObserver(Context context) {
        return (connection, newState) -> log.debug(LogUtil.format(context, "{} {}"), newState, connection);
    }

    /**
     * Logs a hex dump of the readable bytes of {@code buf} at trace level.
     *
     * @param logger the logger to write to
     * @param context the Reactor context passed to {@link LogUtil#format}
     * @param buf the payload to dump; its indices and reference count are left untouched
     */
    private void logPayload(Logger logger, Context context, ByteBuf buf) {
        // This runs for every datagram in both directions; skip the hex dump unless it will be printed.
        if (logger.isTraceEnabled()) {
            logger.trace(LogUtil.format(context, ByteBufUtil.hexDump(buf)));
        }
    }

    /**
     * Sends an IP discovery request and parses the first inbound packet as the reply.
     *
     * <p>The request is {@value #DISCOVERY_PACKET_LENGTH} bytes: the SSRC as a 4-byte int followed by
     * zero padding. The reply is read as a null-terminated ASCII address starting after the first
     * 4 bytes, with the port taken as an unsigned little-endian short from the last two bytes of a
     * {@value #DISCOVERY_PACKET_LENGTH}-byte packet.
     *
     * @param ssrc the SSRC to write at the start of the request
     * @return a {@link Mono} emitting the discovered (unresolved) address and port; it errors if the
     *         reply is too short or has no null terminator
     */
    Mono<InetSocketAddress> performIpDiscovery(int ssrc) {
        Mono<Void> sendDiscoveryPacket = Mono.fromRunnable(() -> {
            ByteBuf discoveryPacket = Unpooled.buffer(DISCOVERY_PACKET_LENGTH)
                    .writeInt(ssrc)
                    .writeZero(DISCOVERY_PACKET_LENGTH - Integer.BYTES);
            this.outbound.onNext(discoveryPacket);
        });

        Mono<InetSocketAddress> parseResponse = this.inbound.next().map(buf -> {
            try {
                String address = VoiceSocket.getNullTerminatedString(buf, Integer.BYTES);
                int port = buf.getUnsignedShortLE(DISCOVERY_PACKET_LENGTH - Short.BYTES);
                return InetSocketAddress.createUnresolved(address, port);
            } finally {
                // Release even when parsing throws, otherwise a malformed reply leaks the retained buffer.
                buf.release();
            }
        });

        return sendDiscoveryPacket.then(parseResponse);
    }

    /**
     * Queues a buffer on the outbound stream.
     *
     * @param data the payload to send
     */
    void send(ByteBuf data) {
        this.outbound.onNext(data);
    }

    /**
     * Returns the stream of received buffers.
     *
     * @return the inbound stream; buffers were retained on receipt, so consumers must release them
     */
    Flux<ByteBuf> getInbound() {
        return this.inbound;
    }

    /**
     * Reads a null-terminated ASCII string from the buffer, advancing its reader index past the
     * terminator.
     *
     * @param buffer the buffer to read from
     * @param offset number of bytes to skip, relative to the current reader index, before the string
     * @return the decoded string, without the terminator
     * @throws IndexOutOfBoundsException if the buffer ends before a zero byte is found
     */
    private static String getNullTerminatedString(ByteBuf buffer, int offset) {
        buffer.skipBytes(offset);
        ByteArrayOutputStream os = new ByteArrayOutputStream(15);
        byte c;
        while ((c = buffer.readByte()) != 0) {
            os.write(c);
        }
        return new String(os.toByteArray(), StandardCharsets.US_ASCII);
    }
}

