package com.anahata.yam.tech.pull.jms;

import com.anahata.util.transport.rpc.Rpc;
import com.anahata.yam.tech.pull.Pull;
import com.anahata.yam.tech.pushpull.ClassMetadataSelector;
import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Method;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/anahata/yam/tech/pull/jms/JmsPull.class */
/**
 * A {@link Pull} that receives its callbacks as JMS messages.
 *
 * <p>After {@link #init} has stored the session, topic and selector, {@link #doOpen()} creates a
 * {@link MessageConsumer} on the topic and registers this instance as its {@link MessageListener}.
 * Every message received while the pull is not closed is parsed with {@code Rpc.parse} and handed
 * to {@code Pull.invoke}.
 *
 * @param <T> the listener interface type this pull dispatches to
 */
public class JmsPull<T> extends Pull<T> implements MessageListener {
    private static final Logger log = LoggerFactory.getLogger(JmsPull.class);

    /** Session used to create the consumer in {@link #doOpen()}. */
    private Session session;

    /** Topic the consumer subscribes to. */
    private Topic topic;

    /** Consumer created by {@link #doOpen()}; {@code null} until opened and again after a successful close. */
    private MessageConsumer consumer;

    /** JMS message selector computed in {@link #init}. */
    private String selector;

    /**
     * Initialises the underlying {@link Pull} and records the JMS objects needed to open it later.
     *
     * <p>The selector starts from {@code ClassMetadataSelector.create(cls, new Method[0])}; each
     * entry of {@code strArr} is then appended as {@code " AND (" + entry + ")"}.
     *
     * @param session        session later used to create the message consumer
     * @param topic          topic to consume from
     * @param t              the listener; must not be {@code null}
     * @param referenceQueue forwarded unchanged to {@code Pull.init}
     * @param z              forwarded unchanged to {@code Pull.init}
     * @param cls            the listener interface type; must not be {@code null}
     * @param strArr         additional selector expressions, each AND-ed onto the base selector
     * @throws NullPointerException if {@code t} or {@code cls} is {@code null}
     * @throws Exception            whatever {@code Pull.init} or the selector factory throws
     */
    public void init(Session session, Topic topic, @NonNull T t, ReferenceQueue referenceQueue, boolean z, @NonNull Class<T> cls, String... strArr) throws Exception {
        if (t == null) {
            throw new NullPointerException("listener is marked non-null but is null");
        }
        if (cls == null) {
            throw new NullPointerException("interfaceType is marked non-null but is null");
        }
        super.init(t, referenceQueue, z, cls, strArr);
        this.session = session;
        this.topic = topic;

        String built = ClassMetadataSelector.create(cls, new Method[0]);
        // Only touch the base selector when there is something to append, so a base value
        // returned by the factory is stored untouched when no extra expressions are given.
        if (strArr != null && strArr.length > 0) {
            // append(String) renders null as "null", matching plain string concatenation.
            StringBuilder expression = new StringBuilder().append(built);
            for (String extra : strArr) {
                expression.append(" AND (").append(extra).append(")");
            }
            built = expression.toString();
        }
        this.selector = built;
        log.debug("selector: {}", this.selector);
    }

    /**
     * Creates the consumer for the configured topic and selector and registers this instance as
     * its message listener.
     *
     * @throws Exception if the consumer cannot be created or the listener cannot be registered
     */
    @Override // com.anahata.yam.tech.pull.Pull
    protected void doOpen() throws Exception {
        this.consumer = this.session.createConsumer(this.topic, this.selector);
        this.consumer.setMessageListener(this);
    }

    /**
     * Handles a message delivered by the JMS provider.
     *
     * <p>Messages arriving after the pull has been closed are logged and ignored. Otherwise the
     * message is parsed with {@code Rpc.parse} and passed to {@code Pull.invoke}.
     *
     * @param message the delivered message
     * @throws RuntimeException wrapping any exception raised while parsing or invoking; the
     *                          failure is logged at warn level before being rethrown
     */
    @Override
    public void onMessage(Message message) {
        if (isClosed()) {
            log.debug("Got JMS message when pull was closed. ignoring {}", message);
            return;
        }
        log.debug("Got JMS message{}", message);
        try {
            super.invoke(Rpc.parse(message));
        } catch (Exception e) {
            log.warn("Exception processing callback message", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the consumer created by {@link #doOpen()}, if there is one.
     *
     * <p>Does nothing when no consumer exists (for example when the pull was never opened or
     * {@code createConsumer} failed), instead of failing with a {@link NullPointerException}.
     *
     * @throws Exception if closing the consumer fails; the consumer reference is kept in that case
     */
    @Override // com.anahata.yam.tech.pull.Pull
    protected synchronized void doClose() throws Exception {
        final MessageConsumer current = this.consumer;
        if (current == null) {
            return;
        }
        log.debug("Closing consumer {}", this);
        current.close();
        // Drop the reference only after a successful close so a failed close can be retried.
        this.consumer = null;
        log.debug("Consumer closed {} ", this);
    }
}
