目次へ

12.新たに追加されたコレクション2

2006.06.12 株式会社四次元データ 宮澤了祐

12.1. 同時コレクション

JDK5.0より、マルチスレッドでの使用を考慮に入れた、Collectionの新しい実装が追加されました。

マルチスレッドでのListやMapを使用する場合、 今まではCollections.synchronizedList()やCollections.synchronizedMap()メソッドなどを用いてスレッドセーフなコレクションを使用していました。 しかしこれらのメソッドで作成したコレクションは、ロックのために使用するモニターがひとつという構造であるため、 並行性に関して問題がありました。

また反復子を取得している最中にコレクションを更新する作業を行えば、java.util.ConcurrentModificationExceptionが発生していました。 これを防ぐため、synchronized修飾子を使いコレクション全体をロックするなどして、更新作業を行わせない対策を行う必要がありました。

{
    Map map = Collections.synchronizedMap(new HashMap());
    ...
    Thread thread = new Thread(new MyRunnable(map));
    thread.start();
    map.remove("value");
    ...
}

public void run() {
    synchronized(map) {
        Iterator itr = map.values().iterator();
        while(itr.hasNext()) {
            System.out.println(itr.next());
            try {
                Thread.sleep(1000);
            } catch(InterruptedException e) {
            }
        }
    }
}

コレクションより値を参照するたびに長時間ロックをかけることは、実行速度に多大な影響を与えます。

これらの問題の解決のため、従来のコレクションを改良した新しいコレクション実装及びインターフェイスが追加されました。

  • BlockingQueueインターフェイス
    Queueインターフェイスを拡張。
  • ConcurrentLinkedQueueクラス
    Queueインターフェイスのスレッドセーフな実装。
  • ConcurrentMapインターフェイス
    Mapインターフェイスの拡張。
  • CopyOnWriteArrayListクラス、CopyOnWriteArraySetクラス
    ListおよびSetのスレッドセーフな実装。

これらは全てjava.util.conccurentパッケージに所属しています。

12.2. BlockingQueue、ConcurrentLinkedQueue

Queueインターフェイスを拡張し、マルチスレッド下で使用するためにいくつかのメソッドが追加されました。

キューの容量が設定されており、要素の格納時に容量を超えるまで格納を待機することが出来ます。 また取り出し時にもキューが空でなくなるまで待機するなどが可能になりました。 次のメソッドが追加されています。

  • take()
    キューの先頭の要素を取得し、削除します。空の場合は待機します。
  • poll(long timeout, TimeUnit unit)
    キューの先頭を取得及び削除します。空の場合は指定した時間まで待機します。
  • put(E o)
    指定した要素をキューに追加します。キューが利用可能になるまで待機します。
  • offer(E o,long timeout, TimeUnit unit)
    指定した要素をキューの最後尾に追加します。指定した時間まで処理を待機します。
  • offer(E o)
    指定した要素をキューの最後尾に追加します。キューがいっぱいの場合は即座にfalseが帰ってきます。
  • remainingCapacity()
    このキューが受け入れられる要素の数を返します。

5つの異なった機能を持つスレッドセーフな実装が追加されました。

  • LinkedBlockingQueue
    リンクノードに基づくBlockingQueueです。容量を指定することも出来ますが、指定しなければ容量は自動的に拡張されます。
  • ArrayBlockingQueue
    固定要領の配列に基づくBlockingQueue
  • SynchronousQueue
    Blockingインターフェイスを利用した値の受け渡しを行うクラス。
  • PriorityBlockingQueue
    取り出すオブジェクトの優先順位を設定することの出来るBlockingQueue。コンストラクタでCompareオブジェクトを設定します。
  • DelayQueue
    追加されてから指定した時間後経過しなければ要素を取り出すことの出来ないBlockingQueue。

いくつかの特徴的な実装が追加されています。 SynchronousQueueは内部容量を持たず、値の追加を値の取得が行われるまでブロックし、 値の取得も値の追加が行われるまでブロックするという特徴を持ちます。 そのため二つのスレッドで値の受け渡しを同期化することが出来ます。

DelayQueue は java.util.conccurent.Delayed インターフェイスを実装したオブジェクト以外を追加することが出来ません。 Delayed インターフェイスは関連付けされた残り時間を取得する getDelay() メソッドを持つインターフェイスです。 DelayQueue が要素の取得や追加をブロックすることはありません。取り出せる要素は残り時間が一番少ないオブジェクトです。

またBlockingQueueではないスレッドセーフなQueue実装、ConcurentLinkedQueueクラスも実装されています。 容量制限を行わない場合などは、BlockingQueueの実装よりも、ConcurrentLinkedQueueを用いた方が高速に実行出来ます。

以下はSynchronousQueueを用いて二つのスレッドで値をやりとりする例です。

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.BlockingQueue;

public class MySynchronousQueue implements Runnable {

    BlockingQueue queue;
    String name;

    public static void main(String[] args) {
        BlockingQueue queue = new SynchronousQueue();
        
        Thread thread1 = new Thread(new MySynchronousQueue("Thread1", queue));
        thread1.start();
                
        Thread thread2 = new Thread(new MySynchronousQueue("Thread2", "hoge", queue));
        thread2.start();
    }
    
    public MySynchronousQueue(String name, BlockingQueue queue) {
        this.queue = queue;
        this.name = name;
    }
    
    public MySynchronousQueue(String name, String str, BlockingQueue queue) {
        this.name = name;
        this.queue = queue;
        
        try {
            this.queue.put(str);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public void run() {
        while(true) {
            try {                
                String str = (String)this.queue.take();
                System.out.println(name + ":" + str);
                this.queue.put(str);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }        
    }
}

次のように出力されます。

Thread1:hoge
Thread2:hoge
Thread1:hoge
Thread2:hoge
Thread1:hoge
Thread2:hoge
Thread1:hoge
Thread2:hoge
...

Thread1とThread2で値の交換が出来ていることがわかります。

↑このページの先頭へ

こちらもチェック!

PR
  • XMLDB.jp