라이프로그


インタフェースCallableによる非同期並行処理 IT

戻り値を返せる非同期処理用メソッド

 従来のスレッド処理で利用していたインタフェースRunnableのメソッドrunは、戻り値を返さないメソッドである。したがって、スレッドで何らかの計算を行い、その結果を返すようなタスクをRunnableオブジェクトで処理するのは簡単ではない。

 そこで、並行ライブラリ(JSR 166で追加になったクラス/インタフェース群)では、新たにインタフェースCallableを用意し、戻り値を返すことが可能な非同期処理用のメソッド callを同インタフェースに宣言している。同インタフェースのソース・コードは、以下のようになっている。
interface Callable {
V call() throws Exception;
}

 Callableオブジェクトも、Runnableオブジェクト同様、イグゼキュタのタスクとして処理することができる。つまり、Callableオブジェクトも、メソッドsubmitの呼び出し元スレッドとは非同期にスレッド・プール内で実行される。

 上述したとおり、インタフェースCallableのメソッドcallは、インタフェースRunnableのメソッドrunとは異なり、戻り値を返す。メソッドcallが戻り値を得るためには、スレッド・プール内のスレッドで処理した結果をメソッドsubmitの呼び出し元スレッドに渡す仕組みが必要だ。そして、プログラマーがその仕組みを利用できるようにするために、並行ライブラリでは、インタフェースFutureが用意されている。

 インタフェースExecutorServiceのメソッドsubmitは、このFutureオブジェクトを戻り値として返す。例えば、Callableオブジェクトを引数にとるメソッドsubmitのシグネチャは以下のとおりだ。
Future subm it(Callable task)

 インタフェースFutureには、getというメソッドが宣言されており、これを利用すれば、Callableオブジェクトのメソッドcallの戻り値を取り出すことができる。具体的な手順としては、まず、Callableオブジェクトを引数としてメソッドsubmitを呼び出す。メソッドsubmitは、Futureオブジェクトを返すので、そのオブジェクトのメソッドgetを呼び出す。メソッドgetは、Callableのタスクが実行され、戻り値が計算されるまで待機する。そして、タスクの実行が終了し、計算結果が得られると、それを戻り値として返すといった具合だ。

 インタフェースFutureには、戻り値を取り出すメソッドget以外にも、メソッドsubmitの呼び出しが行われたタスクをキャンセルするためのメソッドcancelや、タスクが終了したかどうかを調べるためのメソッドisDoneなどが備わっている。

 ちなみに、Runnableオブジェクトに対してメソッドsubmitを呼び出した場合にも、Futureオブジェクトが戻り値として返される。Runnableオブジェクトを引数にとるメソッドsubmitには、以下の2つがある。
Future submit(Runnable task)
Future submit(Runnable task, T result)

 戻り値を返さないRunnableのタスクに対して、Futureオブジェクトを返すのは意味がないと思われるかもしれない。しかし、実際には、このようにすることで、メソッドgetによってタスクの終了を待つことができるし、メソッドcancelを呼び出せば、タスクをキャンセルすることもできるのだ。

 引数が1つのメソッドsubmitの場合、返されたFutureオブジェクトのメソッドgetを呼び出すと、タスクの終了後に戻り値として nullが返される。一方、引数が2つのメソッドsubmitの場合、メソッドgetを呼び出すと2つ目の引数で渡されたresultが戻り値として返される。

Callableオブジェクトの利用例

 Callableオブジェクトを使ったプログラムの例として、リスト1に示すクラスRandomWaitCallを作成した。
リスト1:Callableオブジェクトを使ってタスクを発生させるクラスRandomWaitCall


class RandomWaitCall implements Callable {
public Integer call() throws InterruptedException {
int r = (int)(Math.random() * 1000);
Util.println("sleep " + r);
Thread.sleep(r);
Util.println("return " + r);
return r;
}
}



 同クラスでは、乱数を発生させ、その値に応じた長さだけプログラムをスリープさせた後、最後に乱数の値を返す(以下、このプログラムの処理をRandomWaitCallタスクと呼ぶ)。

 以下に示したのは、クラスRandomWaitCallを利用したプログラムである。
import java.util.*;
import java.util.concurrent.*;

class Main3 {
public static void main(String[] args) throws Exception {
Util.println("main");
ExecutorService ex = Executors.newCachedThreadPool();
List> futures
= new ArrayList>();
for (int i = 0; i < 4; i++) {
Future f = ex.submit(new RandomWaitCall());
futures.add(f);
Util.println("submit");
}
ex.shutdown();
for (Future f : futures) {
int r = f.get();
Util.println("" + r);
}
}
}

 このプログラムでは、まず、 CachedThreadPoolオブジェクトを生成し、RandomWaitCallタスクを発生させ、メソッドsubmitが返したFutureオブジェクトをListオブジェクトに格納するという処理を4回繰り返している。そして、そのFutureオブジェクトに対してListオブジェクトの要素の数だけメソッドgetを呼び出し、メソッドsubmitの戻り値を取得している。

 このプログラムを実行すると、以下のように出力された。
00.000 main
00.067 submit
00.069 submit
00.070 submit
00.071 sleep 156
00.072 sleep 572
00.073 sleep 44
00.074 sleep 432
00.075 submit
00.103 return 44
00.216 return 156
00.217 156
00.492 return 432
00.632 return 572
00.633 572
00.635 44
00.643 432

 各Callableオブジェクトの戻り値が、mainスレッドで取り出されていることがわかる。

インタフェースCompletionService

 上のプログラムでは、メソッドsubmitを呼び出した順にFutureオブジェクトを格納し、格納順どおりにメソッドgetを呼び出している。そのため、いちばん最初に終了したのは3番目のスレッドであるにもかかわらず、その結果は、1、2番目のスレッドが終了するまで取り出せていない。しかし、実用的なプログラムでは、スレッドが終了した順に処理結果を取得したいことも多いだろう。

 それを簡単に実現するために、並行ライブラリには、インタフェースCompletionServiceとその実装クラスExecutorCompletionServiceが用意されている。

 これらのクラス/インタフェースの利用法は以下のようになる。
import java.util.concurrent.*;

class Main4 {
public static void main(String[] args) throws Exception {
Util.println("main");
ExecutorService ex = Executors.newCachedThreadPool();
CompletionService cs
= new ExecutorCompletionService(ex);
for (int i = 0; i < 4; i++) {
cs.submit(new RandomWaitCall());
Util.println("submit");
}
ex.shutdown();
for (int i = 0; i < 4; i++) {
Future f = cs.take();
int r = f.get();
Util.println("" + r);
}
}
}

 このプログラムでは、まず ExecutorServiceオブジェクトをコンストラクタの引数に指定して、ExecutorCompletionServiceオブジェクトを生成している。次に、Callableオブジェクトを引数に指定したメソッドsubmitをExecutorCompletionServiceオブジェクトから呼び出している。これにより、クラスCompletionServiceのメソッドtakeを呼び出し、終了したスレッドの順にFutureオブジェクトを取り出すことが可能になる。

 上のプログラムを実行すると、以下のように出力された。
00.000 main
00.068 submit
00.072 sleep 191
00.073 sleep 72
00.074 submit
00.074 submit
00.075 submit
00.077 sleep 914
00.078 sleep 638
00.132 return 72
00.134 72
00.251 return 191
00.252 191
00.720 return 638
00.721 638
00.975 return 914
00.976 914

 これを見ると、最初に処理を終えた2番目のスレッドの値が最初に取得できていることがわかる。その後、1、4、3番目の順にスレッドが終了し、その順序どおりに結果を取得できている。
繰り返し実行処理

 並行ライブラリには、タスクを定期的に繰り返し実行したり、指定した遅延時間が経過してから実行したりするイグゼキュタとして、インタフェース ScheduledExecutorServiceが用意されている。ScheduledExecutorServiceは、インタフェース ExecutorServiceのサブインタフェースである(図1)。

図1:インタフェースScheduledExecutorServiceの継承関係を表すクラス図

 表1に示したのは、インタフェースScheduledExecutorServiceのメソッドの概要である。
 
表1:クラスScheduledExecutorServiceのメソッド   
メソッド        説明       
ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) delay時間後にcallableを実行
ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) delay時間後にcommandを実行
ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) initialDelay時間後からperiod周期でcommandを繰り返し実行
ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) initialDelay時間後から毎回delayの遅延をおいてcommandを繰り返し実行

 なお、各メソッドの戻り値の型に指定されているScheduledFuture型というのは、インタフェースFutureのサブインタフェースだ。

 表1に示したメソッドのうち、メソッドscheduleは、引数に指定した時間だけ遅延させた後に処理を実行するメソッドである。同メソッドには、Callableオブジェクトに対して処理を行うものとRunnableオブジェクトに対して処理を行うものの2種類がある。

 scheduleAtFixedRateとscheduleWithFixedDelayは、繰り返し実行処理を行うメソッドである。これらのメソッドには、Runnableオブジェクト用のものしか存在しない。繰り返し戻り値を返す仕組みが用意されていないので、Callableオブジェクト用の繰り返し実行メソッドは用意されていないのである。

 メソッドscheduleAtFixedRateとscheduleWithFixedDelayの違いは、実行開始の間隔が一定時間になるか、実行終了と実行開始の間隔が一定時間になるかという点である(図2)。

図2:メソッドscheduleAtFixedRateとscheduleWithFixedDelayの違い

 インタフェース ScheduledExecutorServiceの実装クラスは、クラスThreadPoolExecutorのサブクラス ScheduledThreadPoolExecutorである。つまり、クラスScheduledThreadPoolExecutorにかかわる継承関係は、図3のようになる。

図3:クラスScheduledThreadPoolExecutorにかかわる継承関係

 クラス ScheduledThreadPoolExecutorについても、クラスThreadPoolExecutorと同様、クラスExecutorsに newScheduledThreadPoolとnewSingleThreadScheduledExecutorというファクトリ・メソッドが用意されている。

クラスScheduledThreadPoolExecutorの利用例

 それでは、クラスScheduledThreadPoolExecutorの利用法について説明しよう。まずは、クラス ScheduledThreadPoolExecutorのメソッドを利用可能にするために、タスクの実装内容を変更する。具体的には、リスト1の RandomWaitCallタスクは、Callableオブジェクトとして作成した(インタフェースCallableをimplementsした)ものだが、これを基に、RunnableオブジェクトとしてクラスRandomWaitRunを作成した(以下参照)。
import java.util.concurrent.*;

class RandomWaitRun implements Runnable {
public void run() {
try {
int r = (int)(Math.random() * 1000);
Util.println("sleep " + r);
Thread.sleep(r);
Util.println("done " + r);
} catch (InterruptedException e) {
Util.println("exception");
}
}
}

 リスト2に示したのは、このRandomWaitRunオブジェクトを1秒周期で繰り返し実行するプログラムである。
リスト2:クラスRandomWaitRunを1秒周期で実行するプログラム


import java.util.concurrent.*;
import static java.util.concurrent.TimeUnit.*;
import static java.util.concurrent.TimeUnit.*;
class ScheduleTest {
public static void main(String[] args) {
ScheduledExecutorService se
= Executors.newScheduledThreadPool(1);
se.scheduleAtFixedRate(new RandomWaitRun(), 0, 1, SECONDS);  (1)
}
}



 1秒周期の繰り返し処理は、メソッドscheduleAtFixedRateによって行っている。このプログラムを実行すると、以下のように出力された。
00.000 sleep 737
00.777 done 737
00.999 sleep 62
01.052 done 62
02.004 sleep 145
02.140 done 145
03.003 sleep 54
03.048 done 54
04.000 sleep 732
04.723 done 732
…略…

 「sleep」と記述されている行の時間を抜き出してみると、「00.000」、「00.999」、「02.004」、「03.003」、「04.000」となっており、ほぼ1秒間隔になっていることがわかる。

 次に、リスト2-(1)の部分を以下のように変更し、メソッドscheduleWithFixedDelayの動作を確認してみよう。
se.scheduleWithFixedDelay(new RandomWaitRun(), 0, 1, SECONDS);

 修正したプログラムを実行すると、以下のように出力される。
00.000 sleep 608
00.646 done 608
01.659 sleep 472
02.122 done 472
03.134 sleep 331
03.456 done 331
04.471 sleep 672
05.132 done 672
06.145 sleep 275
06.411 done 275
…略…

 「done」と「sleep」の行の間の時間を計算すると、以下のようになっており、毎回ほぼ1秒の遅延で繰り返し実行していることがわかる。
01.659 - 00.646 = 01.013
03.134 - 02.122 = 01.012
04.471 - 03.456 = 01.015
06.145 - 05.132 = 01.013

繰り返し実行処理の終了

 リスト2のプログラムは、いつまでも繰り返しが続き、処理が終了しない。繰り返し実行を停止させるには、インタフェースScheduledFutureのメソッドcancelを呼び出す必要がある。

 例えば、5秒後に繰り返し実行処理を停止させたければ、メソッドcancelを呼び出すタスクが5秒後に実行されるようにスケジューリングすればよい(以下参照)。
import java.util.concurrent.*;
import static java.util.concurrent.TimeUnit.*;

class ScheduleTest1 {
public static void main(String[] args) {
ScheduledExecutorService se
= Executors.newScheduledThreadPool(1);
final ScheduledFuture future
= se.scheduleAtFixedRate(new RandomWaitRun(), 0, 1, SECONDS);
se.schedule(new Runnable() {
public void run() {
future.cancel(true);
Util.println("cancel");
}
}, 5, SECONDS);
}
}

 このプログラムでは、メソッド scheduleAtFixedRateが返したScheduledFutureオブジェクトのメソッドcancelを呼び出すタスクを、メソッド scheduleで5秒後にスケジューリングしている。このように記述しておけば、5秒後に繰り返し実行処理は停止する。
プログラムの終了

 上のプログラムでは、確かに5秒後に繰り返し実行処理が停止するが、実はプログラム全体は終了しない。なぜなら、 ScheduledExecutorServiceオブジェクトのスレッドが残っているからである。プログラムを終了させるには、 ScheduledExecutorServiceオブジェクトのメソッドshutdownを呼び出す必要がある。

 メソッドshutdownを呼び出すと、その時点でまだ遅延時間が残っているタスクはキャンセルされる。したがって、この例では、5秒より前にメソッドshutdownを呼び出してしまうと、繰り返し処理が終わる前にプログラムが終了してしまうことになる。5秒間、繰り返し実行した後で終了させるには、5秒後に実行されるタスクの終了を待ってから、メソッドshutdownを呼び出す必要がある。この処理を実現したものが、以下に示したプログラムである(例外処理は省略している)。
import java.util.concurrent.*;
import static java.util.concurrent.TimeUnit.*;

class ScheduleTest2 {
public static void main(String[] args) throws Exception {
ScheduledExecutorService se
= Executors.newScheduledThreadPool(1);
final ScheduledFuture future
= se.scheduleAtFixedRate(new RandomWaitRun(), 0, 1, SECONDS);
final ScheduledFuture future2 = se.schedule(new Runnable() {
public void run() {
future.cancel(true);
Util.println("cancel");
}
}, 5, SECONDS);
future2.get();
se.shutdown();
}
}

 このプログラムでは、5秒後に繰り返し処理を停止させるタスクを一度ScheduledFutureオブジェクトとして格納し、そのオブジェクトからメソッドgetを呼び出すというかたちで処理することで、タスクの終了を待っている。その後に、メソッドshutdownを呼び出しているので、5秒間の繰り返し実行が終了した後でプログラムが終了する。

 タスクの終了を待ってからプログラムを終了する方法をもう1つ紹介しよう。インタフェースScheduledExecutorServiceの実装クラスはScheduledThreadPoolExecutorだが、実装クラス固有のメソッドとして、 setContinueExistingPeriodicTasksAfterShutdownPolicyや setExecuteExistingDelayedTasksAfterShutdownPolicyなどが用意されている。これらは、メソッド shutdownを呼び出したときの動作を調整するためのメソッドだ。これらを利用すると、メソッドshutdownを呼び出してもすぐには終了せず、スケジューリングされているタスクがすべて終了してからプログラムを終了するようになる。これらのメソッドを使用したプログラムは以下のようになる。
import java.util.concurrent.*;
import static java.util.concurrent.TimeUnit.*;

class ScheduleTest3 {
public static void main(String[] args) {
ScheduledExecutorService se
= Executors.newScheduledThreadPool(1);
ScheduledThreadPoolExecutor stpe
= (ScheduledThreadPoolExecutor)se;
stpe.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
stpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
final ScheduledFuture future
= se.scheduleAtFixedRate(new RandomWaitRun(), 0, 1, SECONDS);
se.schedule(new Runnable() {
public void run() {
future.cancel(true);
Util.println("cancel");
}
}, 5, SECONDS);
se.shutdown();
}
}

 以上、今回は、イグゼキュタの応用的なクラスについて解説した。

 イグゼキュタは、簡潔かつ強力なフレームワークである。今後、スレッドを使ったプログラムを作成する場合には、このフレームワークを使用するのが標準的な手法になるだろう。また、このフレームワークの実装は、プログラミングを行ううえでの参考になる。時間に余裕があれば、ぜひ同フレームワークの各クラスのソース・コードもご覧になっていただきたい。

http://www.itarchitect.jp/technology_and_programming/-/23561.html

트랙백

이 글과 관련된 글 쓰기 (트랙백 보내기)
TrackbackURL : http://charie.pe.kr/tb/1066857 [도움말]