本文介绍了在ZooKeeper中使用协议Zab广播的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

早上好,

我是ZooKeeper及其协议的新手,我对它的广播协议Zab感兴趣。

你能否给我提供一个简单的使用Zookeeper的Zab协议的java代码?我一直在寻找,但我没有成功找到一个代码,显示我如何使用Zab。实际上,我需要的很简单,我有一个MapReduce代码,并且我希望所有映射器在更新变量(比如说X)时都会更新一个变量,以便找到更好的X的值(即更大的值)。在这种情况下,领导者必须比较旧值和新值,然后将实际最佳值广播给所有映射者。我怎样才能在Java中做这样的事情?

在此先感谢您,
问候

解决方案

您不需要使用Zab协议。相反,您可以按照以下步骤操作:

您在Zookeeper上有一个Znode说/ bigvalue。所有映射器何时开始读取存储在其中的值。他们还在Znode上设置了数据更改的手表。只要映射器获得更好的值,它就会以更好的值更新Znode。所有的映射器都会收到数据更改事件的通知,并且他们会读取新的最佳值,并且他们会重新建立手表以重新进行数据更改。这样他们就可以与最新的最佳价值同步,并可以在有更好的价值时更新最新的最佳价值。

其实zkclient是一个非常好的图书馆Zookeeper和它隐藏了很多复杂性()。下面是一个示例,演示如何为任何数据更改观看Znode/ bigvalue。

  package geet.org; 

import java.io.UnsupportedEncodingException;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
导入org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.data.Stat;

public class ZkExample实现IZkDataListener,ZkSerializer {
public static void main(String [] args){
String znode =/ bigvalue;
ZkExample ins = new ZkExample();
ZkClient cl = new ZkClient(127.0.0.1,30000,30000,
ins);
尝试{
cl.createPersistent(znode);
} catch(ZkNodeExistsException e){
System.out.println(e.getMessage());
}
//改变数据以获得乐趣
Stat stat = new Stat();
String data = cl.readData(znode,stat);
System.out.println(当前数据+ data +version =+ stat.getVersion());
cl.writeData(znode,My new data,stat.getVersion());

cl.subscribeDataChanges(znode,ins);
尝试{
Thread.sleep(36000);
} catch(InterruptedException e){
e.printStackTrace();


$ b @Override
public void handleDataChange(String dataPath,Object data)throws Exception {
System.out.println(Detected data更改);
System.out.println(+ dataPath ++(String)data的新数据);

$ b @Override
public void handleDataDeleted(String dataPath)throws Exception {
System.out.println(Data deleted+ dataPath);

$ b @Override
public byte [] serialize(Object data)throws ZkMarshallingError {
if(data instanceof String){
try {
return((String)data).getBytes(UTF-8);
} catch(UnsupportedEncodingException e){
e.printStackTrace();
}
}
返回null;

$ b @Override
public Object deserialize(byte [] bytes)throws ZkMarshallingError {
try {
return new String(bytes,UTF- 8\" );
} catch(UnsupportedEncodingException e){
e.printStackTrace();
}
返回null;
}
}


Good morning,

I am new to ZooKeeper and its protocols and I am interested in its broadcast protocol Zab.

Could you provide me with a simple java code that uses the Zab protocol of Zookeeper? I have been searching about that but I did not succeed to find a code that shows how can I use Zab.

In fact what I need is simple, I have a MapReduce code and I want all the mappers to update a variable (let's say X) whenever they succeed to find a better value of X (i.e. a bigger value). In this case, the leader has to compare the old value and the new value and then to broadcast the actual best value to all mappers. How can I do such a thing in Java?

Thanks in advance,Regards

解决方案

You don't need to use the Zab protocol. Instead you may follow the below steps:

You have a Znode say /bigvalue on Zookeeper. All the mappers when starts reads the value stored in it. They also put an watch for data change on the Znode. Whenever a mapper gets a better value, it updates the Znode with the better value. All the mappers will get notification for the data change event and they read the new best value and they re-establish the watch for data changes again. That way they are in sync with the latest best value and may update the latest best value whenever there is a better value.

Actually zkclient is a very good library to work with Zookeeper and it hides a lot of complexities ( https://github.com/sgroschupf/zkclient ). Below is an example that demonstrates how you may watch a Znode "/bigvalue" for any data change.

package geet.org;

import java.io.UnsupportedEncodingException;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.data.Stat;

public class ZkExample implements IZkDataListener, ZkSerializer {
    public static void main(String[] args) {
        String znode = "/bigvalue";
        ZkExample ins = new ZkExample();
        ZkClient cl = new ZkClient("127.0.0.1", 30000, 30000,
                ins);
        try {
            cl.createPersistent(znode);
        } catch (ZkNodeExistsException e) {
            System.out.println(e.getMessage());
        }
        // Change the data for fun
        Stat stat = new Stat();
        String data =  cl.readData(znode, stat);
        System.out.println("Current data " + data + "version = " + stat.getVersion());
        cl.writeData(znode, "My new data ", stat.getVersion());

        cl.subscribeDataChanges(znode, ins);
        try {
            Thread.sleep(36000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void handleDataChange(String dataPath, Object data) throws Exception {
        System.out.println("Detected data change");
        System.out.println("New data for " + dataPath + " " + (String)data);
    }

    @Override
    public void handleDataDeleted(String dataPath) throws Exception {
        System.out.println("Data deleted " + dataPath);
    }

    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        if (data instanceof String){
            try {
                return ((String) data).getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            return new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }
}

这篇关于在ZooKeeper中使用协议Zab广播的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-11 08:09