目次へ

3.JMSの基礎3

2006.11.17 株式会社四次元データ 宮澤了祐

3.1.Publisher-Subscriberメッセージモデル

ここでは、一対多でメッセージングを行うPublisher-Subscriberモデルについて解説します。

Publisher-Subscriberモデルでは、まず送信者はJMSサーバ内部に作成されたトピック(Destinationオブジェクト)に対してメッセージを発行します。 トピックとは話題を意味する単語で、その名の通りメッセージを分類するオブジェクトです。 受信者側は予め、JMSサーバに対してそのトピックに送られたメッセージを購読(Subscribe)することを伝えておきます。 メッセージが発行されれば、購読を申し込んでいる全ての受信者にメッセージが送られます。全ての受信者がメッセージを受け取った際にはじめてメッセージはサーバから削除されます。

サンプルプログラムを用いて解説していきます。まずは送信側です。

import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.activemq.ActiveMQConnectionFactory;

public class Publisher {

    public static void main(String[] args){
        TopicConnectionFactory factory 
            = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
        try {
            //Publisher-Subscriberモデル用のコネクションの作成
            TopicConnection connection = factory.createTopicConnection();
            TopicSession session = connection.createTopicSession(false,
                                                    TopicSession.AUTO_ACKNOWLEDGE);
            
            connection.start();
            
            //Topicオブジェクトの作成
            Topic topic = session.createTopic("TestTopic");
            
            //Topicに対してメッセージを送るMessageProducerの作成
            TopicPublisher publisher = session.createPublisher(topic);
            
            //メッセージの送信
            TextMessage msg = session.createTextMessage("test message!!");
            publisher.publish(msg);
            
            publisher.close();
            session.close();
            connection.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    
}

PTPのプログラムとほとんど違いはありません。 PTPと違う点は
ConnectionオブジェクトがTopicConnectionオブジェクトになっている。
Destinationオブジェクトがjavax.jms.Topicオブジェクトになっている。
MessagePuroducerがjavax.jms.Publisherオブジェクトになっている。
があります。

また、メッセージを送るメソッドもsend()メソッドがpubslish()メソッドになっています。

続いて受信側です。 複数の受信者を用意するためにRunnableなクラスを作成し、5つのSubscriberを用意します。

import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.activemq.ActiveMQConnectionFactory;
import org.activemq.ActiveMQConnection;

public class Subscriber implements Runnable{
    String name;
    
    public Subscriber(String name) {
        this.name = name;
    }
    
    public void run() {
        ActiveMQConnectionFactory factory
             = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
        try {
            TopicConnection connection = factory.createTopicConnection();
            TopicSession session = connection.createTopicSession(false,
                                                     TopicSession.AUTO_ACKNOWLEDGE);
            
            connection.start();
            
            //Topicオブジェクトの作成
            Topic topic = session.createTopic("TestTopic");
            
            //Topicを購読するMessageConsumerの設定
            TopicSubscriber subscriber= session.createSubscriber(topic);
            
            //メッセージの受信
            TextMessage msg = (TextMessage)subscriber.receive();
            System.out.println("[" + this.name +"]" + msg.getText());
            
            subscriber.close();
            session.close();
            connection.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }        
    }
    
    public static void main(String[] args){
        
        Thread[] thread = new Thread[5];
        
        for(int i=0; i<5; i++){
            thread[i] = new Thread(new Subscriber("thread-" + i));
        }
        
        for(int i=0; i<5; i++){
            thread[i].start();
        }
    }
}

受信側のMessageConsumerはjavax.jms.Subscriberクラスを使用します。 メッセージを受信するメソッドはSubscriber#receive()です。

コンストラクタでスレッドの名前を設定し、メッセージを受信出来れば名前のあとにメッセージを出力しています。

[thread-0]test message!!
[thread-1]test message!!
[thread-2]test message!!
[thread-3]test message!!
[thread-4]test message!!

のように出力されれば成功です。(順番は入れ替わる可能性があります)

特定のTopicに対して購読を申し込む必要があるため、受信側を先に起動する必要があります。

3.2.メッセージの受動的な受け取り

先ほどまでの例では、MessageConsumer#receive()メソッドを用いてJMSサーバーまで動的にメッセージを受け取りに行っていました。 JMSではリスナを用いて受動的にメッセージを受け取ることが出来ます。

先ほどのSubscriberにリスナを追加してみましょう。

import javax.jms.MessageListener;
public class Subscriber implements Runnable,MessageListener{
    public void onMessage(Message message) {
        TextMessage msg = (TextMessage)message;
        try {
            System.out.println("[" + this.name +"]" + msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    
    public void run(){
        try{
            ...
            topic = session.createTopic("TestTopic");
            subscriber= session.createSubscriber(topic);
            subscriber.setMessageListener(this);
            
            connection.start();
            ...
        }
    }
    
}

Connectionが閉じられないように、解放処理は別で行います。

MessageListnerインターフェイスはメッセージを受信した際に呼ばれるonMessage()メソッドのみを持つインターフェイスです。

MessageConsumer#setMessageListener(MessageListener listener)メソッドで追加することが出来ます。

↑このページの先頭へ

こちらもチェック!

PR
  • XMLDB.jp