/*
 * Decompiled with CFR 0.152.
 */
package discord4j.core.event;

import discord4j.common.LogUtil;
import discord4j.core.event.EventDispatcher;
import discord4j.core.event.domain.Event;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscription;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;

public class DefaultEventDispatcher
implements EventDispatcher {
    private static final Logger log = Loggers.getLogger(DefaultEventDispatcher.class);
    private final FluxProcessor<Event, Event> eventProcessor;
    private final FluxSink<Event> sink;
    private final Scheduler eventScheduler;

    public DefaultEventDispatcher(FluxProcessor<Event, Event> eventProcessor, FluxSink.OverflowStrategy overflowStrategy, Scheduler eventScheduler) {
        this.eventProcessor = eventProcessor;
        this.sink = eventProcessor.sink(overflowStrategy);
        this.eventScheduler = eventScheduler;
    }

    @Override
    public <E extends Event> Flux<E> on(Class<E> eventClass) {
        AtomicReference subscription = new AtomicReference();
        return this.eventProcessor.publishOn(this.eventScheduler).ofType(eventClass).handle((event, sink) -> {
            if (log.isTraceEnabled()) {
                log.trace(LogUtil.format(sink.currentContext().put("discord4j.shard", event.getShardInfo().getIndex()), "{}"), event.toString());
            }
            sink.next(event);
        }).doOnSubscribe(sub -> {
            subscription.set(sub);
            if (log.isDebugEnabled()) {
                log.debug("Subscription {} to {} created", Integer.toHexString(sub.hashCode()), eventClass.getSimpleName());
            }
        }).doFinally(signal -> {
            if (log.isDebugEnabled()) {
                log.debug("Subscription {} to {} disposed due to {}", Integer.toHexString(((Subscription)subscription.get()).hashCode()), eventClass.getSimpleName(), signal);
            }
        });
    }

    @Override
    public void publish(Event event) {
        this.sink.next(event);
    }

    @Override
    public void shutdown() {
        this.sink.complete();
    }

    public static class Builder
    implements EventDispatcher.Builder {
        protected FluxProcessor<Event, Event> eventProcessor;
        protected FluxSink.OverflowStrategy overflowStrategy = FluxSink.OverflowStrategy.BUFFER;
        protected Scheduler eventScheduler;

        protected Builder() {
        }

        @Override
        public Builder eventProcessor(FluxProcessor<Event, Event> eventProcessor) {
            this.eventProcessor = Objects.requireNonNull(eventProcessor);
            return this;
        }

        @Override
        public Builder overflowStrategy(FluxSink.OverflowStrategy overflowStrategy) {
            this.overflowStrategy = Objects.requireNonNull(overflowStrategy);
            return this;
        }

        @Override
        public Builder eventScheduler(Scheduler eventScheduler) {
            this.eventScheduler = Objects.requireNonNull(eventScheduler);
            return this;
        }

        @Override
        public EventDispatcher build() {
            if (this.eventProcessor == null) {
                this.eventProcessor = EmitterProcessor.create(Queues.SMALL_BUFFER_SIZE, false);
            }
            if (this.eventScheduler == null) {
                this.eventScheduler = EventDispatcher.DEFAULT_EVENT_SCHEDULER.get();
            }
            return new DefaultEventDispatcher(this.eventProcessor, this.overflowStrategy, this.eventScheduler);
        }
    }
}

