/*
 * Decompiled with CFR 0.152.
 */
package discord4j.rest.request;

import discord4j.common.LogUtil;
import discord4j.rest.http.client.ClientException;
import discord4j.rest.http.client.ClientRequest;
import discord4j.rest.http.client.ClientResponse;
import discord4j.rest.http.client.DiscordWebClient;
import discord4j.rest.request.BucketKey;
import discord4j.rest.request.DiscardedRequestException;
import discord4j.rest.request.DiscordWebRequest;
import discord4j.rest.request.GlobalRateLimiter;
import discord4j.rest.request.RateLimitRetryOperator;
import discord4j.rest.request.RateLimitStrategy;
import discord4j.rest.request.RequestCorrelation;
import discord4j.rest.request.RequestQueue;
import discord4j.rest.request.RouterOptions;
import discord4j.rest.response.ResponseFunction;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.netty.http.client.HttpClientResponse;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.retry.Retry;

class RequestStream {
    private static final Logger log = Loggers.getLogger(RequestStream.class);
    private final BucketKey id;
    private final RequestQueue<RequestCorrelation<ClientResponse>> requestQueue;
    private final GlobalRateLimiter globalRateLimiter;
    private final Scheduler timedTaskScheduler;
    private final List<ResponseFunction> responseFunctions;
    private final DiscordWebClient httpClient;
    private final RateLimitStrategy rateLimitStrategy;
    private final RateLimitRetryOperator rateLimitRetryOperator;

    RequestStream(BucketKey id, RouterOptions routerOptions, DiscordWebClient httpClient, RateLimitStrategy rateLimitStrategy) {
        this.id = id;
        this.requestQueue = routerOptions.getRequestQueueFactory().create();
        this.globalRateLimiter = routerOptions.getGlobalRateLimiter();
        this.timedTaskScheduler = routerOptions.getReactorResources().getTimerTaskScheduler();
        this.responseFunctions = routerOptions.getResponseTransformers();
        this.httpClient = httpClient;
        this.rateLimitStrategy = rateLimitStrategy;
        this.rateLimitRetryOperator = new RateLimitRetryOperator(this.timedTaskScheduler);
    }

    private reactor.retry.Retry<?> serverErrorRetryFactory() {
        return reactor.retry.Retry.onlyIf(ClientException.isRetryContextStatusCode(500, 502, 503, 504, 520)).withBackoffScheduler(this.timedTaskScheduler).exponentialBackoffWithJitter(Duration.ofSeconds(2L), Duration.ofSeconds(30L)).doOnRetry(ctx -> {
            if (log.isDebugEnabled()) {
                log.debug("Retry {} in bucket {} due to {} for {}", ctx.iteration(), this.id.toString(), ctx.exception().toString(), ctx.backoff());
            }
        });
    }

    void push(RequestCorrelation<ClientResponse> request) {
        this.requestQueue.push(request);
    }

    void start() {
        this.requestQueue.requests().doOnDiscard(RequestCorrelation.class, this::onDiscard).subscribe(new RequestSubscriber(this.rateLimitStrategy));
    }

    private void onDiscard(RequestCorrelation<?> requestCorrelation) {
        requestCorrelation.getResponse().onError(new DiscardedRequestException(requestCorrelation.getRequest()));
    }

    private class RequestSubscriber
    extends BaseSubscriber<RequestCorrelation<ClientResponse>> {
        private volatile Duration sleepTime = Duration.ZERO;
        private final Function<ClientResponse, Mono<ClientResponse>> responseFunction = response -> {
            Duration nextReset;
            HttpClientResponse httpResponse = response.getHttpResponse();
            if (log.isDebugEnabled()) {
                Instant requestTimestamp = Instant.ofEpochMilli((Long)httpResponse.currentContext().get("discord4j.request.timestamp"));
                Duration responseTime = Duration.between(requestTimestamp, Instant.now());
                LogUtil.traceDebug(log, trace -> LogUtil.format(httpResponse.currentContext(), "Read " + httpResponse.status() + " in " + responseTime + (trace == false ? "" : " with headers: " + httpResponse.responseHeaders())));
            }
            if (!(nextReset = strategy.apply(httpResponse)).isZero()) {
                if (log.isDebugEnabled()) {
                    log.debug(LogUtil.format(httpResponse.currentContext(), "Delaying next request by {}"), nextReset);
                }
                this.sleepTime = nextReset;
            }
            boolean global = Boolean.parseBoolean(httpResponse.responseHeaders().get("X-RateLimit-Global"));
            Mono<Object> action = Mono.empty();
            if (global) {
                long retryAfter = Long.parseLong(httpResponse.responseHeaders().get("Retry-After"));
                Duration fixedBackoff = Duration.ofMillis(retryAfter);
                action = RequestStream.this.globalRateLimiter.rateLimitFor(fixedBackoff).doOnTerminate(() -> log.debug(LogUtil.format(httpResponse.currentContext(), "Globally rate limited for {}"), fixedBackoff));
            }
            if (httpResponse.status().code() >= 400) {
                return action.then(response.createException().flatMap(Mono::error));
            }
            return action.thenReturn(response);
        };

        public RequestSubscriber(RateLimitStrategy strategy) {
        }

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            this.request(1L);
        }

        @Override
        protected void hookOnNext(RequestCorrelation<ClientResponse> correlation) {
            DiscordWebRequest request = correlation.getRequest();
            ClientRequest clientRequest = new ClientRequest(request);
            MonoProcessor<ClientResponse> callback = correlation.getResponse();
            Mono.just(clientRequest).flatMap(req -> Mono.deferWithContext(ctx -> {
                LogUtil.traceDebug(log, trace -> LogUtil.format(ctx, trace != false ? req.toString() : req.getDescription()));
                return RequestStream.this.globalRateLimiter.withLimiter(RequestStream.this.httpClient.exchange((ClientRequest)req).flatMap(this.responseFunction)).next();
            })).subscriberContext(ctx -> ctx.putAll(correlation.getContext()).put("discord4j.request", clientRequest.getId()).put("discord4j.bucket", RequestStream.this.id.toString())).retryWhen(Retry.withThrowable(RequestStream.this.rateLimitRetryOperator::apply)).transform(this.getResponseTransformers(request)).retryWhen(Retry.withThrowable(RequestStream.this.serverErrorRetryFactory())).doFinally(this::next).checkpoint("Request to " + clientRequest.getDescription() + " [RequestStream]").subscribeWith(callback).subscribe(null, t -> log.trace("Error while processing {}: {}", request, t));
        }

        private Function<Mono<ClientResponse>, Mono<ClientResponse>> getResponseTransformers(DiscordWebRequest discordRequest) {
            return RequestStream.this.responseFunctions.stream().map(rt -> rt.transform(discordRequest).andThen(mono -> mono.checkpoint("Apply " + rt + " to " + discordRequest.getDescription() + " [RequestStream]"))).reduce(Function::andThen).orElse(mono -> mono);
        }

        private void next(SignalType signal) {
            Mono<Long> timer = this.sleepTime.isZero() ? Mono.just(0L) : Mono.delay(this.sleepTime, RequestStream.this.timedTaskScheduler);
            timer.subscribe(l -> {
                if (log.isDebugEnabled()) {
                    log.debug("[B:{}] Ready to consume next request after {}", new Object[]{RequestStream.this.id.toString(), signal});
                }
                this.sleepTime = Duration.ZERO;
                this.request(1L);
            }, t -> log.error("[B:{}] Error while scheduling next request", RequestStream.this.id.toString(), t));
        }
    }
}

