The King's Museum

ソフトウェアエンジニアのブログ。

自分を使い切る

元陸上選手の為末さんがこんなことを書いていた。

http://tamesue.jp/blog/archives/think/20160924

幸福というのは自分はどこまでいったかというよりも、自分という存在をどこまで使い切れたか、能力を引き出し切れたかによるのではないかと思うようになった。

もうすぐ30歳(自分が)。

20歳の頃、30歳の自分はもっと成功していて、もっと遠くにいると思ったのだけれど、そんなことはなかった。

最近、

「20歳の自分に『なんだ、まだこんなところにいるのか』と言われてしまうんだろうなぁ」

と、よく思う。

自分がどこまでいけたかではなく、自分を使い切れたかどうかが幸福を左右する

言われてみればたしかにその通りかもしれない。人生において自分を使い切れた瞬間には輝きがあったように思える。

自分はどこまできたのか

この思考から離れることができれば、今の生活にもう少し明かりが見えるのかもしれない(別に暗いわけじゃないのだが)。

なんだかポエム感がすごいが、たまにはこういう記事も悪くない。

【Effective Java】項目69:wait と notify よりコンカレンシーユティリティを選ぶ(前半)

Java のスレッド操作 API として Object.wait() と Object.notify() があります。 しかし、Java 1.5 からはより高いレベルのユティリティが導入されたので wait() と notify() は使うべきではありません。

追加されたのは java.util.concurrent ユティリティです。 concurrent ユティリティは次の3つに分類されます。

エグゼキューターフレームワークについてはすでに項目68で簡単に説明しました。

今回はコンカレントコレクションとシンクロナイザーについて説明します。 また、wait() と notify() を正しく使う方法についても説明します。

コンカレントコレクション

コンカレントコレクションは、List や Map などのコレクションの高パフォーマンスな実装です。 高度な並行性が実装されていて、すべて内部同期されているクラスです。

内部的に同期されているため、これらのクラスから並行性を除去することはできません。 また、外部から追加でロックをすることに意味はありません。 単にプログラムを遅くするだけです。

ConcurrentHashMap

コンカレントコレクションの複数のメソッド呼び出しを外部同期によってアトミックにすることはできません。 代わりに一部のコレクションインタフェースは、アトミックな単一メソッドを提供しています。

たとえば、ConcurrentMap には「指定されたキーをチェックする操作」と「指定されたキーに値を紐付ける操作」をアトミックに行う putIfAbsent() メソッドがあります。

Java 1.5 以前には同期化されたマップの実装として SynchronizedMap がありました。 Java 1.5 以降は、基本的に SynchronizedMap ではなく ConcurrentHashMap を利用するべきです。 ConcurrentHashMap の高度な並行性実装によって、単に SynchronizedMap を ConcurrentHashMap に置き換えるだけで劇的にパフォーマンスが改善する可能性があります。

より一般的には、外部同期されたクラスよりも内部同期された適切なクラスを使うべきです。

ブロック操作

コンカレンシーユティリティのいくつかのインタフェースは操作が完了するまでスレッドを待機させるブロック操作を持っています。

たとえば、BlockingQueue は Queue を拡張しており、take メソッドが追加されています。 このメソッドは「キューの先頭から値を取り出す。ただし、空であれば待機する」という処理を行います。

このクラスはスレッドプールのワークキュー実装に最適です。 実際、エグゼキューターフレームワークのほとんどがこの BlockingQueue を利用しています。

シンクロナイザー

シンクロナイザーは「他のスレッドの完了を待つ」といった処理を可能にするクラスです。 より一般的にはスレッド間の活動を協調するために利用します。

代表的なシンクロナイザーは CountDonwLatch と Semaphore です。 Java 1.5 より前では wait() と notify() を使ってスレッド間の協調を行っていました。 現代においてそれは正しい選択肢ではありません。

CountDownLatch は複数のスレッドの処理が終わることを待ち合わせるために利用できます。 例えばいくつかのタスクの完了を待ち合わせる処理を CountDownLatch を使って実装すると次のようになります。

public static void run(Executor executor, int taskNum) throws InterruptedException {
    // タスク数と同一のカウントを持つラッチを作成
    final CountDownLatch latch = new CountDownLatch(taskNum);
    for (int i = 0; i < taskNum; i++) {
        final int n = i;
        executor.execute(new Runnable() {
            @Override public void run() {
                System.out.println("task#"+ n);
                // ラッチを一つさげる
                latch.countDown();
            }
        });
    }
    // すべてのタスクが終了するのを待つ
    latch.await();
}

実はこの処理を正しく wait() と notify() で実装するのはかなり難しいです。

次回は wait() と notify() の正しい利用方法について説明します。

【Effective Java】項目68:スレッドよりエグゼキューターとタスクを選ぶ

Java 1.5 から、エグゼキューターフレームワークが利用できるようになりました。 このフレームワークを使うと、スレッドで実行するためのワークキューを簡単に作成することができます。

// スレッドで実行するためのエグゼキューターサービスの生成
ExecutorService executorService = Executors.newSingleThreadExecutor();

他のスレッドで処理を実行するには次のように書きます。

// Runnable の実行
executorService.submit(new Runnable() {
        @Override public void run() {
            // 他のスレッドで実行される
            System.out.print("run");
        }
   });

実行中の処理が終わるのを待ち、エグゼキューターを終わらせるコードは次のようになります。

executorService.shutdown();

このようにエグゼキューターフレームワークを使うことで、スレッドで実行する処理をかなり簡単に書くことができるようになりました。

柔軟性

エグゼキューターサービスは従来のスレッドクラスにはなかった次のような機能を持っています。

  • 特定のタスクの完了を待つ
  • タスクの集まりの一部/すべての終了を待つ
  • タスクが完了するごとに結果を取り出す

スレッドプール

キューを実行するスレッドを2本以上持つ『スレッドプール』も簡単に利用可能です。

小さく負荷の低い処理を行う場合には、Executors.newCachedThreadPool() を使うとよいでしょう。

このスレッドプール実装ではタスクの実行が要求されると、新しいスレッドを生成します。 もし、タスクを処理していないスレッドがあれば、そのスレッドは再利用されるます。

ただし、一度にたくさんのタスクが投入されるとその数だけスレッドを生成してしまうため、リソースが枯渇する可能性があります。 一度に大量のタスクが投入されることが想定されるのならば、最大スレッド数が制限された Executors.newFixedThreadPool() を使うべきです。

Thread の利用は避ける

エグゼキューターフレームワークによって自分でワークキューを書く必要性はなくなりました。 それに加えて、直接 Thread クラスを利用する必要もほとんどありません。

従来の Thread クラスのインタフェースには「機能」と「機構」が混在しています。 エグゼキューターフレームワークでは「機能」と「機構」は分離されています。

機能に対応するクラスは「タスク」と呼ばれています。 具体的には Runnable インタフェースと Callable インタフェースが対応します。 Runnable は値を返しませんが、Callable は値を返すことができます。

一方、機構に対応するのがエグゼキューターです。 機構はタスクである Runnable や Callable を実行するためのフレームワークです。

ScheduledThreadPool

エグゼキューターフレームワークは従来の Timer クラスの代わりにもなります。 従来の Timer は実行スレッドが一つしかないため、次のような欠点がありました。

  • 長い時間動作するタスクがある場合にタイマーの精度が悪くなる
  • 唯一のスレッドが例外を投げると Timer 全体が動作しなくなる。

エグゼキューターフレームワークの ScheduledThreadPoolExecutor はこれらの問題を解決しています。

ScheduleThreadPoolExecutor は複数スレッドをサポートしています。 また、タスクからチェックされない例外がスローされた場合にも、実行スレッドは正しく回復します。

【Effective Java】項目67:過剰な同期は避ける

前回の記事では、同期が不十分な場合にはマルチスレッド環境において予期しないエラーが発生する可能性を説明しました。

hjm333.hatenablog.com

hjm333.hatenablog.com

今回は前回とは逆の視点です。 過剰に同期することによって発生する問題について取り上げます。

過剰な同期による問題

Java において過剰に同期を行うと次の問題が発生する可能性があります。

まずはデッドロックを引き起こす可能性のあるコードについて説明します。

オープンコール

synchronized で同期しているメソッドやブロック内で、制御を他のコードに譲ってはいけません。

例として、リスナーを介して追加の通知が取得できるセットの実装(ObservableSet)を取り上げます。 この実装は項目16で利用した ForwardingSet を利用しています。

/**
 * クライアントが要素を追加した時に、通知を受け取ることが可能。
 * Observer パターンを利用。
 */
public class ObservableSet<E> extends ForwardingSet<E> {
    interface SetObserver<E> {
        void added(ObservableSet set, E element);
    }

    public ObservableSet(Set<E> set) {
        super(set);
    }

    private final List<SetObserver<E>> observers = new ArrayList<>();

    public void addObserver(SetObserver<E> observer) {
        synchronized (observers) {
            observers.add(observer);
        }
    }

    public boolean removeObserver(SetObserver<E> observer) {
        synchronized (observers) {
            return observers.remove(observer);
        }
    }

    private void notifyElementAdded(E element) {
        synchronized (observers) {
            for (SetObserver<E> observer: observers) {
                observer.added(this, element);
            }
        }
    }

    @Override
    public boolean add(E element) {
        boolean added = super.add(element);
        if (added) {
            notifyElementAdded(element);
        }
        return added;
    }
}

この Set は任意のリスナーを登録できます。 要素が追加された際は、そのリスナーを通じて通知を受け取ることができます。

問題は通知を発生させる notifyElementAdded() です。 このメソッドは同期化されており、SetObserver.added() を同期ブロック内で呼び出しています。

SetObserver.added() の実装について ObservableSet は何も知りませんので、どんな処理が行われるか保証できません。 すごく時間のかかる処理をするかもしれないですし、例外を発生させる処理かもしれません。

もし、次のようなコードが書かれているとしたらプログラムはデッドロックしてしまいます。

set.addObserver(new SetObserver<Integer>() {
    @Override
    public void added(final ObservableSet set, Integer element) {
        if (element == 23) {
            ExecutorService executor =
                    Executors.newSingleThreadExecutor();
            final SetObserver<Integer> observer = this;
            // 要素の削除をバックグラウンドスレッドで実行
            try {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        set.removeObserver(observer);
                    }
                });
            } finally {
                executor.shutdown();
            }
        }
    }
});

このデッドロックは SetObserver.added() を同期ブロック内で呼び出しているために発生します。

同期ブロック内で呼び出しなので SetObserver.added() の呼び出しスレッドはロックを取得している状態です。 そのうえで、他のスレッドで ObservableSet.removeObserver() を呼び出そうとしています。

removeObserver() は同期化されているため、SetObserver.added() を呼び出したスレッドが持っているロックを待つ必要があります。 しかし、removeObserver() が終わらない限り SetObserver.added() の処理は終わらないため、ロックは永遠に解放されません。 すなわち、removeObserver() を呼び出したスレッドはロックの取得を永遠に待ち続けるのです。

これがデッドロックの原因です。 このように同期ブロック内で異質なメソッドを呼び出すと、意図しないデッドロックを発生させる可能性があります。

最小限の同期

このような問題を防ぐためのプラクティスを紹介します。 それは「同期された領域内で行う処理は最小限にするべき」というものです。 同期ブロックに入ったら必要な共有データのみを調べ、そして同期ブロックを抜けるのが理想です。

先ほどの ObservableSet では同期ブロック内で SetObserver.added() を呼び出していることが問題でした。 そこで次のようにコードを変更します。

private void notifyElementAdded(E element) {
    List<SetObserver<E>> snapshot = null;
    synchronized (observers) {
        // 同期ブロック内ではその時点で observer のコピーを保存
        snapshot = new ArrayList<SetObserver<E>>(observers);
    }
    for (SetObserver<E> observer: snapshot) {
         observer.added(this, element);
    }
}

snapshot にコピーを保存して、異質なメソッド(SetObserver.added())を同期ブロックで呼ばないように変更しました。 こうすればオープンコールは同期化ブロック外で呼び出されますので、デッドロックを回避できます。

Java ライブラリには CopyOnWriteArrayList と呼ばれるスレッドセーフなリストが実装されています。 これを使っても問題ありません。

パフォーマンス

現在の JVM はロックの取得のためには CPU 時間をあまり消費しません。 その代わり、メモリの一貫性を保つために遅延が発生します。 また、同期化によって JVM 上の最適化が除去されてしまいます。

そのため、過剰な同期はパフォーマンスを低下させます。 パフォーマンス上の視点でも過剰な同期は避けるべきです。

外部同期・内部同期

クラスがマルチスレッド環境で利用されるのならば、クラスの内部で同期を行ってクラス自体をスレッドセーフにするべきです。

クラスを内部的に同期するようにすると、次のようなテクニックを使って高い平行性が実現できます。

しかし、内部同期を使った結果として高いパフォーマンスが得られないのであれば、内部同期しないほうがましです。 クラスでは同期を行わずにそのクラスを利用する外部のクライアント側で同期を行うべきです。

Java の初期の頃は内部同期を使っているクラスが多くありました。 たとえば StringBuffer はその代表例です。

しかし、StringBuffer にスレッドセーフ性が必要な場合はほとんどありません。 そこで Java 1.5 からは内部同期を行わない StringBuilder が導入されました。 「必要であれば StringBulder を使うクライアントが外部から同期を行うべき」という考えです。

このように、必要な場合のみ内部的にスレッドセーフを実現するべきです。

Java の同期化とは何か(2)(【EffectiveJava】項目66:共有された可変データへのアクセスを同期する-後半-)

前回の記事では Java の同期化(synchronized)について説明してきました。

hjm333.hatenablog.com

今回の記事では「volatile」と「同期化が必要ない場合」について説明します。

volatile

Java の同期化(synchronized)は次の性質を保証するものでした。

  • アトミック性
  • メモリ可視性

Java にはメモリの可視性だけを保証するための仕組みがあります。 それが volatile です。

volatile キーワードが付けられた変数はメモリの可視性が保証されます。 volatile では処理をアトミックにすることはできませんが、各スレッド間でフラグを共有する際に便利です。

例えば、前回、例に挙げたスレッドを停止させるコードでは synchronized を使いました。

public class StopThread {
    private static boolean stop;
    private static synchronized void requestStop() {
        stop = true;
    }
    private static  synchronized boolean isStop() {
        return stop;
    }
    public static void main(String[] args) throws InterruptedException {
        Thread background = new Thread(new Runnable() {
            @Override public void run() {
                int i = 0;
                while (isStop()) {
                    i++;
                }
            }
        });
        background.start();

        TimeUnit.SECONDS.sleep(1);
        requestStop();
    }
}public class StopThread {
    private static boolean stop;
    private static synchronized void requestStop() {
        stop = true;
    }
    private static  synchronized boolean isStop() {
        return stop;
    }
    public static void main(String[] args) throws InterruptedException {
        Thread background = new Thread(new Runnable() {
            @Override public void run() {
                int i = 0;
                while (isStop()) {
                    i++;
                }
            }
        });
        background.start();

        TimeUnit.SECONDS.sleep(1);
        requestStop();
    }
}public class StopThread {
    private static boolean stop;
    private static synchronized void requestStop() {
        stop = true;
    }
    private static  synchronized boolean isStop() {
        return stop;
    }
    public static void main(String[] args) throws InterruptedException {
        Thread background = new Thread(new Runnable() {
            @Override public void run() {
                int i = 0;
                while (isStop()) {
                    i++;
                }
            }
        });
        background.start();

        TimeUnit.SECONDS.sleep(1);
        requestStop();
    }
}

しかし、この例では処理のアトミック性は必要ありません。 boolean の読み書き自体がアトミックになっていますので単にメモリの可視性を保証すれば問題ありません。 そのため volatile を使うことができます。

volatile を使って書き換えたのが次のコードです。

public class StopThread {
    private static volatile boolean stop;

    public static void main(String[] args) throws InterruptedException {
        Thread background = new Thread(new Runnable() {
            @Override public void run() {
                int i = 0;
                while (stop) {
                    i++;
                }
            }
        });
        background.start();

        TimeUnit.SECONDS.sleep(1);
        stop = true;
    }
}

このコードは synchroznied を使ったときと同じように正しく動作します。

インクリメントとデクリメント

volatile はメモリの可視性を保証しますが、アトミック性は保証しません。 そのため「アトミックのように見えて実はアトミックではない」という操作に対して volatile を使わないように注意が必要です。

インクリメントとデクリメントは「アトミックに見えるが実はアトミックでない操作」の代表です。 インクリメントは実際には「値の読み出し」、「値の加算」、「値の書き込み」という複数の操作で構成される処理です。 インクリメント・デクリメントに関わる操作を volatile で実装するのは間違いです。

次のコードは複数のスレッドから一意の番号を生成するためのメソッドを実装したものです。

private static volatile int serialNumber = 0;

public static int generateSerialNumber() {
    return serialNumber++;
}

serialNumber のインクリメントは volatile によってメモリの可視性が保証されています。 ただし、アトミックでないため複数のスレッドから呼び出された際に、それぞれのスレッドで同じ値を生成してしまう可能性があります。

正しくは次のように syncronized を使い、メモリの可視性に加えてアトミック性も保証する必要があります。

private static int serialNumber = 0;

public static synchronized int generateSerialNumber() {
    return serialNumber++;
}

同期を回避する

ここまで同期について説明してきましたが、正しく同期を実装すると複雑になるケースが多いです。 一方、同期がなければコードの複雑度は下がります。

ここでは同期を回避するためのテクニックを紹介します。

不変クラスの利用

不変クラスを利用すれば同期は必要はありません。 不変クラスはインスタンスが生成されてから状態が変わることがないからです。

状態を変える処理が存在しないため、アトミック性について気にすることはありません。 加えて、状態が変わらないので常にキャッシュの値を参照したとしても問題ありません。 すなわち、メモリの可視性について考えなくてよいのです。

可変データをスレッドに閉じ込める

たとえ可変なデータであっても、単一のスレッドからしかアクセスできないようにすれば同期は不要です。

単一スレッドからしか読み書きしませんので、アトミック性もメモリの可視性も考える必要はありません。

スレッドセーフクラス

スレッドセーフ性が保証されているクラスを使えば同期は必要ありません。

同期はそのクラス自体が適切に行ってくれます。 また、同期を使わずにスレッドセーフ性を実現するクラスもあります。

たとえばさきほどの serialNumber の例では java.concurrent の AtomicLong というクラスが利用できます。

private static final AtomicLong serialNumber = new AtomicLong();

public static long generateSerialNumber() {
    // getAndIncrement メソッドは値の読取りとインクリメントをスレッドセーフで行うメソッド
    return serialNumber.getAndIncrement();
}

java.concurrent ライブラリには高度に最適化されたスレッドセーフなクラスが多数あります。 自分でスレッドセーフを実装する前にこれらのライブラリの利用を検討するべきです。

その他

その他のテクニックとして次のような手法がありますが、ここでは説明を省きます。

  • オブジェクトを事実上不変にする
  • オブジェクトを安全に公開する

これらのテクニックに関しては次の書籍に詳しく記載されています。

Java並行処理プログラミング ―その「基盤」と「最新API」を究める―

Java並行処理プログラミング ―その「基盤」と「最新API」を究める―

感想

ついに concurrency の章か。。。

concurrency についてはもう一度『Java 並行処理プログラミング』を読み直そうかと思ったり、思わなかったり。。。

Java の同期化とは何か(1)(【EffectiveJava】項目66:共有された可変データへのアクセスを同期する-前半-)

複数のスレッドで可変データを共有する場合にはそれぞれのスレッドを同期化(synchronized)する必要があります。

Java の同期化は次の二つの性質を保証します。

  • アトミック性(処理の排他制御
  • メモリの可視性

アトミック性

異なるスレッドが共通のデータにアクセスする場合、処理途中の不整合な状態を外部に見せたくない場合があります。 そのためには、処理をアトミックにして同時に一つのスレッドだけが処理することを保証する必要があります。

Java では、処理をアトミックにしたい場合は synchronized キーワードを利用します。 synchronized キーワードを使うとメソッドや処理をアトミックにすることができます。

次の atomic メソッドは常に1つのスレッドでのみ処理されることが保証されます。

public synchronized void atomic() {
    ...(アトミックな処理)...
}

もし、あるスレッド A で atomic() を呼び出していた場合、他のスレッド B で atomic() を呼び出すとスレッド A の atomic() の処理が終わるまでスレッド B は待機します。

変数の読み書き処理はアトミックであることが Java の仕様によって保証されています。 そのため、単純な変数の読み書きに synchronized を使う必要はありません。

// 変数の書きこみはアトミック
int value = 1;

ただし、long と double についてはアトミック性は保証されていないので注意が必要です。

// 次の命令はアトミックではない。二つのバイト命令に変換される可能性がある。
long value = 1L;

メモリの可視性

スレッドが複数ある場合、それぞれのスレッドは独自のキャッシュメモリを持っている可能性があります。

あるスレッド Aで書き込んだ値はスレッド A のキャッシュメモリにのみ反映されていてメインメモリに反映されていません。 この状態では他のスレッド B から値を読み込もうとしても、メインメモリに反映されていないため正しい値を読み込めません。

このように、メモリ上の正しい値が観測できるかどうかの性質を「メモリの可視性」と呼んでいます。

例えば、メインスレッドから他のスレッド A を停止させるコードを考えてみます。 この場合、単に次のようなコードを書くかもしれません。

public class StopThread {
    // boolean は操作がアトミックだから synchronized を利用しない(大きな間違い!)
    private static boolean stop;

    public static void main(String[] args) throws InterruptedException {
        // スレッドの生成
        Thread threadA = new Thread(new Runnable() {
            @Override public void run() {
                int i = 0;
                while (!stop) {
                    i++;
                }
            }
        });
        // スレッド A の開始
        threadA.start();

        TimeUnit.SECONDS.sleep(1);
        // スレッド A を停止!
        stop = true;
    }
}

実際、このプログラムを動作させると永遠に threadA が終わらないことが判明します。

これはまさにメモリの可視性の問題です。 stop = true; という書込みはメインスレッドで行われていますが、この書込み結果が threadA からは見えていないのです。

threadA が書き込んだ値を見られるようにするには、同期化が必要です。 Java の同期化はアトミック性に加えてメモリの可視性も保証します。

同期化を使って次のようにコードを変更すると、先ほどのプログラムは正しく動作します。 同期化によってメモリの可視性が保証されるからです。

public class StopThread {
    private static boolean stop;
    private static synchronized void requestStop() {
        stop = true;
    }
    private static  synchronized boolean isStop() {
        return stop;
    }
    public static void main(String[] args) throws InterruptedException {
        Thread background = new Thread(new Runnable() {
            @Override public void run() {
                int i = 0;
                while (isStop()) {
                    i++;
                }
            }
        });
        background.start();

        TimeUnit.SECONDS.sleep(1);
        requestStop();
    }
}

同期化は次の動作を保証します。(『Javaの理論と実践: Javaメモリ・モデルを修正する 第2回』を読んで - The King's Museum)

  • synchronized ブロックに入る時、すべてメインメモリを見るようになる
  • synchronized ブロックを出る時、すべてメインメモリに書き込む

この事実から分かるように、書き込み側にだけ synchronized を指定しても意味がありません。

書き込み側の synchronized によってメインメモリに値が反映されることは保証されます。 一方、読み込み側に synchronized が指定されていないと自分のキャッシュの値を見てしまい、正しくない値を見てしまう可能性があります。

次回

まとめきれなかったので volatile とかは次回に…。

【Effective Java】項目65:例外を無視しない

空の catch ブロックを使って例外を無視してはいけません。

// 空の catch ブロック
try {
  ...
} catch (Exception e) {
}

例外を無視するとエラーがあるという事実が隠蔽されてしまいます。 それによって、他の場所で別の形でエラーが再現する可能性もあります。

最低でも例外を無視する意図を書くべきです。 もし、可能であれば例外をログなどに記録するべきです。

例えば FileInputStream をクローズする時、すでにデータの読取りに成功していれば例外は無視してもかまわないでしょう。

FileInputStream stream;
... 読み出し処理 ...
try {
    stream.close();
} catch (Exception e) {
    // データの読み取りは成功しているので例外は無視し、ログだけ記録する
    Log.warn("stream.close() is failed: ", e);
}

このアドバイスはチェックされる例外にもチェックされない例外にもあてはまります。

チェックされる例外の場合は、一般的に回復可能な処理があるはずです。 そのため例外を無視せずに適切な処理を行うべきです。

チェックされない例外は一般的にはプログラミングエラーを示しています。 空の catch ブロックで例外を無視するのではなく、例外を外側に伝播させてシステム全体のエラーとしてしまったほうが適切です。

(c) The King's Museum