3.JMSの基礎3
2006.11.17 株式会社四次元データ 宮澤了祐
- 3.1. Publisher-Subscriberメッセージモデル
- 3.2. メッセージの受動的な受け取り
3.1.Publisher-Subscriberメッセージモデル
ここでは、一対多でメッセージングを行うPublisher-Subscriberモデルについて解説します。
Publisher-Subscriberモデルでは、まず送信者はJMSサーバ内部に作成されたトピック(Destinationオブジェクト)に対してメッセージを発行します。 トピックとは話題を意味する単語で、その名の通りメッセージを分類するオブジェクトです。 受信者側は予め、JMSサーバに対してそのトピックに送られたメッセージを購読(Subscribe)することを伝えておきます。 メッセージが発行されれば、購読を申し込んでいる全ての受信者にメッセージが送られます。全ての受信者がメッセージを受け取った際にはじめてメッセージはサーバから削除されます。
サンプルプログラムを用いて解説していきます。まずは送信側です。
import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.activemq.ActiveMQConnectionFactory;
public class Publisher {
public static void main(String[] args){
TopicConnectionFactory factory
= new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
try {
//Publisher-Subscriberモデル用のコネクションの作成
TopicConnection connection = factory.createTopicConnection();
TopicSession session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
connection.start();
//Topicオブジェクトの作成
Topic topic = session.createTopic("TestTopic");
//Topicに対してメッセージを送るMessageProducerの作成
TopicPublisher publisher = session.createPublisher(topic);
//メッセージの送信
TextMessage msg = session.createTextMessage("test message!!");
publisher.publish(msg);
publisher.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
PTPのプログラムとほとんど違いはありません。
PTPと違う点は
ConnectionオブジェクトがTopicConnectionオブジェクトになっている。
Destinationオブジェクトがjavax.jms.Topicオブジェクトになっている。
MessagePuroducerがjavax.jms.Publisherオブジェクトになっている。
があります。
また、メッセージを送るメソッドもsend()メソッドがpubslish()メソッドになっています。
続いて受信側です。 複数の受信者を用意するためにRunnableなクラスを作成し、5つのSubscriberを用意します。
import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.ActiveMQConnection;
public class Subscriber implements Runnable{
String name;
public Subscriber(String name) {
this.name = name;
}
public void run() {
ActiveMQConnectionFactory factory
= new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
try {
TopicConnection connection = factory.createTopicConnection();
TopicSession session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
connection.start();
//Topicオブジェクトの作成
Topic topic = session.createTopic("TestTopic");
//Topicを購読するMessageConsumerの設定
TopicSubscriber subscriber= session.createSubscriber(topic);
//メッセージの受信
TextMessage msg = (TextMessage)subscriber.receive();
System.out.println("[" + this.name +"]" + msg.getText());
subscriber.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args){
Thread[] thread = new Thread[5];
for(int i=0; i<5; i++){
thread[i] = new Thread(new Subscriber("thread-" + i));
}
for(int i=0; i<5; i++){
thread[i].start();
}
}
}
受信側のMessageConsumerはjavax.jms.Subscriberクラスを使用します。 メッセージを受信するメソッドはSubscriber#receive()です。
コンストラクタでスレッドの名前を設定し、メッセージを受信出来れば名前のあとにメッセージを出力しています。
[thread-0]test message!! [thread-1]test message!! [thread-2]test message!! [thread-3]test message!! [thread-4]test message!!
のように出力されれば成功です。(順番は入れ替わる可能性があります)
特定のTopicに対して購読を申し込む必要があるため、受信側を先に起動する必要があります。
3.2.メッセージの受動的な受け取り
先ほどまでの例では、MessageConsumer#receive()メソッドを用いてJMSサーバーまで動的にメッセージを受け取りに行っていました。 JMSではリスナを用いて受動的にメッセージを受け取ることが出来ます。
先ほどのSubscriberにリスナを追加してみましょう。
import javax.jms.MessageListener;
public class Subscriber implements Runnable,MessageListener{
public void onMessage(Message message) {
TextMessage msg = (TextMessage)message;
try {
System.out.println("[" + this.name +"]" + msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
public void run(){
try{
...
topic = session.createTopic("TestTopic");
subscriber= session.createSubscriber(topic);
subscriber.setMessageListener(this);
connection.start();
...
}
}
}
Connectionが閉じられないように、解放処理は別で行います。
MessageListnerインターフェイスはメッセージを受信した際に呼ばれるonMessage()メソッドのみを持つインターフェイスです。
MessageConsumer#setMessageListener(MessageListener listener)メソッドで追加することが出来ます。

