DRbQS で処理を分散させる(2. 実装のメモ)

どのような仕組みになっているのかメモしておく。

計算させるマシンについて

server は分割された処理(タスク)を Marshal.dump でバイト列にして druby の tuple space に登録する。 node は自分が実行できるタスクがあれば取得して、 Marshal.load でオブジェクトを復元して、指定されたメソッドを実行する。 その処理が終わったら node は結果を Marshal.dump して druby の tuple space に送り、server はそれを受け取り Marshal.load でもとに戻す。 server にタスクを登録したときに後処理が指定されていれば node から得た結果に対してその後処理を実行する。

node は複数の worker process を持っていて、 マルチコア CPU の性能を使い切ることができる。

server から node に定期的に node が動いているかを確認するために メッセージが送られ、それに返答しなければ、 node が動いていないと判断され、server にある node リストから削除される。 これらのやりとりは、ユーザが気にする必要はない。

セキュリティ

druby の ACL だけなので、外部に直接つながっているマシンでは かなり危険。

ファイルの送信

サーバーから計算ノード、また、逆に計算ノードからサーバーへ ファイルを sftp で送信することができる。

サーバーや計算ノードが動いている SSH サーバー間では 自由にログインできるようにしておく必要がある。 今のところ、パスワードや鍵は指定できないので注意。

ホームディレクトリにあるディレクトリ ~/.drbqs 以下に 実行されているサーバーや計算ノードの情報が保存される。 そこから、計算ノードは自身が接続しているサーバーが 同一ホスト上なのかを調べて sftp を使うのか、単にファイルの移動をするのか選択する。 したがって、サーバーと計算ノードを同一ホスト上で実行する場合は sftp のユーザー名やパスワードを設定する必要はない。

サーバー

設定項目

druby のデフォルトのポートは 13500。 複数のサーバーを起動させるためには、ポート番号が重ならないように 指定して実行する。 また、別のマシンに計算ノードを動かさない場合は UNIX domain socket を 使うこともできる。

ホームディレクトリ以下の ~/.drbqs/acl.txt があれば ACL のデフォルトの設定になる。

サーバー、計算ノード間でファイルの送信をする場合は、 sftp のユーザー名やホスト名、ファイルを保存するディレクトリを指定する。

タスク

サーバーには、druby の tuple space を用意する前に、 処理を分割してタスクをセットしておく。 あらかじめ、タスクのオブジェクトすべてを初期化してしまっても良いのだが、 それだとメモリが足りなくなることがある。 そのため、タスクを次々と生成する DRbQS::Task::Generator をサーバーに セットしておき、必要に応じてタスクが生成されるようにするとよい。

タスクには実行する計算ノードのグループを指定することができ、 特定の計算ノードだけに特定のタスクを実行させることができる。 サーバーと同一の計算ノードは自動で :local グループに入るので、 サーバーと同一の計算ノードに実行させたいタスクは :local グループにする。 他のグループについてはユーザーが適宜、タスクと計算ノードに指定して利用する。

すべてのワーカープロセスで実行される特殊なタスクがある。 初期化時と終了時に実行されるタスクを、それぞれ、 DRbQS::Server#set_initialization_task と DRbQS::Server#set_finalization_task で設定する。

フック

サーバーがある状態になったときに実行する処理を指定することができる。

empty_queue
計算ノードに割り当てられる前のタスクを入れたキューが空になったとき実行する処理
process_data
「drbqs-manage send」で文字列かファイル名を送信したときに実行する処理
finish
すべてのタスクが終了したときに実行する処理
task_assigned
すべてのタスクが計算ノードに割り当てられたときに実行する処理

終了

デフォルトの設定では、サーバーにタスクがなくなれば、 計算ノードに終了するように命令し、 計算ノードの終了を数秒待ってサーバーのプロセスが終了する。

計算ノード

処理

計算ノードは、まず、サーバーに接続してノード ID を受け取り、 計算ノード実行時に指定された数のワーカープロセスを実行する。 DRbQS::Server#set_initialization_task でセットされたタスクがあれば実行する。

計算ノード本体のプロセスはサーバーとの通信を担当する。 はじめは、ワーカープロセスの数だけサーバーからタスクを取得して ワーカープロセスに送る。 ワーカープロセスはタスクが送られたらそれを実行して結果を計算ノード本体に返す。 計算ノードはタスクの結果があればサーバーに送り、 ワーカープロセスがタスクを待っていればサーバーからタスクを取得して割り当てる。 タスクを取得するときには、計算ノードが該当するグループに関するものだけ取得する。

また、サーバーからシグナルを受け取るとそれに対応する。 計算ノードを直ちに終了する、 DRbQS::Server#set_finalization_task で指定されたタスクを実行して終了する、 ワーカープロセスを休ませる、起こす、 計算ノードが動いているかどうか確認するために返答するなどである。

サーバーから計算ノードを終了させるシグナルを受けるまで、 上の処理を繰り返す。

便利なメソッド

計算ノードで実行するメソッドで ファイルの転送と一時ディレクトリ、一時ファイルを使うには 次のメソッドやオブジェクトを使う。 タスクの処理が終わったときに、 送信予定のファイルは送信した後に削除され、 一時ファイルはすべて削除される。

DRbQS::Transfer.enqueue
計算ノードからサーバー送信するファイルを登録する。
DRbQS::Transfer.compress_enqueue
計算ノードからサーバー送信するファイルを登録するのだが、そのときに圧縮する。
DRbQS::Transfer::FileList.new(*files)
計算ノードがサーバーから取得するファイルリストを保存するオブジェクト。 DRbQS::Transfer::FileList#path はサーバーからファイルをダウンロードし、そのファイルのパスの配列を返す。
DRbQS::Temporary.filename
一時ディレクトリ以下にファイル名を生成する FileName オブジェクトを返す。
DRbQS::Temporary.directory
一時ディレクトリを作成して返す。
DRbQS::Temporary.file
一時ファイルとして使えるファイル名を返す。

Tags of current page

, ,