5.メッセージ配信の確認2007.01.10 株式会社四次元データ 宮澤了祐
JMS 5章 メッセージ配信の確認
5.1.メッセージの配信モードメッセージが確実に送受信出来たかを確認するために、JMSでは「通知」と「トランザクション」の二つの方法を提供してくれます。 メッセージが送信される場合にJMSサーバはメッセージを受け取ったことをプロデューサーに知らせる機能が「通知」です。 MessageProducer#send()メソッドはこの通知があるまで待機しています。またメッセージが受信される場合、コンシューマはメッセージが受信完了したことをJMSサーバに通知します。 JMSではいくつかの配信モードが定義されており、セッションを作成する際に配信モードを設定します。 JMSで定義されている配信モードには次があります。
5.2.clientモードclientモードは通知を明示的に行う配信モードです。 clientモードを使用するためには、確認応答を実装する必要があります。 受信した際にjavax.jms.Message#acknowldge()メソッドを実行することで確認応答を明示的に行うことが出来ます。 これ以外のモードでacknowldge()メソッドを実行しても何も起こりません。 acknowldge()メソッドを実行して確認応答することで、メッセージ配信が完了したことになります。 以下は「last message!」というメッセージが送られて来たら確認応答を行うレシーバーのサンプルです。
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
public class ClientReceiver{
public static void main(String[] args) {
try{
ActiveMQConnectionFactory factory
= new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
QueueConnection connection = factory.createQueueConnection();
QueueSession session = connection.createQueueSession(false,
QueueSession.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("Test");
QueueReceiver receiver = session.createReceiver(queue);
connection.start();
int i = 0;
while(true){
TextMessage msg = (TextMessage) receiver.receive(1000);
if(msg == null)break;
String text = msg.getText();
if(text.equals("last message")){
msg.acknowledge();
System.out.println(text);
}else{
System.out.print(text);
}
i++;
}
receiver.close();
session.close();
connection.close();
}catch(JMSException e){
e.printStackTrace();
}
}
}
Message#acknowledge()メソッドを呼び出した場合、受信したメッセージに対してではなくそれまでに受信した全てのメッセージに対して確認応答が行われたのと同じ意味をなします。 「last message」というテキストが送られてこなければレシーバーを再起動することで全てのメッセージが再送信されます。 5.3.トランザクションモードトランザクションとは、メッセージのプロデュースまたはコンシューム、あるいはその両方を 極小の単位にグループ化する方法です。 処理を確定するコミット、処理を復元するロールバックなどを行うことが出来ます。 トランザクションモードではコンシューマ/プロデューサーがコミットを行えば、JMSサーバに対してメッセージ配信が完了したことを通知します。 コンシューマがロールバックを行えば受信されたことをキャンセルし、再配信を要求することになります。プロデューサーがロールバックを行えばメッセージ送信をキャンセルします。 トランザクションモードを解説するために、複数のメッセージに対して三通目までは受信するがそれ以降のメッセージには再配信を要求するレシーバーを作成します。
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
public class TransactionReceiver{
public static void main(String[] args) {
try{
ActiveMQConnectionFactory factory
= new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
QueueConnection connection = factory.createQueueConnection();
//トランザクションモードに設定したセッションを作成。
QueueSession session = connection.createQueueSession(true,-1);
Queue queue = session.createQueue("Test");
QueueReceiver receiver = session.createReceiver(queue);
connection.start();
int i = 0;
while(true){
TextMessage msg = (TextMessage) receiver.receive(1000);
if( i < 3){
session.commit();
}else{
session.rollback();
}
if(msg == null)break;
System.out.print(msg.getText());
i++;
}
receiver.close();
session.close();
connection.close();
}catch(JMSException e){
e.printStackTrace();
}
}
}
送信側は次のように数字をTextMessageとして送信します。
...
for(int i=0; i<10; i++){
TextMessage msg = session.createTextMessage(i + " ");
sender.send(msg);
}
...
もし自動確認モードであれば、 0 1 2 3 4 5 6 7 8 9 と出力されて終わりますがこのプログラムでは次のように出力されます。 0 1 2 3 4 5 6 7 8 9 3 4 5 6 7 8 9 3 4 5 6 7 8 ... 「3」以降の数が再配信されていることがわかります。 この再配信される回数などはJMSサーバーによって異なります。 |
![]()
![]()
|