package io.moquette.broker;

import io.moquette.broker.ISessionsRepository;
import io.moquette.broker.Session;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/moquette/broker/SessionRegistry.class */
public class SessionRegistry {
    // Handle of the periodic task that runs checkExpiredSessions(); cancelled in close().
    private final ScheduledFuture<?> scheduledExpiredSessions;
    // Expiry, in seconds, given to the SessionData of sessions created with cleanSession == false.
    private int globalExpirySeconds;
    // Receives the "Clean up removed session" command when a session is removed from the pool.
    private final SessionEventLoopGroup loopsGroup;
    // Used both as initial delay and as delay between runs of the expired sessions cleaner.
    static final Duration EXPIRED_SESSION_CLEANER_TASK_INTERVAL = Duration.ofSeconds(1);
    private static final Logger LOG = LoggerFactory.getLogger(SessionRegistry.class);
    // Sessions known to this registry, keyed by client id.
    private final ConcurrentMap<String, Session> pool;
    // Subscriptions are removed from here when a session is purged or loses read permission.
    private final ISubscriptionsDirectory subscriptionsDirectory;
    // Store of SessionData: listed at construction, saved on session creation, deleted on expiry.
    private final ISessionsRepository sessionsRepository;
    // Source of the message queues handed to sessions created with cleanSession == false.
    private final IQueueRepository queueRepository;
    // Consulted (canRead) when the subscriptions of a reopened session are re-validated.
    private final Authorizator authorizator;
    // Session data waiting for its expiration; drained periodically by checkExpiredSessions().
    private final DelayQueue<ISessionsRepository.SessionData> removableSessions;
    // Time source handed to every SessionData created by this registry.
    private final Clock clock;

    /* loaded from: input_file:io/moquette/broker/SessionRegistry$CreationModeEnum.class */
    /**
     * Describes how the session returned by a create-or-reopen request was obtained.
     */
    public enum CreationModeEnum {
        /** A brand new session object was created (no previous session, or the previous one was purged). */
        CREATED_CLEAN_NEW,
        /** An already known, disconnected session was reused. */
        REOPEN_EXISTING,
        /** Not produced by any code path in this class. NOTE(review): confirm usage elsewhere. */
        DROP_EXISTING
    }

    /* loaded from: input_file:io/moquette/broker/SessionRegistry$EnqueuedMessage.class */
    /**
     * Base type of the messages handled by a session.
     *
     * <p>Both hooks are no-ops here; subclasses owning reference counted resources
     * override them.
     */
    public abstract static class EnqueuedMessage {

        /**
         * Hook invoked to retain the resources held by the message. Does nothing by default.
         */
        public void retain() {
            // nothing to retain in the base type
        }

        /**
         * Hook invoked to release the resources held by the message. Does nothing by default.
         */
        public void release() {
            // nothing to release in the base type
        }
    }

    /* loaded from: input_file:io/moquette/broker/SessionRegistry$PubRelMarker.class */
    /**
     * Payload-less message: it carries no state and inherits the no-op
     * {@code retain()}/{@code release()} of {@link EnqueuedMessage}.
     * NOTE(review): presumably stands for a PUBREL waiting in a session - confirm against callers.
     */
    public static final class PubRelMarker extends EnqueuedMessage {
    }

    /* loaded from: input_file:io/moquette/broker/SessionRegistry$PublishedMessage.class */
    /**
     * A publish tracked by a session: topic, QoS, payload and retained flag.
     *
     * <p>{@link #retain()} and {@link #release()} are forwarded to the payload buffer.
     */
    public static class PublishedMessage extends EnqueuedMessage {
        final Topic topic;
        final MqttQoS publishingQos;
        final ByteBuf payload;
        final boolean retained;

        /**
         * Creates the message. The payload reference count is not touched here.
         *
         * @param topic   topic of the publish.
         * @param mqttQoS QoS the message is published with.
         * @param byteBuf payload of the publish.
         * @param z       retained flag of the publish; previously this argument was
         *                silently dropped and the field was always {@code false}.
         */
        public PublishedMessage(Topic topic, MqttQoS mqttQoS, ByteBuf byteBuf, boolean z) {
            this.topic = topic;
            this.publishingQos = mqttQoS;
            this.payload = byteBuf;
            this.retained = z;
        }

        /**
         * @return the topic of the publish.
         */
        public Topic getTopic() {
            return this.topic;
        }

        /**
         * @return the QoS the message is published with.
         */
        public MqttQoS getPublishingQos() {
            return this.publishingQos;
        }

        /**
         * @return the payload buffer, exactly the instance received by the constructor.
         */
        public ByteBuf getPayload() {
            return this.payload;
        }

        /**
         * Releases the payload buffer.
         */
        @Override // io.moquette.broker.SessionRegistry.EnqueuedMessage
        public void release() {
            this.payload.release();
        }

        /**
         * Retains the payload buffer.
         */
        @Override // io.moquette.broker.SessionRegistry.EnqueuedMessage
        public void retain() {
            this.payload.retain();
        }
    }

    /* loaded from: input_file:io/moquette/broker/SessionRegistry$SessionCreationResult.class */
    /**
     * Immutable outcome of a create-or-reopen request.
     */
    public static class SessionCreationResult {
        /** Session to be used by the connecting client. */
        final Session session;
        /** How {@link #session} was obtained. */
        final CreationModeEnum mode;
        /** Value received by the constructor; this class sets it when a session for the client id was already in the pool. */
        final boolean alreadyStored;

        /**
         * @param session          the session handed to the client.
         * @param creationModeEnum how the session was obtained.
         * @param z                value of the {@code alreadyStored} flag.
         */
        public SessionCreationResult(Session session, CreationModeEnum creationModeEnum, boolean z) {
            this.alreadyStored = z;
            this.mode = creationModeEnum;
            this.session = session;
        }
    }

    SessionRegistry(ISubscriptionsDirectory iSubscriptionsDirectory, ISessionsRepository iSessionsRepository, IQueueRepository iQueueRepository, Authorizator authorizator, ScheduledExecutorService scheduledExecutorService, SessionEventLoopGroup sessionEventLoopGroup) {
        this(iSubscriptionsDirectory, iSessionsRepository, iQueueRepository, authorizator, scheduledExecutorService, Clock.systemDefaultZone(), Integer.MAX_VALUE, sessionEventLoopGroup);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SessionRegistry(ISubscriptionsDirectory iSubscriptionsDirectory, ISessionsRepository iSessionsRepository, IQueueRepository iQueueRepository, Authorizator authorizator, ScheduledExecutorService scheduledExecutorService, Clock clock, int i, SessionEventLoopGroup sessionEventLoopGroup) {
        this.pool = new ConcurrentHashMap();
        this.removableSessions = new DelayQueue<>();
        this.subscriptionsDirectory = iSubscriptionsDirectory;
        this.sessionsRepository = iSessionsRepository;
        this.queueRepository = iQueueRepository;
        this.authorizator = authorizator;
        this.scheduledExpiredSessions = scheduledExecutorService.scheduleWithFixedDelay(this::checkExpiredSessions, EXPIRED_SESSION_CLEANER_TASK_INTERVAL.getSeconds(), EXPIRED_SESSION_CLEANER_TASK_INTERVAL.getSeconds(), TimeUnit.SECONDS);
        this.clock = clock;
        this.globalExpirySeconds = i;
        this.loopsGroup = sessionEventLoopGroup;
        recreateSessionPool();
    }

    /**
     * Periodic task: drains the sessions whose expiration delay elapsed, removes each of
     * them from the pool and deletes its data from the sessions repository.
     *
     * <p>A failure on one session is logged and does not stop the processing of the
     * others. It is also never propagated to the scheduler, because an exception thrown by
     * a task scheduled with a fixed delay suppresses all its subsequent executions.
     */
    private void checkExpiredSessions() {
        final Collection<ISessionsRepository.SessionData> expiredSessions = new ArrayList<>();
        final int drained = this.removableSessions.drainTo(expiredSessions);
        // the second value is the number of sessions still tracked after the drain
        LOG.debug("Retrieved {} expired sessions or {}", drained, this.removableSessions.size());
        for (ISessionsRepository.SessionData expired : expiredSessions) {
            final String clientId = expired.clientId();
            try {
                LOG.debug("Removing session {}, expired on {}", clientId, expired.expireAt().map(Object::toString).orElse("UNDEFINED"));
                remove(clientId);
                this.sessionsRepository.delete(expired);
            } catch (RuntimeException ex) {
                LOG.error("Failed to remove the expired session {}", clientId, ex);
            }
        }
    }

    /**
     * Adds the session data to the queue of sessions waiting for their expiration.
     *
     * @param candidate data of the session to track; must carry an expiry instant.
     * @throws RuntimeException when the data has no expiry instant.
     */
    private void trackForRemovalOnExpiration(ISessionsRepository.SessionData candidate) {
        final boolean hasExpiry = candidate.expireAt().isPresent();
        if (hasExpiry) {
            LOG.debug("start tracking the session {} for removal", candidate.clientId());
            this.removableSessions.add(candidate);
            return;
        }
        throw new RuntimeException("Can't track for expiration a session without expiry instant, client_id: " + candidate.clientId());
    }

    /**
     * Stops tracking the session data for expiration; a no-op when it is not tracked.
     *
     * @param tracked data of the session that must not be expired anymore.
     */
    private void untrackFromRemovalOnExpiration(ISessionsRepository.SessionData tracked) {
        final DelayQueue<ISessionsRepository.SessionData> waitingExpiration = this.removableSessions;
        waitingExpiration.remove(tracked);
    }

    /**
     * Rebuilds the in-memory pool from the stored session data: every stored session that
     * still has a queue in the queue repository becomes a non-clean {@link Session} and is
     * tracked for expiration. Queues not matched by any stored session are reported.
     */
    private void recreateSessionPool() {
        // names are removed as they get matched; what is left has no owning session.
        // NOTE(review): the set returned by the repository is modified in place - confirm it is a copy.
        final Set<String> unmatchedQueues = this.queueRepository.listQueueNames();
        for (ISessionsRepository.SessionData stored : this.sessionsRepository.list()) {
            final String clientId = stored.clientId();
            if (!this.queueRepository.containsQueue(clientId)) {
                continue;
            }
            final SessionMessageQueue<EnqueuedMessage> queue = this.queueRepository.getOrCreateQueue(clientId);
            unmatchedQueues.remove(clientId);
            final Session rehydrated = new Session(stored, false, queue);
            this.pool.put(clientId, rehydrated);
            // throws when the stored data has no expiry instant
            trackForRemovalOnExpiration(stored);
        }
        if (!unmatchedQueues.isEmpty()) {
            LOG.error("Recreating sessions left {} unused queues. This is probably a bug. Session IDs: {}", unmatchedQueues.size(), Arrays.toString(unmatchedQueues.toArray()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the session to use for a connecting client, creating a new one when the
     * client id is unknown and delegating to the reopen logic otherwise.
     *
     * @param mqttConnectMessage the CONNECT received from the client.
     * @param str                client id.
     * @param str2               forwarded to the reopen logic, which passes it to the
     *                           authorizator read check.
     * @return the session together with the way it was obtained.
     */
    public SessionCreationResult createOrReopenSession(MqttConnectMessage mqttConnectMessage, String str, String str2) {
        final Session existing = retrieve(str);
        if (existing != null) {
            return reopenExistingSession(mqttConnectMessage, str, existing, str2);
        }

        final Session fresh = createNewSession(mqttConnectMessage, str);
        final SessionCreationResult result = new SessionCreationResult(fresh, CreationModeEnum.CREATED_CLEAN_NEW, false);
        final Session concurrentlyAdded = this.pool.put(str, fresh);
        if (concurrentlyAdded != null) {
            LOG.error("Another thread added a Session for our clientId {}, this is a bug!", str);
        }
        LOG.trace("case 1, not existing session with CId {}", str);
        return result;
    }

    /**
     * Handles a CONNECT for a client id that already has a session in the pool.
     *
     * <p>A still connected old session is closed first. With the clean session flag the
     * old state is purged and replaced by a new session; without it the old session is
     * moved from DISCONNECTED to CONNECTING and reused. In both cases the resulting
     * session is removed from the expiration tracking.
     *
     * @param connectMsg the CONNECT received from the client.
     * @param clientId   client id of the session.
     * @param oldSession the session currently registered for the client id.
     * @param username   passed to the authorizator when re-validating subscriptions.
     * @throws SessionCorruptedException when the old session can't be moved to CONNECTING.
     */
    private SessionCreationResult reopenExistingSession(MqttConnectMessage connectMsg, String clientId, Session oldSession, String username) {
        final boolean cleanRequested = connectMsg.variableHeader().isCleanSession();
        if (!oldSession.disconnected()) {
            oldSession.closeImmediately();
        }

        final SessionCreationResult result;
        if (!cleanRequested) {
            final boolean movedToConnecting = oldSession.assignState(Session.SessionStatus.DISCONNECTED, Session.SessionStatus.CONNECTING);
            if (!movedToConnecting) {
                throw new SessionCorruptedException("old session moved in connected state by other thread");
            }
            copySessionConfig(connectMsg, oldSession);
            reactivateSubscriptions(oldSession, username);
            LOG.trace("case 3, oldSession with same CId {} disconnected", clientId);
            result = new SessionCreationResult(oldSession, CreationModeEnum.REOPEN_EXISTING, true);
        } else {
            purgeSessionState(oldSession);
            final Session replacement = createNewSession(connectMsg, clientId);
            this.pool.put(clientId, replacement);
            LOG.trace("case 2, oldSession with same CId {} disconnected", clientId);
            result = new SessionCreationResult(replacement, CreationModeEnum.CREATED_CLEAN_NEW, true);
        }
        untrackFromRemovalOnExpiration(result.session.getSessionData());
        return result;
    }

    /**
     * Drops from the subscriptions directory every subscription of the session whose
     * topic filter the authorizator no longer allows to be read.
     *
     * @param session  the session being reopened.
     * @param username passed to the authorizator read check.
     */
    private void reactivateSubscriptions(Session session, String username) {
        final Iterator<Subscription> subscriptions = session.getSubscriptions().iterator();
        while (subscriptions.hasNext()) {
            final Subscription subscription = subscriptions.next();
            final boolean stillReadable = this.authorizator.canRead(subscription.getTopicFilter(), username, session.getClientID());
            if (stillReadable) {
                continue;
            }
            this.subscriptionsDirectory.removeSubscription(subscription.getTopicFilter(), session.getClientID());
        }
    }

    /**
     * Removes all the subscriptions of the session from the subscriptions directory.
     *
     * @param session the session whose subscriptions are dropped.
     */
    private void unsubscribe(Session session) {
        for (Subscription subscription : session.getSubscriptions()) {
            this.subscriptionsDirectory.removeSubscription(subscription.getTopicFilter(), session.getClientID());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Builds a new session in connecting state for the client and saves its data.
     *
     * <p>Clean sessions get an in-memory queue and a zero expiry; the others get the
     * queue of the queue repository and the global expiry. The will is attached only
     * when the CONNECT carries the will flag.
     *
     * @param connectMsg the CONNECT received from the client.
     * @param clientId   client id of the new session.
     * @return the new session; it is NOT added to the pool here.
     */
    private Session createNewSession(MqttConnectMessage connectMsg, String clientId) {
        final boolean clean = connectMsg.variableHeader().isCleanSession();

        // raw type kept on purpose: both branches must fit the same variable
        final SessionMessageQueue queue;
        final int expirySeconds;
        if (clean) {
            queue = new InMemoryQueue();
            expirySeconds = 0;
        } else {
            queue = this.queueRepository.getOrCreateQueue(clientId);
            expirySeconds = this.globalExpirySeconds;
        }

        final ISessionsRepository.SessionData data = new ISessionsRepository.SessionData(clientId, MqttVersion.MQTT_3_1_1, expirySeconds, this.clock);
        final Session created;
        if (connectMsg.variableHeader().isWillFlag()) {
            created = new Session(data, clean, createWill(connectMsg), queue);
        } else {
            created = new Session(data, clean, queue);
        }
        created.markConnecting();
        this.sessionsRepository.saveSession(data);
        return created;
    }

    /**
     * Updates the session with the clean flag and the will of the CONNECT; the will is
     * {@code null} when the CONNECT has no will flag.
     *
     * @param connectMsg the CONNECT received from the client.
     * @param session    the session to update.
     */
    private void copySessionConfig(MqttConnectMessage connectMsg, Session session) {
        final boolean clean = connectMsg.variableHeader().isCleanSession();
        final Session.Will will;
        if (connectMsg.variableHeader().isWillFlag()) {
            will = createWill(connectMsg);
        } else {
            will = null;
        }
        session.update(clean, will);
    }

    /**
     * Builds the will of a session from the will topic, message, QoS and retain flag of
     * the CONNECT. The will message bytes are copied into a new buffer.
     *
     * @param connectMsg the CONNECT received from the client.
     * @return the will described by the CONNECT.
     */
    private Session.Will createWill(MqttConnectMessage connectMsg) {
        final byte[] willBytes = connectMsg.payload().willMessageInBytes();
        final ByteBuf willPayload = Unpooled.copiedBuffer(willBytes);
        final String willTopic = connectMsg.payload().willTopic();
        final MqttQoS willQos = MqttQoS.valueOf(connectMsg.variableHeader().willQos());
        final boolean willRetain = connectMsg.variableHeader().isWillRetain();
        return new Session.Will(willTopic, willPayload, willQos, willRetain);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks up the session registered for a client id.
     *
     * @param str client id.
     * @return the session in the pool, or {@code null} when there is none.
     */
    public Session retrieve(String str) {
        final Session registered = this.pool.get(str);
        return registered;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Disconnects the session, then either purges its state, when it expires immediately,
     * or starts tracking it for removal at its computed expiration.
     *
     * @param session the session whose connection was closed.
     */
    public void connectionClosed(Session session) {
        session.disconnect();
        if (!session.expireImmediately()) {
            final ISessionsRepository.SessionData expiring = session.getSessionData().withExpirationComputed();
            trackForRemovalOnExpiration(expiring);
            return;
        }
        purgeSessionState(session);
    }

    /**
     * Moves the session from DISCONNECTED to DESTROYED, drops its subscriptions and
     * removes it from the registry.
     *
     * @param session the session to destroy.
     * @throws SessionCorruptedException when the session was not in DISCONNECTED state.
     */
    private void purgeSessionState(Session session) {
        LOG.debug("Remove session state for client {}", session.getClientID());
        final boolean destroyed = session.assignState(Session.SessionStatus.DISCONNECTED, Session.SessionStatus.DESTROYED);
        if (destroyed) {
            unsubscribe(session);
            remove(session.getClientID());
            return;
        }
        throw new SessionCorruptedException("Session has already changed state: " + session);
    }

    /**
     * Removes the session of the client from the pool and from the expiration tracking,
     * then routes its clean up to the session loops group. Does nothing for an unknown
     * client id.
     *
     * @param str client id.
     */
    void remove(String str) {
        final Session evicted = this.pool.remove(str);
        if (evicted == null) {
            return;
        }
        this.removableSessions.remove(evicted.getSessionData());
        this.loopsGroup.routeCommand(str, "Clean up removed session", () -> {
            evicted.cleanUp();
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Lists a descriptor for each connected session that has a remote address.
     *
     * @return a new collection, possibly empty.
     */
    public Collection<ClientDescriptor> listConnectedClients() {
        final Collection<ClientDescriptor> descriptors = new ArrayList<>();
        for (Session candidate : this.pool.values()) {
            if (!candidate.connected()) {
                continue;
            }
            // sessions without a remote address produce no descriptor
            createClientDescriptor(candidate).ifPresent(descriptors::add);
        }
        return descriptors;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the connection of a client and, on request, purges its session state.
     *
     * @param str client id; {@code null} is accepted and reported as not found.
     * @param z   when {@code true} the session state is purged after closing.
     * @return {@code true} when a session for the client id was found and closed.
     */
    public boolean dropSession(String str, boolean z) {
        LOG.debug("Disconnecting client: {}", str);
        if (str == null) {
            return false;
        }

        final Session target = this.pool.get(str);
        if (target != null) {
            target.closeImmediately();
            if (z) {
                purgeSessionState(target);
            }
            LOG.debug("Client {} successfully disconnected from broker", str);
            return true;
        }

        LOG.debug("Client {} not found, nothing disconnected", str);
        return false;
    }

    /**
     * Describes the client of a session by client id, remote host and remote port.
     *
     * @param session the session to describe.
     * @return the descriptor, or empty when the session has no remote address.
     */
    private Optional<ClientDescriptor> createClientDescriptor(Session session) {
        final String clientId = session.getClientID();
        return session.remoteAddress()
            .map(address -> new ClientDescriptor(clientId, address.getHostString(), address.getPort()));
    }

    /**
     * Shuts the registry down: cancels the expired sessions task without interrupting a
     * running execution, saves the non clean sessions that still lack an expiration and
     * closes the queue repository.
     */
    public void close() {
        final boolean cancelled = this.scheduledExpiredSessions.cancel(false);
        if (!cancelled) {
            LOG.warn("Can't cancel the execution of expired sessions task, was already cancelled? {}, was done? {}", this.scheduledExpiredSessions.isCancelled(), this.scheduledExpiredSessions.isDone());
        } else {
            LOG.info("Successfully cancelled expired sessions task");
        }
        updateNotCleanSessionsWithProperExpire();
        this.queueRepository.close();
    }

    /**
     * Saves, with a computed expiration, the data of every non clean session in the pool
     * that has no expiration yet.
     */
    private void updateNotCleanSessionsWithProperExpire() {
        this.pool.values().stream()
            .filter(session -> !session.isClean())
            .map(Session::getSessionData)
            .filter(data -> !data.expireAt().isPresent())
            .map(ISessionsRepository.SessionData::withExpirationComputed)
            .forEach(this.sessionsRepository::saveSession);
    }
}
