SendaiLockサーバ

どうも、ずーっとプログラムを動かしているとエラーが出るので原因を調べると、
しばらくソケットが使えなくなることがあるそうです。
SO_LINGERとか、setReuseAddressとかを使うことで対応できるらしいので対応してみました。
phpのクライアントもSO_LINGERでソケットをすぐ解放するように修正。

あと、allocより、allocDirectのほうが速いそうなのだけど、アクターの場合にどうやったら共有できるのかわからなかったのですが、kmizuさんに教えてもらってバッファをダイレクトバッファを使用するように修正しました。ただ、アクターはどれかひとつしか動いてないような気がするのでなんだろうなとは思います。

package sendailock;

import scala.actors.Actor;
import scala.actors.Actor._;
import scala.collection.jcl.Conversions._;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.{LinkedHashMap,Map};
import scala.collection.mutable.Queue;
import java.util.Map.Entry;
import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel }
import java.nio.charset.Charset;
import java.security.MessageDigest;

/**
 * リクエストクラス
 */
case class Request(val remoteAddress:String, val channel:SocketChannel, var lock:String){
var count:Int = 0;
}
object Main {
	/**
	 * メイン関数
	 */
	def main(args:Array[String]) {
		// ロックアクター開始
		val actor = new LockActor;
		actor.start();
		// Initメッセージを送る
		actor ! Init(1006);
	}
}
/**
 * メッセージクラス
 */
sealed abstract class Message;
case class Init(port:Int)			extends Message;
case class Select()					extends Message;
case class Accept(key:SelectionKey)	extends Message;
case class Read(key:SelectionKey)	extends Message;
case class Write(key:SelectionKey)	extends Message;

class LockActor extends Actor {


	def log(s:String){
	
	}

	/**
	 * アクター実行
	 */
	def act() {
		loop {
			react {
			// Initメッセージ処理
			case Init(port)		=> echo();init(port); this ! Select;
			case Select			=> echo();select();   this ! Select;
			case Accept(key)	=> echo();accept(key);
			case Read(key)		=> echo();read(key);
			case Write(key)		=> echo();write(key);
			}
		}
	}
	def echo() {
		println("threadid"+Thread.currentThread().getId());
	}
	// ユーザー
	val user = "hoge";
	// パスワード
	val pass = "pass";

	// md5に変換されたパスワード
	val md5pass = md5(pass);

	// 先頭の文字列
	val head = "LOCK " + user + " " + md5pass + " ";
	
	// バッファのサイズ 1kあれば足りるはず。
	val BUF_SIZE = 1024;

	// セレクター
	val selector:Selector = Selector.open();

	// サーバーチャンネル
	val serverChannel:ServerSocketChannel = ServerSocketChannel.open();

	/**
	 * 初期化処理
	 * @param Int port サーバのポート番号
	 */
	def init(port:Int) {
		// サーバーチャンネルをブロックしない設定
		serverChannel.configureBlocking(false);
		// サーバーチャンネルのソケットを指定ポートでバインド
		serverChannel.socket().setReuseAddress(true);
		serverChannel.socket().bind(new InetSocketAddress(port));
		// セレクターを登録
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		// ログ
		log("lock server start port=" + port);
	}

	/**
	 * セレクト処理
	 * 
	 * セレクターに登録してあるキーを取り出してメッセージを送る
	 */
	def select() {
		selector.select();
		selector.selectedKeys().foreach { key =>
			if (key.isAcceptable()) {
				// アクセプト
				this ! Accept(key);
			} else
			if (key.isWritable() && key.isValid()) {
				// 出力
				this ! Write(key);
			} else
			if (key.isReadable()) {
				// 読み込み
				this ! Read(key);
			}
		}
	}
	/**
	 * アクセプト
	 * @param SelectionKey key セレクションキー
	 */
	def accept(key:SelectionKey) {

		// サーバーソケットチャンネルを取得
		val socket:ServerSocketChannel = key.channel().asInstanceOf[ServerSocketChannel];

		// アクセプト
		socket.accept() match {
		// nullなら何もしない
		case null =>
		// channelのとき
		case channel:SocketChannel =>
			connect += 1;
			// アドレスを取得
			val remoteAddress:String = channel.socket().getRemoteSocketAddress().toString();
			// リクエストオブジェクト作成
			val request = Request(remoteAddress, channel, null);
			log(request + ":connect");
			// ノンブロッキングモードにする
			channel.configureBlocking(false);
			// セレクターに読み込み登録する
			channel.register(selector, SelectionKey.OP_READ, request);
		}
	}

	val buf = ByteBuffer.allocateDirect(BUF_SIZE);
	/**
	 * 読み込み
	 * @param SelectionKey key セレクションキー
	 */
	def read(key:SelectionKey) {

		// リクエストオブジェクト取得
		val request:Request = key.attachment().asInstanceOf[Request];

		// ソケットチャンネル取得
		val channel = key.channel().asInstanceOf[SocketChannel];
		log(request + ":read");
		try {
			buf.clear();
			// 読み込み
			request.channel.read(buf) match {
			// 閉じる
			case -1 => close(request);
			// なにもしない
			case 0 =>
				log(request + ":zero");
				request.count += 1;
				if(request.count > 1000) close(request);
 			// 読み込みあり
			case x =>
				// バッファ読み込み
				buf.flip();
				var str = Charset.forName("UTF-8").decode(buf).toString();

				// ヘッダ部取得
				var strHead = str.substring(0, head.length);
				
				log(request + ":"+x+"[" + str + "]");
				buf.flip();

				// ヘッダチェック
				if (head != strHead) {
					// 駄目なのでngを返す
					buf.put("ng\r\n".getBytes());
					buf.flip();
					request.channel.write(buf);
					// 切断
					close(request);
				} else {
					// okなのでロック名保存
					request.lock = str.substring(head.length, str.length - 1);
					
					// 書き込みオッケー状態にするのは、ロック取得できたらにしよう。
					locks.synchronized {
						val iterator = locks.entrySet().iterator();
						while( iterator.hasNext()) {		
							val entry = iterator.next();
							log(entry.getKey() + "は" + entry.getValue());
						}

						log("locks size="+locks.size());
						val queue = locks.get(request.lock);
						if (queue == null) {
							locks.put(request.lock, new Queue[Request]);
							request.channel.register(selector, SelectionKey.OP_WRITE, request);
						} else {
							queue.enqueue(request);
						}
					}
				}
			}
		} catch {
		// 接続断などなので接続を閉じる
		case e:java.io.IOException => close(request);
		}
	}

	val wbuf:ByteBuffer = ByteBuffer.allocateDirect(BUF_SIZE);
	/**
	 * 出力
	 * @param SelectionKey key キー情報
	 */
	def write(key:SelectionKey) {
		val request:Request = key.attachment().asInstanceOf[Request];
		log(request + ":write");
		try {
			wbuf.clear();
			wbuf.put("ok\r\n".getBytes());
			wbuf.flip();
			request.channel.write(wbuf);
			request.channel.register(selector, SelectionKey.OP_READ, request);
		} catch {
		case e:java.io.IOException => close(request);
		}
	}
	var close:Int = 0;
	var connect:Int = 0;
	/**
	 * 接続断
	 * @param Request request リクエストオブジェクト
	 */
	def close(request:Request) {
		close += 1;
		request.channel.close();
		log(request + ":close");
		log("close "+close+" connect"+connect);
		locks.synchronized {
			val queue = locks.get(request.lock);

			if (queue != null) {
				if (queue.isEmpty) {
					locks.remove(request.lock);
				} else {
					val nextRequest = queue.dequeue();
					nextRequest.channel.register(selector, SelectionKey.OP_WRITE, nextRequest);
				}
			}

			log("locks size="+locks.size());
			val iterator = locks.entrySet().iterator();
			while( iterator.hasNext()) {		
				val entry = iterator.next();
				log(entry.getKey() + "は" + entry.getValue());
			}
		}
	}
	val locks:LRUMap = new LRUMap(0x1000);
	/**
	 * LRUマップ
	 * 古い順から消えていくマップ
	 * @param Int maxSize 最大サイズ
	 */
	class LRUMap(maxSize:Int) extends LinkedHashMap[String, Queue[Request]](16, 0.75f, true) {
		/**
		 * 削除
		 * LRUマップから削除するかどうかの判定をする
		 */
		protected override def removeEldestEntry(eldest:Map.Entry[String, Queue[Request]]):Boolean = {
			// maxSizeが0より大きくてsize()がmaxSizeより大きいときに削除
			return maxSize > 0 && size() > maxSize;
		}
	}
	/**
	 * md5文字列計算
	 * @param String str 文字列
	 * @return md5文字列
	 */
	private def md5(str:String):String = {
		try {
			MessageDigest.getInstance("md5").digest(str.getBytes())
			.foldLeft("") {(s:String, i:Byte) =>
				val n = i & 0xff;
				s + (if (n < 0x10) "0" else "") + Integer.toHexString(n);
			}
		} catch {
		case e:Exception => ""
		}
	}
}

phpクライアント

<?php

$logger = new Logger();
$logger->debug("start");

$lock = new SendaiLock("hoge", "pass", 1006, "localhost", $logger);

for($i=0;$i<100000;$i++) {
	$no = $i % 100;// rand(1,100);
	$logger->debug("lock $no");
	$rc = $lock->lock($no);
	$logger->debug("process ".var_export($rc,true));
//	if($rc == false) exit;
//	sleep(1);
	$rc = $lock->unlock();
//	$logger->debug("unlock ".var_export($rc, true));
}
$rc = $lock->stop();
$logger->debug("stop ".var_export($rc, true));

class Logger {
    function debug($str) {
        echo $str."\n";
    }
}

/**
 * ネットワークロッククライアント
 * 独自ロックサーバに接続してロックする
 * phpが終了するとソケットが閉じるので自動的にロックが外れる
 */
class SendaiLock {

    /**
     * コンストラクタ
     * @param int $port ポートNo
     * @param string $address サーバアドレス
     * @param object $logger ロギングオブジェクト
     */
    public function __construct($user, $pass, $port, $address, $logger) {
        $this->user = $user;
        $this->md5 = md5($pass);
        $this->port = $port;
        $this->address = $address;
        $this->logger = $logger;
    }
    public function __destruct() {
        $this->unlock();
    }
    function stop() {
        return $this->lock("stop lock");
    }

    /**
     * ロック
     * @param $key ロックキー
     * @return bool true 成功 false 失敗
     */
    function lock($key) {
        if (($this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) === false) { 
            return false;
        }
        $arrOpt = array('l_onoff' => 1, 'l_linger' => 1);
        //socket_set_block($this->sock);
        socket_set_option($this->sock, SOL_SOCKET, SO_LINGER, $arrOpt);
    
        if ((socket_connect($this->sock, $this->address, $this->port)) === false) {
            return false;
        }
        $buf = "LOCK {$this->user} {$this->md5} $key\n";
        if (socket_write($this->sock, $buf, strlen($buf)) === false) {
            return false;
        }
        if (($readbuf = socket_read($this->sock, 2048, PHP_NORMAL_READ)) === false) {
            return false;
        }
        return ($readbuf == "ok\r");
    }

    /**
     * アンロック
     */
    function unlock() {
        if ($this->sock === false) return true;
        socket_close($this->sock);
        $this->sock = false;
        return true;
    }
}

?>