Sendai Lock

複数Webサーバ上でデータを共有する場合は DataBase を使います。
しかしある程度サーバが重くなってくると負荷対策が必要となります。
そこで登場するのがkey value型の高速なメモリキャッシュサーバであるmemcacheです。
memcacheを使えばデータベースにアクセスするよりも高速にデータを保存、取り出しすることが出来ます。
しかしmemcacheに簡単にロックする機能がないので不便です。
スピンロックを実装する方法がありますがスレッドが死んだらデッドロックするかもしれません。
DBの行ロックを使ったロック方法もあるかもしれませんが、色々と重くなりそうです。
socket で接続してロックして接続が切れたらアンロックするだけのサーバとクライアントライブラリがあったらいいのに!!

ということで、作ったのがsendai lockです。
名前はもちろんtokyo cabinetのパクリです。
しかし、実装は適当です。すいません。すいません。すいません。

使い方

include_once("SendaiLock.class.php");
$lock = new SendaiLock(1006, "localhost");
$rc = $lock->lock("lockKey");
sleep(10);
$rc = $lock->unlock();

SendaiLockクラスをインクルードして、ポートNoとサーバを指定してオブジェクトを作成し、ロックキー名を指定してロックメソッドを呼び出せばロックできます。アンロックしたいときはunlockメソッドを呼ぶだけです。またプロセスが途中で切れればソケットが切断され自動的にロックは外れます。

仕組み

サーバ

サーバは Java で作成しました。 Java はマルチスレッドで同期の仕組みを言語で持っているのでサーバをさくさくっと作れるからです。


基本的な仕組みはとても単純です。
クライアントからロックのキーを受け取ってそのキーをjavaのsynchronizedでロックして接続が切れるのを待つだけです。
ロックのオブジェクトは連想配列に保存して共有します。


これだけでも十分ですが、キーの値が増えてくると必要以上にメモリも食ってしまいます。
ソケットの接続数には限界があるので必要以上にキー情報を取っておくのはメモリの無駄です。
そこで、キャッシュの仕組みで使われる LRUMap を使って古くて使われていないキーのオブジェクトは消すようにしました。
Java には LinkedHashMap という LRUMap を実装するためにあるようなクラスがあるので LinkedHashMap を使用して LRUMap を実現しています。

クライアント

クライアントは php で使うので php で作成されています。
ソケットでサーバに接続してロックのキーを渡します。
サーバから ok が帰ってきたらロック成功です。
ロックが終わったら目的の処理を行います。
アンロックするときは接続を切るだけです。

以下ソースです。

<?php
/**
 * ネットワークロッククライアント
 *
 * 独自ロックサーバに接続してロックする
 * phpが終了するとソケットが閉じるので自動的にロックが外れる
 */
class SendaiLock {

    /**
     * コンストラクタ
     * @param int $port ポートNo
     * @param string $address サーバアドレス
     */
    public function __construct($port, $address) {
        $this->port = $port;
        $this->address = $address;
    }

    /**
     * ロック
     * @param $key ロックキー
     * @return bool true 成功 false 失敗
     */
    function lock($key) {
        if (($this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) === false) { 
            return false;
        }
        if ((socket_connect($this->sock, $this->address, $this->port)) === false) {
            return false;
        }
        $buf = "LOCK $key\n";
        if (socket_write($this->sock, $buf, strlen($buf)) === false) {
            return false;
        }
        if (($readbuf = socket_read($this->sock, 2048, PHP_NORMAL_READ)) === false) {
            return false;
        }
        return ($readbuf == "ok");
    }
    /**
     * アンロック
     */
    function unlock() {
        socket_close($this->sock);
    }
}

?>
import java.net.*;
import java.io.*;
import java.util.*;

public class SendaiLockD extends Thread {

    public static void main(String argv[]) {
        try {
            ServerSocket serverSocket = new ServerSocket(1006);
            System.out.println("*** start lock server");
            while(true) {
                try {
                    Socket socket = serverSocket.accept();
                    System.out.println(socket.getInetAddress() + " accepted");
                    new SendaiLockD(socket).start();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    private Socket socket = null;
    private static Map map = new LRUMap(1000);

    public SendaiLockD(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        String key = "";
        String lock = "";
        try {
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String[] tokens = in.readLine().split(" ");
            key = tokens[1];
            System.out.println("lock start " + key);
            synchronized (map) {
            	lock = (String)map.get(key);
                if(lock == null) map.put(key, lock = key);
                System.out.println("map size="+map.size());
            }
            synchronized (lock) {
                out.println("ok");
                in.readLine();
            }
        } catch(Exception e) {
        }
        try {
            socket.close();
        }catch(Exception e) {
        }
        System.out.println("unlock " + key);
    }
}

class LRUMap extends LinkedHashMap {

    private int maxSize;

    public LRUMap(int maxSize) {
        super(16, 0.75f, true);
        this.maxSize = maxSize;
    }
    protected boolean removeEldestEntry(Map.Entry eldest) {
        return maxSize > 0 && size() > maxSize;
    }
}

追記

と思ってたのですがLRUMapは使わずに String.intern で単一化された文字列を使えば十分かもしれないので、SendaiLockD2を以下に追加しておきます。
windows xpjava version "1.6.0_17"上でLRUMapはwindows上で20M弱、1〜1000万の数字のキーで 上で31Mbytesくらいのメモリを使用します。

import java.net.*;
import java.io.*;

public class SendaiLockD2 extends Thread {

    public static void main(String argv[]) {
        try {
            ServerSocket serverSocket = new ServerSocket(1006);
            System.out.println("*** start lock server");
            while(true) {
                try {
                    Socket socket = serverSocket.accept();
                    System.out.println(socket.getInetAddress() + " accepted");
                    new SendaiLockD(socket).start();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    private Socket socket = null;

    public SendaiLockD2(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        try {
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String[] tokens = in.readLine().split(" ");
            String key = tokens[1].intern();
            System.out.println("lock start " + key);
            synchronized(key) {
                out.println("ok");
                in.readLine();
                System.out.println("unlock " + key);
            }
        } catch(Exception e) {
        }
        try {
            socket.close();
        }catch(Exception e) {
        }
    }
}