cometネタ,ふたたび

1対1チャット

1対1でチャットするcometベースの仕組みを作ってみました.クライアント側はまだブラッシュアップしてる最中ですが,サーバ側のコードは落ち着いてきたのでまとめてみます.このサーバは

  1. html, css, jsファイルのダウンロード
  2. /sendにクライアントが投げてきたpostメッセージの受信
  3. /helloにクライアントがJSONで投げてきたgetメッセージの受信
  4. 受信したpostメッセージを,getメッセージへのresponseとして特定のクライアントにJSONP形式で送信
  5. 死活監視を兼ねて,システムにログイン中のユーザ一覧をgetメッセージへのresponseとして全クライアントにJSONP形式で送信

という機能を持っています.

POE::Component::Server::HTTP

my $server = POE::Component::Server::HTTP->new(
    Port           => 8089,
    PreHandler     => { '/hello' => \&caller, },
    ContentHandler => {
        '/send'  => \&send,
        '/hello' => \&hello,
        '/'      => \&rwfile,
    },
    PostHandler  => { '/hello' => [ \&cleanup ], },
    ErrorHandler => { '/hello' => \&clearnup, },
);

ContentHandlerでは,'/'で始まるpathを含むrequestが来た場合,その処理をrwfileメソッドに投げています.'/', '/css/screen.css', '/favicon.ico'などのrequestがここにヒットします.

Handlers are put on a stack in fifo order. The path /foo/bar/baz/honk.txt will first push the handlers of / then of /foo/ then of /foo/bar/, then of /foo/bar/baz/, and lastly /foo/bar/baz/honk.txt.

つまり,'/css/screen.css'を対象にしたメソッドを登録していないので,次に'/css'向けを探し,それもないので'/'向けを探して,そこでヒットするわけです.PreHandlerではコンテンツとは直接関係のない,$resオブジェクトを%receiverハッシュに登録する作業をcallerメソッドで行っています.同様にPostHandler,ErrorHandlerでは%receiverハッシュからの削除作業をcleanupメソッドで行っています.

定期的なタスクの実行

POE::Session->create(
    inline_states => {
        _start => sub {
            $_[HEAP]->{next_alarm_time} = int( time() ) + 5;
            $_[KERNEL]->alarm( tick => $_[HEAP]->{next_alarm_time} );
        },

        tick => sub {
            my ( $j, $json );
            $j    = JSON::Any->new;
            $json = $j->objToJson(
                {
                    mem =>
                      [ sort map { $receiver{$_}{NAME} } ( keys %receiver ) ],
                    callback => "showmembers"
                }
            );
            broadcast_message( 'JsonCallback(' . $json . ')' );
            $_[HEAP]->{next_alarm_time} += 1;
            $_[KERNEL]->alarm( tick => $_[HEAP]->{next_alarm_time} );
        },
    },
);

5秒毎にユーザのリストをJSONP形式で全クライアントに対して送信します.ユーザがログイン,ログアウトしたタイミングでユーザに対して送信することも考えたのですが,短い時間の間にイベントが複数発生するとサーバに負荷がかかりそうなので,定期実行することにしました.コードの解説はPOE: Cookbook - Recurring Alarmsを参考にしてください.クライアントに送信するJSONP用のメソッド名はJsonCallbackで固定しています.一般的にはクライアントが指定してきた名前を使ってサーバから送り返すのがJSONPですが,JsonCallbackというのも単なるディスパッチャで,実際にはshowmembersというメソッドで処理するので,このような形式にしています.

rwfile

sub rwfile {
    my ( $req, $res ) = @_;

    local *ret = sub {
        my $name = shift;
        $res->content( io($name)->all );
        $res->content_type( MIME::Types->new->mimeTypeOf($name) );
        $res->code(RC_OK);
    };

    my $dispatch = {
        '/'                 => sub { ret('html/index.html') },
        '/css/screen.css'   => sub { ret('html/css/screen.css') },
        '/src/jsr_class.js' => sub { ret('html/src/jsr_class.js') },
        '/src/jquery.js'    => sub { ret('html/src/jquery.js') },
      }->{ $req->uri->path }
      || sub { $res->code(RC_NOT_FOUND) };
    $dispatch->();
    return RC_OK;
}

クライアントが指定したファイルを送り返すメソッドです.JSONPを扱うサーバとファイルを扱うサーバを分けてもよかったのですが,今回はなんとなく1台でまとめてみました.IO::Allという気が狂ってるモジュールを使っているので,ファイル内容を変数に取り込むコードが'io($name)->all'と単純化できています.このIO::Allというモジュール,どのくらい狂ってるかというと,

perl -MIO::All -e 'io(":8080")->fork->accept->(sub { $_[0] < io(-x $1 ? "./$1 |" : $1) if /^GET \/(.*) / })'

ワンライナーhttpdサーバが書けるくらいのすごさです.詳しくはPODをご覧ください.メソッドをシンプルに書くためにクロージャーとlocalを使用しています.miyagawaさんがWeb::Scraperで使っていたのを思い出して参考にしてみました.

JSONP処理

sub hello {
    my ( $req, $res ) = @_;
    my $q;
    my $from;
    {
        my $c = HTTP::Request::AsCGI->new($req)->setup;
        $q = new CGI;
    }
    $from = $q->param('from');

    $receiver{ refaddr $res}{NAME} = $from;
    return RC_WAIT;
}

クライアントが(おそらく)scriptタグを使って送信してきたGETメソッドを処理します.送信する際にはクライアント側でキャッシュされないように工夫してください.ここで使うのはfromというパラメータで送ってきたユーザ名です.パスワードなんかは使っていません.ユーザ名として日本語を使えるようにするのであれば,encodeURI()をクライアント側で使用すべきです.RC_WAITをreturnしているので,このセッションはPOEが保持します.解放されるのはcast_messageメソッド中で$res->continue()が動くタイミングになります.

メッセージ総受信

sub send {
    my ( $req, $res ) = @_;
    my $q;
    my ( $from, $to, $msg );
    my ( $j, $json );
    {
        my $c = HTTP::Request::AsCGI->new($req)->setup;
        $q = new CGI;
    }
    $from = $q->param('from');
    $to   = $q->param('to');
    $msg  = $q->param('msg');

    $j    = JSON::Any->new;
    $json = $j->objToJson(
        {
            msg      => { from => $from, body => uri_unescape($msg) },
            callback => "showmessage"
        }
    );
    unicast_message( $to, 'JsonCallback(' . $json . ')' );
    unicast_message( $from, 'JsonCallback(' . $json . ')' ) if ( $to ne $from );
    return RC_OK;
}

クライアントが(おそらく)ajaxを利用して送ってきたメッセージを送信者と受信者に対してJSONP形式で発信します.受信したrequestのパラメータのうち使うのはfrom, to, msgで,クライアントはJsonCallbackというディスパッチャメソッドで受け,実際に受けたメッセージを処理するのがshowmessageというメソッドになります.

script全体

以下のようになります.

#!/usr/local/bin/perl

use warnings;
use strict;
use POE qw(Component::Server::HTTP);
use HTTP::Status;
use Scalar::Util qw(refaddr);
use DateTime;
use CGI qw(:standard);
use HTTP::Request::AsCGI;
use JSON::Any;
use URI::Escape;
use MIME::Types;
use IO::All;

my (%receiver);

my $server = POE::Component::Server::HTTP->new(
    Port           => 8089,
    PreHandler     => { '/hello' => \&caller, },
    ContentHandler => {
        '/send'  => \&send,
        '/hello' => \&hello,
        '/'      => \&rwfile,
    },
    PostHandler  => { '/hello' => [ \&cleanup ], },
    ErrorHandler => { '/hello' => \&clearnup, },
);

POE::Session->create(
    inline_states => {
        _start => sub {
            $_[HEAP]->{next_alarm_time} = int( time() ) + 5;
            $_[KERNEL]->alarm( tick => $_[HEAP]->{next_alarm_time} );
        },

        tick => sub {
            my ( $j, $json );
            $j    = JSON::Any->new;
            $json = $j->objToJson(
                {
                    mem =>
                      [ sort map { $receiver{$_}{NAME} } ( keys %receiver ) ],
                    callback => "showmembers"
                }
            );
            broadcast_message( 'JsonCallback(' . $json . ')' );
            $_[HEAP]->{next_alarm_time} += 1;
            $_[KERNEL]->alarm( tick => $_[HEAP]->{next_alarm_time} );
        },
    },
);

POE::Kernel->run();

exit();

sub rwfile {
    my ( $req, $res ) = @_;

    local *ret = sub {
        my $name = shift;
        $res->content( io($name)->all );
        $res->content_type( MIME::Types->new->mimeTypeOf($name) );
        $res->code(RC_OK);
    };

    my $dispatch = {
        '/'                 => sub { ret('html/index.html') },
        '/css/screen.css'   => sub { ret('html/css/screen.css') },
        '/src/jsr_class.js' => sub { ret('html/src/jsr_class.js') },
        '/src/jquery.js'    => sub { ret('html/src/jquery.js') },
      }->{ $req->uri->path }
      || sub { $res->code(RC_NOT_FOUND) };
    $dispatch->();
    return RC_OK;
}

sub caller {
    my ( $req, $res ) = @_;
    $receiver{ refaddr $res}{SESSION} = $res;
    $req->headers->header( Connection => 'close' );
    return RC_OK;
}

sub cleanup {
    my ( $req, $res ) = @_;
    delete $receiver{ refaddr $res};
}

sub hello {
    my ( $req, $res ) = @_;
    my $q;
    my $from;
    {
        my $c = HTTP::Request::AsCGI->new($req)->setup;
        $q = new CGI;
    }
    $from = $q->param('from');

    $receiver{ refaddr $res}{NAME} = $from;
    return RC_WAIT;
}

sub send {
    my ( $req, $res ) = @_;
    my $q;
    my ( $from, $to, $msg );
    my ( $j, $json );
    {
        my $c = HTTP::Request::AsCGI->new($req)->setup;
        $q = new CGI;
    }
    $from = $q->param('from');
    $to   = $q->param('to');
    $msg  = $q->param('msg');

    $j    = JSON::Any->new;
    $json = $j->objToJson(
        {
            msg      => { from => $from, body => uri_unescape($msg) },
            callback => "showmessage"
        }
    );
    unicast_message( $to, 'JsonCallback(' . $json . ')' );
    unicast_message( $from, 'JsonCallback(' . $json . ')' ) if ( $to ne $from );
    return RC_OK;
}

sub broadcast_message {
    my $message = shift;
    map { cast_message( $receiver{$_}{SESSION}, $message ) } ( keys %receiver );
}

sub unicast_message {
    my ( $to, $message ) = @_;
    map { cast_message( $$_{SESSION}, $message ); }
      grep { $$_{NAME} eq $to } ( values %receiver );
}

sub cast_message {
    my ( $res, $message ) = @_;
    $res->code(RC_OK);
    $res->content_type('text/plain');
    $res->headers->header( CacheControl => 'no-cache' );
    $res->headers->header( Expires      => '-1' );
    $res->content( $message . "\n" );
    $res->continue();
}

クライアント側のjavascript/htmlもブラッシュアップが落ち着いてきたら公開してみます.誰かの参考になるといいけどな,と思いつつ.