봉 블로그

zookeeper client example 본문

java

zookeeper client example

idkbj 2013. 5. 29. 20:40

초간단 zookeeper client example.

 일반적으로 Zookeeper Client Applications 는 2가지 부분으로 나뉜다. 하나는 zoookeeper server 와의 connection 을 유지하는 부분과 다른 하나는 data를 모니터링하는 부분이다.

아래 example code는 zookeeper server 에 연결된 client의 생성(연결) 및 제거 이벤트를 모든 클라이언트가 모니터링(Watch)하는 기능을 가지고 있다.

클라이언트는 Zookeeper Server 에 접속과 동시에 Watcher를 등록한후 자신의 IP를 등록한다. (다른 client 가 알수 있도록) 그리고 나서 다른 클라이언트의  접속상태를 모니터링 한다. Client IP와 함께 Public 권한(List) 을 함께 등록한다.  CreateMode.EPHEMERAL_SEQUENTIAL 는 client 연결이 유지되는동안만 node에 sequential number를 붙여서 생성한다.
연결이 끊기면 해당 node 도 삭제된다.

public class ZookeeperClient implements Watcher, Runnable, ChildrenCallback {

        private ZooKeeper zk;
        private boolean dead;
        private String znode;


        public ZookeeperClient(String hostPort, String znode, String localIp) throws KeeperException, IOException, InterruptedException {

                zk = new ZooKeeper(hostPort, 3000, this); //server 연결 및 watcher(this) 등록.

this.znode = znode; zk.create(znode+"/client", localIp.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); /* * 다른 client 의 연결상태를 모니터링 하기 위해 Watch event 를 설정한다. * - 비동기함수로 콜백함수인 processResult()가 실행됨. */ zk.getChildren(znode, true, this, null); }

 

client ip 가 생성(연결)되거나 제거되면 아래 event 함수가 호출된다.

       /**
         * 서버로부터의 event 발생시 실행된다.
         * - 새로운 client 가 추가되거나 삭제되면 NodeChildrenChanged 이벤트가 발생된다.
         * - 이벤트 수신후 새로운 Watch event 를 설정한다. (참고로 event는 한번만 수신 받기때문에 재등록한다.)
         */
        @Override
        public void process(WatchedEvent event) {
                String path = event.getPath();

                String eventType = event.getType().name();

                String eventState = event.getState().name();

                System.out.println("## process: path : "+path+", eventType : "+ eventType + ", eventState: "+ eventState);

                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        zk.getChildren(znode, true, this, null);
                }

        }

 

 

Zookeeper Server와의 연결을 유지시킨다.

        @Override
        public void run() {
                try {
                    synchronized (this) {
                        while (!dead) {
                            wait();
                        }
                   }
                } catch (InterruptedException e) {
                }
        }

 

 

Client 실행 main 함수

실행예 : ZookeeperClient 10.10.10.1:2121 /zk_test 10.10.10.2

public static void main(String[] args) {
                String hostPort = args[0];//zookeeper server IP:Port
                String znode = args[1];
                String localIp = args[2]; //client ip

                try{
                        new ZookeeperClient(hostPort, znode, localIp).run();
                }catch(Exception e){
                        e.printStackTrace();
                }
        }

 

zk.getChildren() 함수의 콜백함수 

서버에 등록된 클라이언트 정보를 시스템 로그로 출력한다.

        @Override
        public void processResult(int rc, String path, Object ctx, List children) {
                // TODO Auto-generated method stub
                for(String child : children){

                        byte[] data = null;
                        try {
                            data = zk.getData(znode + "/" + child, false, null); // client ip 정보 가져오기.
                        } catch (KeeperException e) {
                               // We don't need to worry about recovering now. The watch
                               // callbacks will kick off any exception handling
                               e.printStackTrace();
                        } catch (InterruptedException e) {
                               return;
                        }
                        System.out.println("## processResult: rc="+ rc + ", path="+path+", child= "+ child + ", data = "+ new String(data));
                }
        }

이상. 정말 초간단 샘플코드 였습니당.