yanbe.diff このページをアンテナに追加 RSSフィード

2009-04-13

SSH接続を起点としたホスト間通信のブートストラップ

23:10 |  SSH接続を起点としたホスト間通信のブートストラップ - yanbe.diff を含むブックマーク はてなブックマーク -  SSH接続を起点としたホスト間通信のブートストラップ - yanbe.diff  SSH接続を起点としたホスト間通信のブートストラップ - yanbe.diff のブックマークコメント

概要

ネットワーク上の複数のホストが協調して動作するようなプログラムの実験をやろうとすると、サーバープロセスをあらかじめ起動しておいたり、サーバー側のプログラムを更新したら適宜デプロイしないといけないのは以前から面倒だと思っていたのですが、以下のやり方で解決できることを知りました。

An example/demo of how to use the managers.SyncManager, Process and others to build a system which can distribute processes and work via a distributed queue to a “cluster” of machines on a network, accessible via SSH.

http://docs.python.org/library/multiprocessing.html

これは、最近、Python 2.6からサポートされたmultiprocessingという「プロセス間通信をPythonのマルチスレッド処理と同じAPIでラップするモジュール」に興味があって実装や使用例を調べているうちに見つけました。

具体的には以下の部分です。少し長いです。

class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''
    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        if self.hostname != 'localhost':
            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
            debug('startup files copied to %s:%s', self.hostname, tempdir)
            p = subprocess.Popen(
                ['ssh', self.hostname, 'python', '-c',
                 '"import os; os.chdir(%r); '
                 'from distributing import main; main()"' % tempdir],
                stdin=subprocess.PIPE
                )
            data = dict(
                name='BoostrappingHost', index=index,
                dist_log_level=_logger.getEffectiveLevel(),
                dir=tempdir, authkey=str(authkey), parent_address=address
                )
            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
            p.stdin.close()

#
# Copy files to remote directory, returning name of directory
#

unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print tempdir
"'''

def copy_to_remote_temporary_directory(host, files):
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
    for name in files:
        tf.add(name, os.path.basename(name))
    tf.close()
    p.stdin.close()
    return p.stdout.read().rstrip()

#
# Code which runs a host manager
#

def main():
    # get data from parent over stdin
    data = pickle.load(sys.stdin)
    sys.stdin.close()

SSHで接続可能なホストに接続し、パイプでつながれた標準入出力を介して、リモートからローカルへ「プログラムが転送・展開された一時ディレクトリの情報」を、ローカルからリモートへ「リモートからローカルへ接続を確立するために必要な情報」を送っています。

このようにすることで、プログラムのデプロイも通信もすべてローカルホストを起点に、ホスト間の通信をブートストラップ出来ます。これはいろんな局面で応用出来そうです。この業界では普通に使われてるテクニックな気もするのですが、何か名前があるのでしょうか。名前をつけるとしたら、なんとなくTrackbackみたいなイメージなのでConnect-backとかですかね。

確認のためにこのテクニックを使ったエコーサーバーを書いてみました。以下のコードではサーバー(ローカル)が待ち受けするポート番号は0番を指定していて、実際は空いているポートが適当に選ばれるのですが、これをクライアント(リモート)に、パイプでつながれた標準入出力を介して伝えています。

ところで実ははてなでEmbedded Gistsがこっそり使えるようになっていたんですね。知りませんでした。自分で書いたコードに関しては、スーパーpre記法よりもコードのメンテナンスが楽でいいかもしれません。元のGistをupdateすると、コードを貼り付けた先々でも自動的に同期がとられるのも便利です。

余談ですが、このEmbedded Gist、どなたかにTwitterで教えていただいたのだが失念してしまいました(すいません)。Twitter周りの検索は公式は日本語の検索に問題があったり、有志によるものはログに抜けがあったりしてこういうときに困ります。

トラックバック - http://subtech.g.hatena.ne.jp/y_yanbe/20090413