目次へ

5.メッセージ配信の確認

2007.01.10 株式会社四次元データ 宮澤了祐

5.1.メッセージの配信モード

メッセージが確実に送受信出来たかを確認するために、JMSでは「通知」と「トランザクション」の二つの方法を提供してくれます。

メッセージが送信される場合にJMSサーバはメッセージを受け取ったことをプロデューサーに知らせる機能が「通知」です。 MessageProducer#send()メソッドはこの通知があるまで待機しています。またメッセージが受信される場合、コンシューマはメッセージが受信完了したことをJMSサーバに通知します。

JMSではいくつかの配信モードが定義されており、セッションを作成する際に配信モードを設定します。 JMSで定義されている配信モードには次があります。

  • autoモード
    メッセージの配信の通知をJMSサーバーが自動的に処理してくれます。一回だけの配信を保証します。
    createSession(false,Session.AUTO_ACKNOWLEDGE)で設定します。
  • clientモード
    確認応答をJMSサーバーではなくJMSクライアントが行うモードです。
    createSession(false,Session.CLIENT_ACKNOWLEDGE)で設定します。
  • duplicates okeyモード
    自動的に確認応答を行ってくれます。数通受け取った場合通知を行うため、autoモードと違い同じメッセージが何通も配信されることがあります。必ず一回だけを保証するautoモードと比べるとその点を保証しない分処理量が少ないというメリットがあります。
    createSession(false,Session.DUPS_OK_ACKNOWLEDGE)で設定します。
  • トランザクションモード
    createSession(true,-1)で設定します。 第二引数は使用しないため、ダミーの値を入力しています。Session#getAcknowledgeMode()の返り値は「Session.SESSION_TRANSACTED」として与えられます。

5.2.clientモード

clientモードは通知を明示的に行う配信モードです。

clientモードを使用するためには、確認応答を実装する必要があります。 受信した際にjavax.jms.Message#acknowldge()メソッドを実行することで確認応答を明示的に行うことが出来ます。

これ以外のモードでacknowldge()メソッドを実行しても何も起こりません。

acknowldge()メソッドを実行して確認応答することで、メッセージ配信が完了したことになります。

以下は「last message!」というメッセージが送られて来たら確認応答を行うレシーバーのサンプルです。

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.TextMessage;

import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;

public class ClientReceiver{
    public static void main(String[] args) {
        try{
            
            ActiveMQConnectionFactory factory 
                = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
            QueueConnection connection = factory.createQueueConnection();
            
            QueueSession session = connection.createQueueSession(false,
                                                     QueueSession.CLIENT_ACKNOWLEDGE);
            
            Queue queue = session.createQueue("Test");
            
            QueueReceiver receiver = session.createReceiver(queue);
            
            connection.start();
            
            int i = 0;
            while(true){
                TextMessage msg = (TextMessage) receiver.receive(1000);

                if(msg == null)break;
                
                String text = msg.getText();
                
                if(text.equals("last message")){
                    msg.acknowledge();
                    System.out.println(text);
                }else{
                    System.out.print(text);
                }

                i++;
            }
            
            receiver.close();
            session.close();
            connection.close();
        }catch(JMSException e){
            e.printStackTrace();
        }
    
    }
}

Message#acknowledge()メソッドを呼び出した場合、受信したメッセージに対してではなくそれまでに受信した全てのメッセージに対して確認応答が行われたのと同じ意味をなします。 「last message」というテキストが送られてこなければレシーバーを再起動することで全てのメッセージが再送信されます。

5.3.トランザクションモード

トランザクションとは、メッセージのプロデュースまたはコンシューム、あるいはその両方を 極小の単位にグループ化する方法です。 処理を確定するコミット、処理を復元するロールバックなどを行うことが出来ます。

トランザクションモードではコンシューマ/プロデューサーがコミットを行えば、JMSサーバに対してメッセージ配信が完了したことを通知します。 コンシューマがロールバックを行えば受信されたことをキャンセルし、再配信を要求することになります。プロデューサーがロールバックを行えばメッセージ送信をキャンセルします。

トランザクションモードを解説するために、複数のメッセージに対して三通目までは受信するがそれ以降のメッセージには再配信を要求するレシーバーを作成します。

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.TextMessage;

import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;

public class TransactionReceiver{

    public static void main(String[] args) {
        try{
            
            ActiveMQConnectionFactory factory 
                = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
            QueueConnection connection = factory.createQueueConnection();
            
            //トランザクションモードに設定したセッションを作成。
            QueueSession session = connection.createQueueSession(true,-1);
            
            Queue queue = session.createQueue("Test");
            
            QueueReceiver receiver = session.createReceiver(queue);
            
            connection.start();
            
            int i = 0;
            while(true){
                TextMessage msg = (TextMessage) receiver.receive(1000);
                if( i < 3){
                    session.commit();
                }else{
                    session.rollback();
                }
                
                if(msg == null)break;
                
                System.out.print(msg.getText());
                
                i++;
            }
            
            receiver.close();
            session.close();
            connection.close();
        }catch(JMSException e){
            e.printStackTrace();
        }
    
    }
}

送信側は次のように数字をTextMessageとして送信します。

...
for(int i=0; i<10; i++){
    TextMessage msg = session.createTextMessage(i + "  ");
    sender.send(msg);
}
...

もし自動確認モードであれば、

0  1  2  3  4  5  6  7  8  9

と出力されて終わりますがこのプログラムでは次のように出力されます。

0  1  2  3  4  5  6  7  8  9  3  4  5  6  7  8  9  3  4  5  6  7  8 ...

「3」以降の数が再配信されていることがわかります。

この再配信される回数などはJMSサーバーによって異なります。

↑このページの先頭へ

こちらもチェック!

PR
  • XMLDB.jp