package org.eclipse.osee.framework.messaging.internal.activemq;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.eclipse.osee.framework.jdk.core.type.CompositeKeyHashMap;
import org.eclipse.osee.framework.jdk.core.type.OseeCoreException;
import org.eclipse.osee.framework.jdk.core.type.Pair;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.framework.messaging.ConnectionListener;
import org.eclipse.osee.framework.messaging.ConnectionNodeFailoverSupport;
import org.eclipse.osee.framework.messaging.MessageID;
import org.eclipse.osee.framework.messaging.NodeInfo;
import org.eclipse.osee.framework.messaging.OseeMessagingListener;
import org.eclipse.osee.framework.messaging.OseeMessagingStatusCallback;
import org.eclipse.osee.framework.messaging.internal.Activator;
import org.eclipse.osee.framework.messaging.internal.ConsoleDebugSupport;
import org.eclipse.osee.framework.messaging.internal.ServiceUtility;
import org.eclipse.osee.framework.messaging.services.internal.OseeMessagingStatusImpl;

/* loaded from: input_file:org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.class */
/**
 * Connection node backed by a single ActiveMQ JMS connection and session.
 *
 * <p>Outgoing messages are published on topics named after {@link MessageID#getId()}; topics and
 * their producers are cached per node. Replies arrive on a temporary topic created in
 * {@link #start()} and are dispatched by {@link #onMessage(Message)} to the listener registered
 * under the reply's JMS correlation id.
 *
 * <p>Methods that create objects on the shared {@link Session} are {@code synchronized} on this
 * instance.
 */
class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageListener {
    /** Describes the broker this node connects to; its URI is used to open the connection. */
    private final NodeInfo nodeInfo;
    private Connection connection;
    private Session session;
    /** Destination set as JMSReplyTo on messages that require a reply. */
    private TemporaryTopic temporaryTopic;
    /** Consumer on {@link #temporaryTopic}; delivers replies to {@link #onMessage(Message)}. */
    private MessageConsumer replyToConsumer;
    private final ExceptionListener exceptionListener;
    /** Destination-less producer handed to listener wrappers. */
    private MessageProducer replyProducer;
    private boolean started = false;
    private final ActiveMqUtil activeMqUtil = new ActiveMqUtil();
    /** Topics keyed by message id. */
    private final ConcurrentHashMap<String, Topic> topicCache = new ConcurrentHashMap<>();
    /** One producer per topic. */
    private final ConcurrentHashMap<Topic, MessageProducer> messageProducerCache = new ConcurrentHashMap<>();
    /** Active subscriptions keyed by (message id, consumer). */
    private final CompositeKeyHashMap<String, MessageConsumer, OseeMessagingListener> regularListeners = new CompositeKeyHashMap<>(64, true);
    /** Reply listeners keyed by message id, looked up by JMS correlation id on receipt. */
    private final Map<String, OseeMessagingListener> replyListeners = new ConcurrentHashMap<>();

    /**
     * Creates a node that is not yet connected; call {@link #start()} to connect.
     *
     * @param str not used by this implementation
     * @param str2 not used by this implementation
     * @param nodeInfo broker description supplying the connection URI
     * @param executorService not used by this implementation
     * @param exceptionListener listener installed on the JMS connection when it is started
     */
    public ConnectionNodeActiveMq(String str, String str2, NodeInfo nodeInfo, ExecutorService executorService, ExceptionListener exceptionListener) {
        this.nodeInfo = nodeInfo;
        this.exceptionListener = exceptionListener;
    }

    /**
     * Opens the connection, session, reply topic/consumer and reply producer. Does nothing if the
     * node is already started. If any step fails, whatever was opened so far is closed again and
     * the failure is rethrown through {@link OseeCoreException#wrapAndThrow(Throwable)}.
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNodeFailoverSupport
    public synchronized void start() {
        if (this.started) {
            return;
        }
        try {
            String brokerUri = this.nodeInfo.getUri().toASCIIString();
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, brokerUri);
            this.connection = factory.createConnection();
            this.connection.setExceptionListener(this.exceptionListener);
            this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            this.temporaryTopic = this.session.createTemporaryTopic();
            this.replyToConsumer = this.session.createConsumer(this.temporaryTopic);
            this.replyToConsumer.setMessageListener(this);
            this.replyProducer = this.session.createProducer((Destination) null);
            this.connection.start();
            this.started = true;
        } catch (Throwable th) {
            // Do not leak a half-opened connection/session when start-up fails part way through.
            closeSessionAndConnection();
            OseeCoreException.wrapAndThrow(th);
        }
    }

    /**
     * Sends a message using a default status callback.
     *
     * @param messageID identifies the destination topic and serialization
     * @param obj message body
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public void send(MessageID messageID, Object obj) {
        send(messageID, obj, new OseeMessagingStatusImpl(String.format("Error sending message(%s)", messageID.getId()), getClass()));
    }

    /**
     * Sends a message without extra JMS properties.
     *
     * @param messageID identifies the destination topic and serialization
     * @param obj message body
     * @param oseeMessagingStatusCallback notified of success or failure
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public synchronized void send(MessageID messageID, Object obj, OseeMessagingStatusCallback oseeMessagingStatusCallback) {
        send(messageID, obj, null, oseeMessagingStatusCallback);
    }

    /**
     * Sends a message, retrying once with a fresh producer if the first attempt fails with a
     * {@link JMSException}. On final failure the callback's {@code fail} is invoked and the
     * exception is rethrown through {@link OseeCoreException#wrapAndThrow(Throwable)}.
     *
     * <p>NOTE(review): when {@code messageID.isTopic()} is false nothing is sent and the callback
     * is not invoked at all; confirm this is intended.
     *
     * @param messageID identifies the destination topic and serialization
     * @param obj message body
     * @param properties optional JMS message properties; may be {@code null}
     * @param oseeMessagingStatusCallback notified of success or failure
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public synchronized void send(MessageID messageID, Object obj, Properties properties, OseeMessagingStatusCallback oseeMessagingStatusCallback) {
        try {
            if (messageID.isTopic()) {
                try {
                    sendInternal(messageID, obj, properties);
                } catch (JMSException firstAttemptFailure) {
                    // The cached producer may be stale; drop it and try once more.
                    removeProducerFromCache(messageID);
                    sendInternal(messageID, obj, properties);
                }
                oseeMessagingStatusCallback.success();
            }
        } catch (Exception e) {
            oseeMessagingStatusCallback.fail(e);
            OseeCoreException.wrapAndThrow(e);
        }
    }

    /**
     * Builds the JMS message for {@code obj}, applies reply-to and properties, and publishes it
     * on the topic for {@code messageID}.
     */
    private synchronized void sendInternal(MessageID messageID, Object obj, Properties properties) throws JMSException {
        ConsoleDebugSupport consoleDebugSupport = ServiceUtility.getConsoleDebugSupport();
        if (consoleDebugSupport != null) {
            if (consoleDebugSupport.getPrintSends()) {
                printSendDebug(messageID, obj, properties);
            }
            consoleDebugSupport.addSend(messageID);
        }
        MessageProducer producer = getOrCreateProducer(getOrCreateTopic(messageID));
        Message message = this.activeMqUtil.createMessage(this.session, messageID.getSerializationClass(), obj);
        if (messageID.isReplyRequired()) {
            message.setJMSReplyTo(this.temporaryTopic);
        }
        if (properties != null) {
            applyProperties(message, properties);
        }
        producer.send(message);
    }

    /** Dumps the outgoing message, its properties and the current stack to standard output. */
    private static void printSendDebug(MessageID messageID, Object obj, Properties properties) {
        System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++");
        System.out.println(String.valueOf(messageID.getName()) + " ==> " + new Date());
        if (properties != null) {
            System.out.println("PROPERTIES:");
            System.out.println(properties.toString());
        }
        System.out.println("MESSAGE:");
        // println(Object) prints "null" for a null body instead of throwing from debug output.
        System.out.println(obj);
        System.out.println("STACK:");
        for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) {
            System.out.println("   " + stackTraceElement.toString());
        }
        System.out.println("-----------------------------------------------------------------------------");
    }

    /**
     * Copies each entry of {@code properties} onto {@code message}, using the typed JMS setter
     * matching the value's runtime type and {@code setObjectProperty} for anything else.
     */
    private static void applyProperties(Message message, Properties properties) throws JMSException {
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            String key = entry.getKey().toString();
            Object value = entry.getValue();
            if (value instanceof Integer) {
                message.setIntProperty(key, (Integer) value);
            } else if (value instanceof Boolean) {
                message.setBooleanProperty(key, (Boolean) value);
            } else if (value instanceof Byte) {
                message.setByteProperty(key, (Byte) value);
            } else if (value instanceof Double) {
                message.setDoubleProperty(key, (Double) value);
            } else if (value instanceof Float) {
                message.setFloatProperty(key, (Float) value);
            } else if (value instanceof Long) {
                message.setLongProperty(key, (Long) value);
            } else if (value instanceof String) {
                message.setStringProperty(key, (String) value);
            } else if (value instanceof Short) {
                message.setShortProperty(key, (Short) value);
            } else {
                message.setObjectProperty(key, value);
            }
        }
    }

    /**
     * Subscribes {@code oseeMessagingListener} to the topic for {@code messageID}.
     *
     * @param messageID identifies the topic
     * @param oseeMessagingListener listener to receive messages
     * @param oseeMessagingStatusCallback notified of success or failure
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public synchronized void subscribe(MessageID messageID, OseeMessagingListener oseeMessagingListener, OseeMessagingStatusCallback oseeMessagingStatusCallback) {
        // A null selector means "no selector" for Session.createConsumer(Destination, String).
        subscribeInternal(messageID, oseeMessagingListener, null, oseeMessagingStatusCallback);
    }

    /**
     * Subscribes {@code oseeMessagingListener} to the topic for {@code messageID}, filtering
     * deliveries with a JMS message selector.
     *
     * @param messageID identifies the topic
     * @param oseeMessagingListener listener to receive messages
     * @param str JMS message selector expression
     * @param oseeMessagingStatusCallback notified of success or failure
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public synchronized void subscribe(MessageID messageID, OseeMessagingListener oseeMessagingListener, String str, OseeMessagingStatusCallback oseeMessagingStatusCallback) {
        subscribeInternal(messageID, oseeMessagingListener, str, oseeMessagingStatusCallback);
    }

    /**
     * Subscribes {@code oseeMessagingListener} to the topic for {@code messageID} using a default
     * status callback.
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public void subscribe(MessageID messageID, OseeMessagingListener oseeMessagingListener) {
        subscribe(messageID, oseeMessagingListener, new OseeMessagingStatusImpl(String.format("Error subscribing message(%s)", messageID.getId()), getClass()));
    }

    /**
     * Creates a consumer (optionally with a selector), wires the listener wrapper to it and
     * records the subscription. Failures are reported through the callback, never thrown.
     */
    private void subscribeInternal(MessageID messageID, OseeMessagingListener listener, String selector, OseeMessagingStatusCallback callback) {
        try {
            if (!isConnectedThrow()) {
                callback.fail(new OseeCoreException("This connection is not started.", new Object[0]));
                return;
            }
            MessageConsumer consumer = this.session.createConsumer(getOrCreateTopic(messageID), selector);
            consumer.setMessageListener(new ActiveMqMessageListenerWrapper(this.activeMqUtil, this.replyProducer, this.session, listener));
            this.regularListeners.put(messageID.getId(), consumer, listener);
            callback.success();
        } catch (JMSException | NullPointerException e) {
            callback.fail(e);
        }
    }

    /** Returns the cached topic for {@code messageID}, creating and caching it on first use. */
    private Topic getOrCreateTopic(MessageID messageID) throws JMSException {
        String topicName = messageID.getId();
        Topic topic = this.topicCache.get(topicName);
        if (topic == null) {
            topic = this.session.createTopic(topicName);
            this.topicCache.put(topicName, topic);
        }
        return topic;
    }

    /**
     * Returns the cached producer for {@code topic}, creating a non-persistent producer and
     * caching it on first use.
     */
    private MessageProducer getOrCreateProducer(Topic topic) throws JMSException {
        MessageProducer messageProducer = this.messageProducerCache.get(topic);
        if (messageProducer == null) {
            messageProducer = this.session.createProducer(topic);
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            this.messageProducerCache.put(topic, messageProducer);
        }
        return messageProducer;
    }

    /**
     * Evicts the producer cached for {@code messageID}'s topic and closes it on a best-effort
     * basis so the next send creates a fresh one.
     */
    private void removeProducerFromCache(MessageID messageID) throws JMSException {
        MessageProducer stale = this.messageProducerCache.remove(getOrCreateTopic(messageID));
        if (stale != null) {
            try {
                stale.close();
            } catch (JMSException e) {
                // The producer is being discarded anyway; a failed close must not block the retry.
                OseeLog.log(ConnectionNodeActiveMq.class, Level.FINEST, e);
            }
        }
    }

    /**
     * Registers {@code oseeMessagingListener} to receive replies whose JMS correlation id equals
     * {@code messageID.getId()}, replacing any listener previously registered for that id.
     *
     * @return always {@code true}
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public boolean subscribeToReply(MessageID messageID, OseeMessagingListener oseeMessagingListener) {
        this.replyListeners.put(messageID.getId(), oseeMessagingListener);
        return true;
    }

    /** Unsubscribes {@code oseeMessagingListener} from {@code messageID} using a default status callback. */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public void unsubscribe(MessageID messageID, OseeMessagingListener oseeMessagingListener) {
        unsubscribe(messageID, oseeMessagingListener, new OseeMessagingStatusImpl(String.format("Error unsubscribing message(%s)", messageID.getId()), getClass()));
    }

    /**
     * Closes every consumer registered for {@code messageID} whose listener equals
     * {@code oseeMessagingListener}. All matching consumers are closed even if one close fails;
     * the callback then receives exactly one notification: {@code fail} with the first failure,
     * or {@code success} otherwise (including when nothing was subscribed).
     *
     * @param messageID identifies the topic
     * @param oseeMessagingListener listener to remove
     * @param oseeMessagingStatusCallback notified of the outcome
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public void unsubscribe(MessageID messageID, OseeMessagingListener oseeMessagingListener, OseeMessagingStatusCallback oseeMessagingStatusCallback) {
        Map<MessageConsumer, OseeMessagingListener> consumers = this.regularListeners.getKeyedValues(messageID.getId());
        JMSException firstFailure = null;
        if (consumers != null) {
            // Collect first, then remove, to avoid modifying the map while iterating it.
            List<MessageConsumer> matching = new ArrayList<>();
            for (Map.Entry<MessageConsumer, OseeMessagingListener> entry : consumers.entrySet()) {
                if (entry.getValue().equals(oseeMessagingListener)) {
                    matching.add(entry.getKey());
                }
            }
            for (MessageConsumer consumer : matching) {
                // NOTE(review): presumably getKeyedValues returns a live view so this also
                // updates regularListeners -- confirm against CompositeKeyHashMap.
                consumers.remove(consumer);
                try {
                    consumer.close();
                } catch (JMSException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    }
                }
            }
        }
        if (firstFailure != null) {
            oseeMessagingStatusCallback.fail(firstFailure);
        } else {
            oseeMessagingStatusCallback.success();
        }
    }

    /**
     * Removes the reply listener registered under {@code messageID.getId()}, regardless of which
     * listener instance is passed.
     *
     * @return always {@code true}
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public boolean unsubscribteToReply(MessageID messageID, OseeMessagingListener oseeMessagingListener) {
        this.replyListeners.remove(messageID.getId());
        return true;
    }

    /**
     * Receives messages from the reply topic and forwards each one to the reply listener
     * registered under its JMS correlation id, if any. Processing errors are logged, not thrown.
     */
    @Override
    public void onMessage(Message message) {
        try {
            String correlationId = message.getJMSCorrelationID();
            if (correlationId != null) {
                OseeMessagingListener listener = this.replyListeners.get(correlationId);
                if (listener != null) {
                    listener.process(this.activeMqUtil.translateMessage(message, listener.getClazz()), new HashMap(), new ReplyConnectionActiveMqImpl());
                }
            }
        } catch (JMSException | OseeCoreException e) {
            OseeLog.log(ConnectionNodeActiveMq.class, Level.SEVERE, e);
        }
        OseeLog.logf(Activator.class, Level.FINE, "recieved reply message %s", new Object[]{message});
    }

    /**
     * Clears the topic, producer and subscription caches, marks the node as stopped and closes
     * the session and connection. Close failures are logged at FINEST and otherwise ignored.
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public synchronized void stop() {
        this.topicCache.clear();
        this.messageProducerCache.clear();
        this.regularListeners.clear();
        this.started = false;
        closeSessionAndConnection();
    }

    /**
     * Closes the session and then the connection if present, logging close failures at FINEST.
     * Both fields are reset to {@code null} even when closing fails, so no stale handle is kept.
     */
    private void closeSessionAndConnection() {
        if (this.session != null) {
            try {
                this.session.close();
            } catch (JMSException e) {
                OseeLog.log(ConnectionNodeActiveMq.class, Level.FINEST, e);
            } finally {
                this.session = null;
            }
        }
        if (this.connection != null) {
            try {
                this.connection.setExceptionListener((ExceptionListener) null);
                this.connection.close();
            } catch (JMSException e) {
                OseeLog.log(ConnectionNodeActiveMq.class, Level.FINEST, e);
            } finally {
                this.connection = null;
            }
        }
    }

    /**
     * Reports whether the node is started and its connection still answers a metadata request.
     * If the probe throws, the node is marked as not started and {@code false} is returned.
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNodeFailoverSupport
    public synchronized boolean isConnected() {
        try {
            return isConnectedThrow();
        } catch (JMSException probeFailure) {
            this.started = false;
            return false;
        }
    }

    /**
     * Returns {@code false} when there is no connection or the node is not started; otherwise
     * probes the connection via {@code getMetaData()} and returns {@code true}.
     *
     * @throws JMSException if the metadata probe fails
     */
    private synchronized boolean isConnectedThrow() throws JMSException {
        if (this.connection == null || !this.started) {
            return false;
        }
        this.connection.getMetaData();
        return true;
    }

    /** Not supported by this implementation; the listener is ignored. */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public void addConnectionListener(ConnectionListener connectionListener) {
    }

    /** Not supported by this implementation; the listener is ignored. */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public void removeConnectionListener(ConnectionListener connectionListener) {
    }

    /**
     * Returns a text listing of every cached producer: its topic name and destination. Entries
     * that fail to report are logged at SEVERE and skipped.
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public String getSenders() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Topic, MessageProducer> entry : this.messageProducerCache.entrySet()) {
            try {
                sb.append(String.format("Topic [%s] \n", entry.getKey().getTopicName()));
                sb.append(String.format("\tProducer Destination [%s]\n", entry.getValue().getDestination().toString()));
            } catch (JMSException e) {
                OseeLog.log(Activator.class, Level.SEVERE, e);
            }
        }
        return sb.toString();
    }

    /**
     * Returns a text listing of every subscription: topic id, consumer selector and, when the
     * consumer's listener is an {@link ActiveMqMessageListenerWrapper}, the wrapped listener.
     * Entries that fail to report are logged at SEVERE.
     */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public String getSubscribers() {
        StringBuilder sb = new StringBuilder();
        for (Pair<String, MessageConsumer> subscription : this.regularListeners.keySet()) {
            try {
                MessageConsumer consumer = subscription.getSecond();
                sb.append(String.format("Topic [%s] \n", subscription.getFirst()));
                sb.append(String.format("\tConsumer Selector [%s]\n", consumer.getMessageSelector()));
                MessageListener messageListener = consumer.getMessageListener();
                if (messageListener instanceof ActiveMqMessageListenerWrapper) {
                    sb.append("\tConsumer Listeners:\n");
                    sb.append(String.format("\t\t%s\n", ((ActiveMqMessageListenerWrapper) messageListener).getListener().toString()));
                }
            } catch (JMSException e) {
                OseeLog.log(Activator.class, Level.SEVERE, e);
            }
        }
        return sb.toString();
    }

    /** Returns the node info, the started flag, and the sender and subscriber listings. */
    @Override // org.eclipse.osee.framework.messaging.ConnectionNode
    public String getSummary() {
        return this.nodeInfo.toString() + "\n" + String.format("\tisStarted[%b]\n", Boolean.valueOf(this.started)) + getSenders() + getSubscribers();
    }
}
