package io.moquette.broker;

import io.moquette.broker.config.FileResourceLoader;
import io.moquette.broker.config.FluentConfig;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.IResourceLoader;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.broker.config.ResourceLoaderConfig;
import io.moquette.broker.security.ACLFileParser;
import io.moquette.broker.security.AcceptAllAuthenticator;
import io.moquette.broker.security.DenyAllAuthorizatorPolicy;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.security.IAuthorizatorPolicy;
import io.moquette.broker.security.PermitAllAuthorizatorPolicy;
import io.moquette.broker.security.ResourceAuthenticator;
import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory;
import io.moquette.broker.unsafequeues.QueueException;
import io.moquette.interception.BrokerInterceptor;
import io.moquette.interception.InterceptHandler;
import io.moquette.logging.LoggingUtils;
import io.moquette.persistence.H2Builder;
import io.moquette.persistence.MemorySessionsRepository;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.moquette.persistence.SegmentQueueRepository;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.ParseException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/moquette/broker/Server.class */
public class Server {
    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
    public static final String MOQUETTE_VERSION = "0.17";
    private ScheduledExecutorService scheduler;
    private NewNettyAcceptor acceptor;
    private volatile boolean initialized;
    private PostOffice dispatcher;
    private BrokerInterceptor interceptor;
    private H2Builder h2Builder;
    private SessionRegistry sessions;
    private boolean standalone = false;

    /**
     * Entry point used to run the broker as a standalone process.
     *
     * <p>Starts the server from the default configuration file and registers a JVM
     * shutdown hook that calls {@link #stopServer()}. If startup fails with a
     * {@link RuntimeException} the failure is logged and the JVM exits with status 1.
     *
     * @param strArr command line arguments; not used
     * @throws IOException if starting the server reports an I/O failure
     */
    public static void main(String[] strArr) throws IOException {
        final Server server = new Server();
        try {
            server.startStandaloneServer();
        } catch (RuntimeException e) {
            // Report the root cause before terminating; exiting silently makes startup failures undiagnosable.
            LOG.error("Unable to start the standalone server", e);
            System.exit(1);
        }
        System.out.println("Server started, version " + MOQUETTE_VERSION);
        // Stop the broker when the JVM is shutting down.
        Runtime.getRuntime().addShutdownHook(new Thread(server::stopServer));
    }

    /**
     * Starts the broker using the default configuration file, i.e. the file returned by
     * {@code defaultConfigFile()}.
     *
     * @throws IOException if starting the server reports an I/O failure
     */
    public void startServer() throws IOException {
        final File configFile = defaultConfigFile();
        LOG.info("Starting Moquette integration. Configuration file path={}", configFile.getAbsolutePath());
        final FileResourceLoader fileLoader = new FileResourceLoader(configFile);
        final ResourceLoaderConfig loadedConfig = new ResourceLoaderConfig(fileLoader);
        startServer(loadedConfig);
    }

    /**
     * Marks this instance as running standalone and then starts it from the default
     * configuration file. The flag is set before the start so that it is already
     * visible to anything triggered during startup (e.g. the telemetry document).
     *
     * @throws IOException if starting the server reports an I/O failure
     */
    private void startStandaloneServer() throws IOException {
        standalone = true;
        this.startServer();
    }

    /**
     * Resolves the default configuration file: {@code IConfig.DEFAULT_CONFIG} relative to the
     * directory named by the {@code moquette.path} system property. When the property is not
     * set the parent is {@code null} and the file is resolved by {@link File} without a parent.
     *
     * @return the default configuration file (not checked for existence)
     */
    private static File defaultConfigFile() {
        final String configDirectory = System.getProperty("moquette.path", null);
        return new File(configDirectory, IConfig.DEFAULT_CONFIG);
    }

    /**
     * Starts the broker reading its configuration from the given file.
     *
     * @param file the configuration file to load
     * @throws IOException if starting the server reports an I/O failure
     */
    public void startServer(File file) throws IOException {
        final String configPath = file.getAbsolutePath();
        LOG.info("Starting Moquette integration. Configuration file path: {}", configPath);
        final FileResourceLoader fileLoader = new FileResourceLoader(file);
        final ResourceLoaderConfig loadedConfig = new ResourceLoaderConfig(fileLoader);
        startServer(loadedConfig);
    }

    /**
     * Starts the broker using settings taken from an in-memory {@link Properties} object.
     *
     * @param properties the configuration settings
     * @throws IOException if starting the server reports an I/O failure
     */
    public void startServer(Properties properties) throws IOException {
        LOG.debug("Starting Moquette integration using properties object");
        final MemoryConfig inMemoryConfig = new MemoryConfig(properties);
        startServer(inMemoryConfig);
    }

    /**
     * Starts the broker with the given configuration and no programmatically supplied
     * intercept handlers.
     *
     * @param iConfig the broker configuration
     * @throws IOException if starting the server reports an I/O failure
     */
    public void startServer(IConfig iConfig) throws IOException {
        LOG.debug("Starting Moquette integration using IConfig instance");
        final List<? extends InterceptHandler> noHandlers = null;
        startServer(iConfig, noHandlers);
    }

    /**
     * Starts the broker with the given configuration and intercept handlers, leaving the SSL
     * context creator, authenticator and authorizator policy to be chosen by the full
     * {@code startServer} overload (all three are passed as {@code null}).
     *
     * @param iConfig the broker configuration
     * @param list    intercept handlers to register; may be {@code null}
     * @throws IOException if starting the server reports an I/O failure
     */
    public void startServer(IConfig iConfig, List<? extends InterceptHandler> list) throws IOException {
        LOG.debug("Starting moquette integration using IConfig instance and intercept handlers");
        final ISslContextCreator defaultSslContextCreator = null;
        final IAuthenticator defaultAuthenticator = null;
        final IAuthorizatorPolicy defaultAuthorizatorPolicy = null;
        startServer(iConfig, list, defaultSslContextCreator, defaultAuthenticator, defaultAuthorizatorPolicy);
    }

    /**
     * Starts the broker with full control over configuration, interceptors and security
     * collaborators.
     *
     * <p>The method wires, in order: the scheduler, the interceptors, the SSL context creator,
     * authenticator and authorizator policy, the repositories (H2 based when persistence is
     * enabled, in-memory otherwise), the subscription directory, the session registry, the
     * post office and finally the Netty acceptor. The server is flagged as initialized only
     * after all of this succeeded.
     *
     * @param iConfig             the broker configuration; may be mutated (the
     *                            {@code intercept.handler} system property and the deprecated
     *                            {@code persistent_store} setting are copied into it)
     * @param list                intercept handlers to register; {@code null} is treated as empty
     * @param iSslContextCreator  SSL context creator; when {@code null} a
     *                            {@code DefaultMoquetteSslContextCreator} is used
     * @param iAuthenticator      authenticator; when {@code null} one is derived from the configuration
     * @param iAuthorizatorPolicy authorizator policy; when {@code null} one is derived from the configuration
     * @throws IOException if a component reports an I/O failure during startup, e.g. the
     *                     persistent queue store cannot be configured
     */
    public void startServer(IConfig iConfig, List<? extends InterceptHandler> list, ISslContextCreator iSslContextCreator, IAuthenticator iAuthenticator, IAuthorizatorPolicy iAuthorizatorPolicy) throws IOException {
        final long startTime = System.currentTimeMillis();
        if (list == null) {
            list = Collections.emptyList();
        }
        LOG.trace("Starting Moquette Server. MQTT message interceptors={}", LoggingUtils.getInterceptorIds(list));
        this.scheduler = Executors.newScheduledThreadPool(1);

        // An interceptor class passed as JVM system property overrides the configured one.
        final String handlerProp = System.getProperty("intercept.handler");
        if (handlerProp != null) {
            iConfig.setProperty("intercept.handler", handlerProp);
        }
        initInterceptors(iConfig, list);
        LOG.debug("Initialized MQTT protocol processor");

        if (iSslContextCreator == null) {
            LOG.info("Using default SSL context creator");
            iSslContextCreator = new DefaultMoquetteSslContextCreator(iConfig);
        }
        final IAuthenticator authenticator = initializeAuthenticator(iAuthenticator, iConfig);
        final IAuthorizatorPolicy authorizatorPolicy = initializeAuthorizatorPolicy(iAuthorizatorPolicy, iConfig);

        // Translate the deprecated "persistent_store" file setting into the newer
        // persistence-enabled flag plus data path.
        final String deprecatedStore = iConfig.getProperty("persistent_store");
        if (deprecatedStore != null) {
            LOG.warn("Using a deprecated setting {} please update to {}", "persistent_store", IConfig.DATA_PATH_PROPERTY_NAME);
            LOG.warn("Forcing {} to true", IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME);
            iConfig.setProperty(IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME, Boolean.TRUE.toString());
            // A bare file name has no '/' at all: use the current directory instead of
            // failing with StringIndexOutOfBoundsException.
            final int lastSlash = deprecatedStore.lastIndexOf('/');
            final String dataPath = lastSlash >= 0 ? deprecatedStore.substring(0, lastSlash) : ".";
            LOG.warn("Forcing {} to {}", IConfig.DATA_PATH_PROPERTY_NAME, dataPath);
            iConfig.setProperty(IConfig.DATA_PATH_PROPERTY_NAME, dataPath);
        }

        final Clock clock = Clock.systemDefaultZone();

        // Declared through their interfaces because either the H2 backed or the in-memory
        // implementations are assigned below.
        final ISubscriptionsRepository subscriptionsRepository;
        final IQueueRepository queueRepository;
        final IRetainedRepository retainedRepository;
        final ISessionsRepository sessionsRepository;
        if (Boolean.parseBoolean(iConfig.getProperty(IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME))) {
            final Path dataPath = Paths.get(iConfig.getProperty(IConfig.DATA_PATH_PROPERTY_NAME));
            if (!dataPath.toFile().exists()) {
                if (dataPath.toFile().mkdirs()) {
                    LOG.debug("Created data_path {} folder", dataPath);
                } else {
                    LOG.warn("Impossible to create the data_path {}", dataPath);
                }
            }
            LOG.debug("Configuring persistent subscriptions store and queues, path: {}", dataPath);
            final int autosaveInterval = Integer.parseInt(iConfig.getProperty("autosave_interval", "30"));
            this.h2Builder = new H2Builder(this.scheduler, dataPath, autosaveInterval, clock).initStore();
            queueRepository = initQueuesRepository(iConfig, dataPath, this.h2Builder);
            LOG.trace("Configuring H2 subscriptions repository");
            subscriptionsRepository = this.h2Builder.subscriptionsRepository();
            retainedRepository = this.h2Builder.retainedRepository();
            sessionsRepository = this.h2Builder.sessionsRepository();
        } else {
            LOG.trace("Configuring in-memory subscriptions store");
            subscriptionsRepository = new MemorySubscriptionsRepository();
            queueRepository = new MemoryQueueRepository();
            retainedRepository = new MemoryRetainedRepository();
            sessionsRepository = new MemorySessionsRepository();
        }

        final CTrieSubscriptionDirectory subscriptions = new CTrieSubscriptionDirectory();
        subscriptions.init(subscriptionsRepository);
        final Authorizator authorizator = new Authorizator(authorizatorPolicy);

        // Expiration in seconds. The division must happen on the long value BEFORE narrowing
        // to int, otherwise durations above ~24.8 days overflow; the result is clamped to
        // Integer.MAX_VALUE, which is also the value used when no expiration is configured.
        final int expirationSeconds;
        if (iConfig.getProperty(IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME) != null) {
            final long seconds = iConfig.durationProp(IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME).toMillis() / 1000;
            expirationSeconds = (int) Math.min(seconds, Integer.MAX_VALUE);
        } else {
            expirationSeconds = Integer.MAX_VALUE;
        }

        final SessionEventLoopGroup loopsGroup = new SessionEventLoopGroup(this.interceptor, iConfig.intProp(IConfig.SESSION_QUEUE_SIZE, 1024));
        this.sessions = new SessionRegistry(subscriptions, sessionsRepository, queueRepository, authorizator, this.scheduler, clock, expirationSeconds, loopsGroup);
        this.dispatcher = new PostOffice(subscriptions, retainedRepository, this.sessions, this.interceptor, authorizator, loopsGroup);

        final BrokerConfiguration brokerConfiguration = new BrokerConfiguration(iConfig);
        final MQTTConnectionFactory connectionFactory = new MQTTConnectionFactory(brokerConfiguration, authenticator, this.sessions, this.dispatcher);
        final NewNettyMQTTHandler mqttHandler = new NewNettyMQTTHandler(connectionFactory);
        this.acceptor = new NewNettyAcceptor();
        this.acceptor.initialize(mqttHandler, iConfig, iSslContextCreator, brokerConfiguration);

        LOG.info("Moquette integration has been started successfully in {} ms", System.currentTimeMillis() - startTime);
        if (iConfig.boolProp(IConfig.ENABLE_TELEMETRY_NAME, true)) {
            collectAndSendTelemetryDataAsynch(iConfig);
        }
        this.initialized = true;
    }

    /**
     * Creates the persistent queue repository selected by
     * {@code IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME}: {@code h2} uses the queue repository
     * of the given H2 builder, {@code segmented} creates a {@link SegmentQueueRepository} under
     * {@code path}. The comparison is case-insensitive.
     *
     * @param iConfig   configuration supplying the queue type and the segmented queue sizes
     * @param path      data directory used by the segmented queue store
     * @param h2Builder builder supplying the H2 queue repository
     * @return the configured queue repository
     * @throws IOException      if the segmented queue store cannot be created
     * @throws RuntimeException if the configured queue type is neither {@code h2} nor {@code segmented}
     */
    private static IQueueRepository initQueuesRepository(IConfig iConfig, Path path, H2Builder h2Builder) throws IOException {
        final String queueType = iConfig.getProperty(IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME);
        if ("h2".equalsIgnoreCase(queueType)) {
            LOG.info("Configuring H2 queue store");
            return h2Builder.queueRepository();
        }
        if (!"segmented".equalsIgnoreCase(queueType)) {
            final String message = String.format("Invalid property for %s found [%s] while only h2 or segmented are admitted", IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME, queueType);
            throw new RuntimeException(message);
        }
        LOG.info("Configuring segmented queue store to {}", path);
        try {
            final int pageSize = iConfig.intProp("queue_page_size", 67108864);
            final int segmentSize = iConfig.intProp("queue_segment_size", 4194304);
            return new SegmentQueueRepository(path, pageSize, segmentSize);
        } catch (QueueException e) {
            throw new IOException("Problem in configuring persistent queue on path " + path, e);
        }
    }

    /**
     * Collects and sends the telemetry document on a freshly created thread so that the
     * caller is not blocked by the network call.
     *
     * @param iConfig configuration handed to the telemetry collection
     */
    private void collectAndSendTelemetryDataAsynch(IConfig iConfig) {
        final Runnable telemetryTask = () -> collectAndSendTelemetryData(iConfig);
        final Thread worker = new Thread(telemetryTask);
        worker.start();
    }

    /**
     * Builds the telemetry document for this installation and posts it to the collector.
     * An {@link IOException} while sending is only logged: telemetry is best-effort.
     *
     * @param iConfig configuration used to locate the installation UUID file
     */
    private void collectAndSendTelemetryData(IConfig iConfig) {
        try {
            final String installationId = checkOrCreateUUID(iConfig);
            final String telemetryDocument = collectTelemetryData(installationId);
            sendTelemetryData(telemetryDocument);
        } catch (IOException sendFailure) {
            LOG.info("Can't reach the telemetry collector");
            if (LOG.isDebugEnabled()) {
                LOG.debug("Original exception", sendFailure);
            }
        }
    }

    /**
     * Returns the installation UUID stored in the {@code .moquette_uuid} file inside the
     * configured data path (or relative to the working directory when no data path is set),
     * creating the file with a freshly generated UUID when it is missing or unreadable.
     *
     * <p>Failures to read or write the file are logged and do not propagate: if the file
     * cannot be written the generated UUID is still returned, it is just not persisted.
     *
     * @param iConfig configuration supplying the data path
     * @return the stored UUID content, or a newly generated UUID string
     */
    private String checkOrCreateUUID(IConfig iConfig) {
        final Path uuidFile = Paths.get(iConfig.getProperty(IConfig.DATA_PATH_PROPERTY_NAME, ""), ".moquette_uuid");
        if (Files.exists(uuidFile)) {
            try {
                return new String(Files.readAllBytes(uuidFile), StandardCharsets.UTF_8);
            } catch (IOException e) {
                // Fall through and regenerate the identifier.
                LOG.error("Problem accessing file path: {}", uuidFile, e);
            }
        }
        final String uuid = UUID.randomUUID().toString();
        try {
            // Files.write creates or truncates the file and always releases the handle, even
            // when the write fails; the charset matches the one used for reading above.
            Files.write(uuidFile, uuid.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            LOG.error("Problem writing new UUID to file path: {}", uuidFile, e);
        }
        return uuid;
    }

    /**
     * Renders the telemetry document as a JSON string. It contains OS name and architecture,
     * JVM specification version and vendor, the broker version, the standalone flag, the
     * maximum heap (or {@code undefined} when the JVM reports no limit), a fixed
     * {@code uncollected} remote IP and the installation UUID.
     *
     * @param str the installation UUID
     * @return the JSON telemetry document; values are inserted without JSON escaping
     */
    private String collectTelemetryData(String str) {
        final String osName = System.getProperty("os.name");
        final String cpuArch = System.getProperty("os.arch");
        final String jvmVersion = System.getProperty("java.specification.version");
        final String jvmVendor = System.getProperty("java.vendor");

        // Runtime.maxMemory() reports Long.MAX_VALUE when there is no inherent limit.
        final long maxHeapBytes = Runtime.getRuntime().maxMemory();
        final String maxHeap;
        if (maxHeapBytes == Long.MAX_VALUE) {
            maxHeap = "undefined";
        } else {
            maxHeap = Long.toString(maxHeapBytes);
        }

        final String template = "{\"os\": \"%s\", \"cpu_arch\": \"%s\", \"jvm_version\": \"%s\", \"jvm_vendor\": \"%s\", \"broker_version\": \"%s\", \"standalone\": %s,\"max_heap\": \"%s\", \"remote_ip\": \"%s\", \"uuid\": \"%s\"}";
        return String.format(template, osName, cpuArch, jvmVersion, jvmVendor, MOQUETTE_VERSION, this.standalone, maxHeap, "uncollected", str);
    }

    /**
     * Asks an external "what is my IP" HTTP service for the public address of this host.
     * The lookup is best-effort: any failure, a non-200 reply or an empty body yields an
     * empty string.
     *
     * @return the first line of the service reply, or an empty string when unavailable;
     *         never {@code null}
     */
    private String retrievePublicIP() {
        HttpURLConnection connection = null;
        try {
            connection = (HttpURLConnection) new URL("http://whatismyip.akamai.com").openConnection();
            final int responseCode = connection.getResponseCode();
            if (responseCode != HttpURLConnection.HTTP_OK) {
                LOG.debug("What's my IP service replied with {}", responseCode);
                return "";
            }
            // try-with-resources guarantees the response stream is closed on every path.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                final String ip = reader.readLine();
                // readLine() returns null on an empty body; keep the "empty string" contract.
                return ip != null ? ip : "";
            }
        } catch (Exception e) {
            LOG.debug("Can't connect to what's my IP service", e);
            return "";
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /**
     * Posts the telemetry document as JSON to the telemetry collector. When the collector
     * answers with a 301, 302 or 303 redirect, the document is posted again to the address
     * given in the {@code Location} header. The response body is read and logged at trace
     * level.
     *
     * <p>Streams are closed and the connection is disconnected on every path, including
     * failures.
     *
     * @param str the JSON telemetry document
     * @throws IOException if the collector cannot be reached or the exchange fails
     */
    private void sendTelemetryData(String str) throws IOException {
        final byte[] payload = str.getBytes(StandardCharsets.UTF_8);
        HttpURLConnection connection = (HttpURLConnection) new URL("https://telemetry.moquette.io/api/v1/notify").openConnection();
        try {
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setRequestProperty("Accept", "application/json");
            connection.setInstanceFollowRedirects(true);
            connection.setDoOutput(true);
            try (OutputStream out = connection.getOutputStream()) {
                out.write(payload, 0, payload.length);
            }

            final int responseCode = connection.getResponseCode();
            LOG.trace("Response code is {}", responseCode);
            final boolean redirected = responseCode == HttpURLConnection.HTTP_MOVED_TEMP
                || responseCode == HttpURLConnection.HTTP_MOVED_PERM
                || responseCode == HttpURLConnection.HTTP_SEE_OTHER;
            LOG.trace("Response Code: {} ", responseCode);

            if (redirected) {
                final String location = connection.getHeaderField("Location");
                // Release the first connection before replacing it with the redirect target.
                connection.disconnect();
                connection = (HttpURLConnection) new URL(location).openConnection();
                connection.addRequestProperty("Accept-Language", "en-US,en;q=0.8");
                connection.addRequestProperty("User-Agent", "Mozilla");
                connection.addRequestProperty("Referer", "google.com");
                connection.setRequestMethod("POST");
                connection.setDoOutput(true);
                try (OutputStream out = connection.getOutputStream()) {
                    out.write(payload, 0, payload.length);
                }
                LOG.trace("Redirect to URL: {}", location);
            }

            final StringBuilder content = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    content.append(line);
                }
            }
            LOG.trace("Content: {}", content);
        } finally {
            // Disconnecting an already disconnected connection is harmless.
            connection.disconnect();
        }
    }

    /**
     * Chooses the authorizator policy. Precedence: the explicitly supplied policy, then the
     * class named by {@code IConfig.AUTHORIZATOR_CLASS_NAME}, then a policy parsed from the
     * ACL file named by {@code IConfig.ACL_FILE_PROPERTY_NAME}, and finally a permit-all
     * policy when no ACL file is configured.
     *
     * <p>If an ACL file is configured but cannot be parsed, a deny-all policy is used.
     *
     * @param iAuthorizatorPolicy policy supplied by the caller; may be {@code null}
     * @param iConfig             the broker configuration
     * @return the policy to use
     */
    private IAuthorizatorPolicy initializeAuthorizatorPolicy(IAuthorizatorPolicy iAuthorizatorPolicy, IConfig iConfig) {
        LOG.debug("Configuring MQTT authorizator policy");
        final String policyClassName = iConfig.getProperty(IConfig.AUTHORIZATOR_CLASS_NAME, "");
        IAuthorizatorPolicy policy = iAuthorizatorPolicy;
        if (policy == null && !policyClassName.isEmpty()) {
            policy = loadClass(policyClassName, IAuthorizatorPolicy.class, IConfig.class, iConfig);
        }
        if (policy != null) {
            return policy;
        }

        final String aclFilePath = iConfig.getProperty(IConfig.ACL_FILE_PROPERTY_NAME, "");
        if (aclFilePath != null && !aclFilePath.isEmpty()) {
            // Start from deny-all so that a broken ACL file never results in open access.
            policy = new DenyAllAuthorizatorPolicy();
            try {
                LOG.info("Parsing ACL file. Path = {}", aclFilePath);
                policy = ACLFileParser.parse(iConfig.getResourceLoader().loadResource(aclFilePath));
            } catch (ParseException parseFailure) {
                LOG.error("Unable to parse ACL file. path = {}", aclFilePath, parseFailure);
            }
        } else {
            policy = new PermitAllAuthorizatorPolicy();
        }
        LOG.info("Authorizator policy {} instance will be used", policy.getClass().getName());
        return policy;
    }

    /**
     * Chooses the authenticator. Precedence: the explicitly supplied authenticator, then the
     * class named by {@code IConfig.AUTHENTICATOR_CLASS_NAME}, then a
     * {@link ResourceAuthenticator} over the configured password file, and finally an
     * accept-all authenticator when no password file is configured.
     *
     * @param iAuthenticator authenticator supplied by the caller; may be {@code null}
     * @param iConfig        the broker configuration
     * @return the authenticator to use
     */
    private IAuthenticator initializeAuthenticator(IAuthenticator iAuthenticator, IConfig iConfig) {
        LOG.debug("Configuring MQTT authenticator");
        final String authenticatorClassName = iConfig.getProperty(IConfig.AUTHENTICATOR_CLASS_NAME, "");
        IAuthenticator authenticator = iAuthenticator;
        if (authenticator == null && !authenticatorClassName.isEmpty()) {
            authenticator = loadClass(authenticatorClassName, IAuthenticator.class, IConfig.class, iConfig);
        }
        final IResourceLoader passwordFileLoader = iConfig.getResourceLoader();
        if (authenticator != null) {
            return authenticator;
        }

        final String passwordFilePath = iConfig.getProperty(IConfig.PASSWORD_FILE_PROPERTY_NAME, "");
        if (passwordFilePath.isEmpty()) {
            authenticator = new AcceptAllAuthenticator();
        } else {
            authenticator = new ResourceAuthenticator(passwordFileLoader, passwordFilePath);
        }
        LOG.info("An {} authenticator instance will be used", authenticator.getClass().getName());
        return authenticator;
    }

    /**
     * Builds the broker interceptor from the supplied handlers plus, when the
     * {@code intercept.handler} property names a loadable class, an instance of that class.
     * A handler class that cannot be instantiated is skipped.
     *
     * @param iConfig the broker configuration
     * @param list    handlers supplied by the caller; must not be {@code null}
     */
    private void initInterceptors(IConfig iConfig, List<? extends InterceptHandler> list) {
        LOG.info("Configuring message interceptors...");
        // Copy into a typed, mutable list so the configured handler can be appended
        // without touching the caller's list.
        final List<InterceptHandler> handlers = new ArrayList<>(list);
        final String handlerClassName = iConfig.getProperty("intercept.handler");
        if (handlerClassName != null && !handlerClassName.isEmpty()) {
            final InterceptHandler configuredHandler = loadClass(handlerClassName, InterceptHandler.class, Server.class, this);
            if (configuredHandler != null) {
                handlers.add(configuredHandler);
            }
        }
        this.interceptor = new BrokerInterceptor(iConfig, handlers);
    }

    /**
     * Reflectively instantiates the class {@code str} as an implementation of {@code cls}.
     * The constructor taking a single {@code cls2} argument is tried first; when it does not
     * exist or throws, the no-argument constructor is used instead.
     *
     * @param str  fully qualified name of the class to instantiate
     * @param cls  the type the class must be a subclass of
     * @param cls2 parameter type of the preferred constructor
     * @param u    argument passed to the preferred constructor
     * @param <T>  the expected type
     * @param <U>  the constructor argument type
     * @return the new instance, or {@code null} when the class cannot be loaded or instantiated
     */
    private <T, U> T loadClass(String str, Class<T> cls, Class<U> cls2, U u) {
        final ClassLoader loader = getClass().getClassLoader();
        try {
            LOG.info("Invoking constructor with {} argument. ClassName={}, interfaceName={}", cls2.getName(), str, cls.getName());
            return loader.loadClass(str).asSubclass(cls).getConstructor(cls2).newInstance(u);
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            LOG.warn("Unable to invoke constructor with {} argument. ClassName={}, interfaceName={}, cause={}, errorMessage={}", cls2.getName(), str, cls.getName(), e.getCause(), e.getMessage());
            return null;
        } catch (NoSuchMethodException | InvocationTargetException unusableConstructor) {
            // The one-argument constructor is missing or failed: retry with the default one.
            try {
                LOG.info("Invoking default constructor. ClassName={}, interfaceName={}", str, cls.getName());
                return loader.loadClass(str).asSubclass(cls).getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException fallbackFailure) {
                LOG.error("Unable to invoke default constructor. ClassName={}, interfaceName={}, cause={}, errorMessage={}", str, cls.getName(), fallbackFailure.getCause(), fallbackFailure.getMessage());
                return null;
            }
        }
    }

    /**
     * Publishes a message from inside the hosting application, bypassing the network layer.
     * After the dispatcher has routed the message its payload buffer is released by this
     * method.
     *
     * @param mqttPublishMessage the message to publish
     * @param str                the client identifier, used for logging only
     * @return the routing results reported by the dispatcher
     * @throws IllegalStateException if the server is not started
     */
    public RoutingResults internalPublish(MqttPublishMessage mqttPublishMessage, String str) {
        final int messageId = mqttPublishMessage.variableHeader().packetId();
        if (this.initialized) {
            LOG.trace("Internal publishing message CId: {}, messageId: {}", str, messageId);
            final RoutingResults routingResults = this.dispatcher.internalPublish(mqttPublishMessage);
            mqttPublishMessage.payload().release();
            return routingResults;
        }
        LOG.error("Moquette is not started, internal message cannot be published. CId: {}, messageId: {}", str, messageId);
        throw new IllegalStateException("Can't publish on a integration is not yet started");
    }

    /**
     * Stops the broker: closes the acceptor, marks the server as not initialized, shuts down
     * the scheduler, closes the session registry, closes the H2 store when one was opened,
     * stops the interceptor and terminates the dispatcher.
     *
     * <p>When the acceptor was never created (the server did not get far enough in its
     * startup) the method only logs an error and returns.
     */
    public void stopServer() {
        LOG.info("Unbinding integration from the configured ports");
        if (this.acceptor == null) {
            LOG.error("Closing a badly started server, exit immediately");
            return;
        }
        this.acceptor.close();
        LOG.trace("Stopping MQTT protocol processor");
        this.initialized = false;
        this.scheduler.shutdownNow();
        this.sessions.close();
        if (this.h2Builder != null) {
            // The message previously carried a "{}" placeholder without an argument,
            // which was printed literally.
            LOG.trace("Shutting down H2 persistence");
            this.h2Builder.closeStore();
        }
        this.interceptor.stop();
        this.dispatcher.terminate();
        LOG.info("Moquette integration has been stopped.");
    }

    /**
     * Returns the port reported by the acceptor. The acceptor only exists once
     * {@code startServer} has created it.
     *
     * @return the acceptor's port
     */
    public int getPort() {
        final NewNettyAcceptor currentAcceptor = this.acceptor;
        return currentAcceptor.getPort();
    }

    /**
     * Returns the SSL port reported by the acceptor. The acceptor only exists once
     * {@code startServer} has created it.
     *
     * @return the acceptor's SSL port
     */
    public int getSslPort() {
        final NewNettyAcceptor currentAcceptor = this.acceptor;
        return currentAcceptor.getSslPort();
    }

    /**
     * Registers an additional intercept handler on a running server.
     *
     * @param interceptHandler the handler to register
     * @throws IllegalStateException if the server is not started
     */
    public void addInterceptHandler(InterceptHandler interceptHandler) {
        if (this.initialized) {
            LOG.info("Adding MQTT message interceptor. InterceptorId={}", interceptHandler.getID());
            this.interceptor.addInterceptHandler(interceptHandler);
            return;
        }
        LOG.error("Moquette is not started, MQTT message interceptor cannot be added. InterceptorId={}", interceptHandler.getID());
        throw new IllegalStateException("Can't register interceptors on a integration that is not yet started");
    }

    /**
     * Unregisters an intercept handler from a running server.
     *
     * @param interceptHandler the handler to remove
     * @throws IllegalStateException if the server is not started
     */
    public void removeInterceptHandler(InterceptHandler interceptHandler) {
        if (this.initialized) {
            LOG.info("Removing MQTT message interceptor. InterceptorId={}", interceptHandler.getID());
            this.interceptor.removeInterceptHandler(interceptHandler);
            return;
        }
        LOG.error("Moquette is not started, MQTT message interceptor cannot be removed. InterceptorId={}", interceptHandler.getID());
        throw new IllegalStateException("Can't deregister interceptors from a integration that is not yet started");
    }

    /**
     * Lists the clients currently connected, as reported by the session registry.
     *
     * @return the connected clients
     * @throws IllegalStateException if the server is not started
     */
    public Collection<ClientDescriptor> listConnectedClients() {
        if (!this.initialized) {
            LOG.error("Moquette is not started, MQTT clients listing unavailable");
            throw new IllegalStateException("Can't get clients list from a Server that is not yet started");
        }
        final Collection<ClientDescriptor> connectedClients = this.sessions.listConnectedClients();
        return connectedClients;
    }

    /**
     * Drops the session of the given client, asking the session registry not to purge
     * its state. Unlike the other management methods, this one does not check whether
     * the server is started.
     *
     * @param str the client identifier
     * @return the outcome reported by the session registry
     */
    public boolean disconnectClient(String str) {
        final boolean purgeState = false;
        return this.sessions.dropSession(str, purgeState);
    }

    /**
     * Drops the session of the given client, asking the session registry to purge its
     * state as well. Unlike the other management methods, this one does not check whether
     * the server is started.
     *
     * @param str the client identifier
     * @return the outcome reported by the session registry
     */
    public boolean disconnectAndPurgeClientState(String str) {
        final boolean purgeState = true;
        return this.sessions.dropSession(str, purgeState);
    }

    /**
     * Creates a fluent configuration builder bound to this server instance.
     *
     * @return a new {@link FluentConfig} for this server
     */
    public FluentConfig withConfig() {
        final FluentConfig fluentConfig = new FluentConfig(this);
        return fluentConfig;
    }
}
