微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

[06] Dubbo_2

dubbo 管理控制台

我们在开发时,需要知道 Zookeeper 注册中心都注册了哪些服务,有哪些消费者来消费这些服务。我们可以通过部署一个管理中心来实现。其实管理中心就是一个 web 应用,部署到 tomcat 即可。

安装

  1. dubbo-admin-2.6.0.war 文件复制到 tomcat 的 webapps 目录下
  2. 启动 tomcat,此 war 文件自动解压
  3. 修改 WEB-INF 下的 dubbo.properties 文件,注意 dubbo.registry.address 对应的值需要对应当前使用的 Zookeeper 的 IP 地址和端口号
    dubbo.registry.address=zookeeper://192.168.206.129:2181
    dubbo.admin.root.password=root
    dubbo.admin.guest.password=guest
    
  4. 重启 tomcat

如果直接启动可能会遇到的问题:

使用

dubbo 相关配置说明

包扫描

服务提供者和服务消费者都需要配置,表示包扫描,作用是扫描指定包(包括子包)下的类,发布 Duubo 服务。

<dubbo:annotation package="cn.edu.nuist.service" />

如果不使用包扫描,也可以通过如下配置的方式来发布服务:

<bean id="helloService" class="cn.edu.nuist.service.impl.HelloServiceImpl" />
<dubbo:service interface="cn.edu.nuist.api.HelloService" ref="helloService" />

作为服务消费者,不使用包扫描则可以通过如下配置来引用服务:

<!-- 生成远程服务代理,可以和本地 bean 一样使用 helloService -->
<dubbo:reference id="helloService" interface="cn.edu.nuist.api.HelloService" />

上面这种方式发布和引用服务,一个配置项(<dubbo:service>、<dubbo:reference>) 只能发布或者引用一个服务,如果有多个服务,这种方式就比较繁琐了。推荐使用包扫描方式。

协议

<dubbo:protocol name="dubbo" port="20880"/>

一般在服务提供者一方配置,可以指定使用的协议名称和端口号。

其中 dubbo 支持的协议有:dubbo、rmi、hessian、http、webservice、rest、redis 等。推荐使用的是 dubbo 协议。

dubbo 协议采用单一长连接和 NIO 异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。不适合传送大数据量的服务,比如传文件,传视频等,除非请求量很低。

也可以在同一个工程中配置多个协议,不同服务可以使用不同的协议,例如:

<!-- 多协议配置 -->
<dubbo:protocol name="dubbo" port="20880" />
<dubbo:protocol name="rmi" port="1099" />
<!-- 使用 dubbo 协议暴露服务 -->
<dubbo:service interface="cn.edu.nuist.api.HelloService" ref="helloService" protocol="dubbo" />
<!-- 使用 rmi 协议暴露服务 -->
<dubbo:service interface="cn.edu.nuist.api.DemoService" ref="demoService" protocol="rmi" />

若不使用配置文件方式,可以使用 @Service 的 protocol 属性指定使用的协议。

启动时检查

<dubbo:consumer check="false"/>

上面这个配置需要配置在【服务消费者】一方,如果不配置认 check 值为 true。dubbo 缺省会在启动时检查依赖的服务是否可用,不可用时会抛出异常,阻止 Spring 初始化完成,以便上线时能及早发现问题。

10:33:51,417 ERROR Logger:225 -  [dubBO] Failed to init remote service reference at filed
helloService in class cn.edu.nuist.controller.HelloController, cause: Failed to check the
status of the service cn.edu.nuist.service.HelloService. No provider available for the
service cn.edu.nuist.service.HelloService from the url zookeeper://192.168.206.129:2181/
com.alibaba.dubbo.registry.RegistryService?application=dubbo_consumer
&dubbo=2.6.0&interface=cn.edu.nuist.service.HelloService&methods=sayHello ...

java.lang.IllegalStateException: Failed to check the status of the service
cn.edu.nuist.service.HelloService. No provider available for the service
cn.edu.nuist.service.HelloService from the url zookeeper://192.168.206.129:2181
/com.alibaba.dubbo.registry.RegistryService?application=dubbo_consumer ...

可以通过将 check 值改为 false 来关闭检查,在此之后就不需要严格按照先起 provider 后起 consumer 的顺序了,只要保证最终都起来就行。

建议在开发阶段将 check 值设置为 false,在生产环境下改为 true。

负载均衡

负载均衡(Load Balance):其实就是将请求分摊到多个操作单元上进行执行,从而共同完成工作任务。

集群负载均衡时,dubbo 提供了多种均衡策略(包括随机、轮询、最少活跃调用数、一致性 Hash),缺省为 random 随机调用

配置负载均衡策略,既可以在服务提供者一方配置,也可以在服务消费者一方配置:

@Controller
@RequestMapping("/demo")
public class HelloController {
    // 在服务消费者一方配置负载均衡策略
    @Reference(check = false, loadbalance = "random")
    private HelloService helloService;

    @RequestMapping("/hello")
    @ResponseBody
    public String getName(String name){
        // 远程调用
        String result = helloService.sayHello(name);
        System.out.println(result);
        return result;
    }
}

··························································

// 在服务提供者一方配置负载均衡
@Service(loadbalance = "random")
public class HelloServiceImpl implements HelloService {
    public String sayHello(String name) {
        return "hello!" + name;
    }
}

可以通过启动多个服务提供者来观察 dubbo 负载均衡效果。但要注意!因为我们是在一台机器上启动多个服务提供者,所以需要修改 tomcat 的端口号和 dubbo 服务的端口号 <dubbo:protocol port=""> 来防止端口冲突。在实际生产环境中,多个服务提供者是分别部署在不同的机器上,所以不存在端口冲突问题。

事务代理的 Service

解决 dubbo 无法发布被事务代理的 Service 问题

问题说明

前面我们已经完成了 dubbo 的入门案例,通过入门案例我们可以看到通过 dubbo 提供的标签配置就可以进行包扫描,扫描到 @Service 注解的类就可以被发布为服务。

<dubbo:annotation package="cn.edu.nuist.service.impl"/>

但是我们如果在服务提供者类上加入 @Transactional 事务控制注解后,服务就发布不成功了。原因是事务控制的底层原理是为服务提供者类创建代理对象,而认情况下Spring是基于 JDK 动态代理方式创建代理对象,而此代理对象的完整类名为 com.sun.proxy.$Proxy42(最后两位数字不是固定的),导致 dubbo 在发布服务前进行包匹配时无法完成匹配,进而没有进行服务的发布。

问题展示

上面的错误为没有可用的服务提供者,此时查看 dubbo 管理控制台发现服务并没有发布,如下:

可以通过断点调试的方式查看 dubbo 执行过程,dubbo 通过 AnnotationBean 的 postProcessAfterInitialization 方法进行处理:

解决方

通过上面的断点调试可以看到,在 HelloServiceImpl 类上加入事务注解后,Spring 会为此类基于 JDK 动态代理技术创建代理对象,创建的代理对象完整类名为 com.sun.proxy.$Proxy35,导致 dubbo 在进行包匹配时没有成功(因为我们在发布服务时扫描的包为 com.itheima.service),所以后面真正发布服务的代码没有执行。

解决方式操作步骤:

1. 修改 applicationContext-service.xml 配置文件,开启事务控制注解支持时指定 proxy-target-class 属性,值为 true。其作用是使用 cglib 代理方式为 Service 类创建代理对象。

<!--开启事务控制的注解支持-->
<tx:annotation-driven transaction-manager="transactionManager" proxy-target-class="true"/>

虽然服务发布成功了,但是查看详情:

2. 修改 HelloServiceImpl 类,在 Service 注解中加入 interfaceClass 属性,值为 HelloService.class,作用是指定服务的接口类型。

@Service(interfaceClass = HelloService.class)
@Transactional
public class HelloServiceImpl implements HelloService {
    public String sayHello(String name) {
        return "hello " + name;
    }
}

Re: ZooKeeper

上边 dubbo 讲完了,下面来详细讲下 ZK。

概述

简介

为啥叫 ZooKeeper?它是拿来管大象(Hadoop)、蜜蜂(Hive)、小猪(Pig) 的管理员

Apache Hbase 和 Apache Solr 以及阿里的 dubbo 等项目中都采用到了 Zookeeper。

ZooKeeper 是一个分布式协调技术、高性能的,开源的分布式系统的协调(Coordination) 服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 Hbase 的重要组件。它是一个为分布式应用程序一致性和分布式协调技术服务的软件。

从设计模式来理解

一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应,从而实现集群中类似 Master/Slave 管理模式。

小结

  • Zookeeper = 类似 Unix 文件系统 + 通知机制 + Znode 节点
  • 作用:服务注册 + 分布式系统的一致性通知协调

功能概述

统一命名服务

Name Service,如 dubbo 服务注册中心

服务提供者在启动的时候,向 ZK 上的指定节点 /dubbo/${serviceName}/providers 目录下写入自己的 URL 地址,这个操作就完成了服务的发布。

服务消费者启动的时候,订阅 /dubbo/${serviceName}/providers 目录下的提供者 URL 地址, 并向 /dubbo/${serviceName}/consumers 目录下写入自己的 URL 地址。

注意,所有向 ZK 上注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化。 另外,dubbo 还有针对服务粒度的监控,方法订阅 /dubbo/${serviceName} 目录下所有提供者和消费者的信息。

配置管理

Configuration Management,如:淘宝开源配置管理框架 Diamond

大型的分布式系统中,为了服务海量的请求,同一个应用常常需要多个实例。如果存在配置更新的需求,常常需要逐台更新,给运维增加了很大的负担同时带来一定的风险(配置会存在不一致的窗口期,或者个别节点忘记更新)。Zookeeper 可以用来做集中的配置管理,存储在 Zookeeper 集群中的配置,如果发生变更会主动推送到连接配置中心的应用节点,实现一处更新处处更新的效果

现在把这些配置全部放到 Zookeeper 上去,保存在 Zookeeper 的某个目录节点中,然后所有相关应用程序对这个目录节点进行监听,一旦配置信息发生变化,每个应用程序就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中就好。

Java 操作 API

pom.xml

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.9</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.9</version>
        <scope>test</scope>
    </dependency>
</dependencies>

HelloZK.java

package cn.edu.nuist.zk3;

import java.io.IOException;

import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class HelloZK {

    private static final Logger logger = Logger.getLogger(HelloZK.class);

    private static final String CONNECTSTRING = "192.168.206.129:2181";
    private static final String PATH = "/testZK";
    private static final int SESSION_TIMEOUT = 50*1000;

    // 1. 创建链接
    public ZooKeeper startZK() throws IOException {
        return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
            }
        });
    }
    
    // 4. 关闭链接
    public void stopZK(ZooKeeper zk) throws InterruptedException {
       if (zk != null) zk.close();
    }

    // 2. 新建 ZNode 节点
    public void createZNode(ZooKeeper zk, String path,
           String nodeValue) throws KeeperException, InterruptedException {
       zk.create(path, nodeValue.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    // 3. 查询 ZNode 节点的最新值
    public String getZNode(ZooKeeper zk, String path)
            throws KeeperException, InterruptedException {
        byte[] byteArray = zk.getData(path, false, new Stat());
        return new String(byteArray);
    }

    public static void main(String[] args) throws
            IOException, KeeperException, InterruptedException {
        HelloZK hello = new HelloZK();
        ZooKeeper zk = hello.startZK();
        Stat stat = zk.exists(PATH, false);

        if (stat == null) {
            hello.createZNode(zk, PATH, "zk1101");
            String result = hello.getZNode(zk, PATH);
            System.out.println("result: " + result);
        } else {
            System.out.println("***********ZNode has already ok***********");
        }
        hello.stopZK(zk);
    }
}

配置、客户端连接

zoo.cfg 解读

1. tickTime:通信心跳数,Zookeeper 服务器心跳时间,单位毫秒。

Zookeeper 使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳,时间单位为毫秒。

它用于心跳机制,并且设置最小的 session 超时时间为两倍心跳时间(session 的最小超时时间是 2*tickTime)。

2. initLimit:这个配置项是用来配置 Zookeeper 接收 Follower 客户端初始化连接是最长能忍受多少个心跳的时间间隔数。

这里所说的客户端不是用户链接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 leader 的 Follower 服务器,Follower 在启动过程中,会从 leader 同步所有最新数据,然后确定自己能够对外服务的起始状态。leader 允许 Follower 在 initLimit 时间内完成这个工作。

当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端返回的信息,那么表明这个客户端连接失败。总的时间长度就是 10*2000=20 秒。

3. synclimit:集群中 leader 与 Follower 之间的最大响应时间单位/同步通信时限。

在运行过程中,leader 负责与 ZK 集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。

简单来说就是,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度。假如响应超过 synclimit * tickTime (假设 synclimit=5,总的时间长度就是 5*2000=10 秒),leader 认为 Follwer 死掉,从服务器列表中删除 Follwer。

在运行过程中,leader 负责与 ZK 集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。如果 L 发出心跳包在 synclimit 之后,还没有从 F 那收到响应,那么就认为这个 F 已经不在线了。

4. dataDir:数据文件目录 + 数据持久化路径

保存内存数据库快照信息的位置,如果没有其他说明,更新的事务日志也保存到数据库

5. clientPort:客户端连接端口

监听客户端连接的端口。

客户端连接

先进入 bin 目录,启动服务端:./zkServer.sh start,再通过 ./zkCli.sh 获取客户端连接。

ZNode 节点

ZNode 数据模型

由上述操作可以看出,ZooKeeper 数据模型的结构与 Unix 文件系统的目录树结构很类似,整体上可以看作是一个树形结构,每个节点称作一个 ZNode,每一个 ZNode 认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。

ZNode-stat

Zookeeper 内部维护了一套类似 UNIX 的树形数据结构:由 ZNode 构成的集合,ZNode 的集合又是一个树形结构,每一个 ZNode 又有很多属性进行描述:Znode = path(key) + data(value) + stat

ZNode 维护了一个 stat 结构,这个 stat 包含数据变化的版本号、访问控制列表变化、还有时间戳。版本号和时间戳一起,可让 Zookeeper 验证缓存和协调更新。每次 ZNode 的数据发生了变化,版本号就增加

例如,无论何时客户端检索数据,它也一起检索数据的版本号。并且当客户端执行更新或删除时,客户端必须提供他正在改变的 ZNode 的版本号。如果它提供的版本号和真实的数据版本号不一致,更新将会失败。

ZNode 存在类型

ZNode 是由客户端创建的,它和创建它的客户端的内在联系,决定了它的存在性:

  1. PERSISTENT-持久化目录节点:创建这个节点的客户端在与 Zookeeper 服务的连接断开后,该节点依旧存在(除非使用 API 强制删除)。
  2. PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点:当客户端请求创建这个节点 A 后,Zookeeper 会根据 parent-znode 的 zxid 状态,为这个 A 节点编写一个全目录唯一的编号(这个编号只会一直增长)。当客户端与 Zookeeper 服务的连接断开后,该节点依旧存在。
  3. EPHEMERAL-临时目录节点:客户端与 Zookeeper 断开连接后,这个节点(还有涉及到的子节点)就会被删除
  4. EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点:当客户端请求创建这个节点 A 后,Zookeeper会根据 parent-znode 的 zxid 状态,为这个 A 节点编写一个全目录唯一的编号(这个编号只会一直增长)。当创建这个节点的客户端与 Zookeeper 服务的连接断开后,这个节点被删除

ZK 常用命令

和 Redis 的 KV 键值对类似,只不过 key 变成了一个路径节点值,value 就是 data。

Zookeeper 表现为一个分层的文件系统目录树结构。不同于文件系统之处在于:ZK 节点可以有自己的数据,而 Unix 文件系统中的目录节点只有子节点。

一个节点对应一个应用/服务,节点存储的数据就是应用需要的配置信息。


Zookeeper 支持某些特定的四字命令,他们大多是用来查询 ZK 服务的当前状态及相关信息的,通过 telnet 或 nc 向 Zookeeper 提交相应命令,运行公式:echo 四字命令 | nc 主机IP zookeeper端口,如:echo ruok | nc 127.0.0.1 2181

ZK 通知机制

通知机制】 客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,Zookeeper 会通知客户端。

watch

异步回调的触发机制

ZooKeeper 支持 watch(观察) 的概念。客户端可以在每个 ZNode 结点上设置一个观察。如果被观察服务端的 ZNode 结点有变更,那么 watch 就会被触发,这个 watch 所属的客户端将接收到一个通知包被告知结点已经发生变化,把相应的事件通知给设置过 Watcher 的 Client 端。

Zookeeper 里的所有读取操作:getData()/getChildren()/exists() 都有设置 watch 的选项。

1. 一次触发

当数据有变化时 ZKserver 向客户端发送一个 watch,它是一次性的动作,即触发一次就不再有效,类似一次性纸杯。

只监控一次。如果想继续 Watch 的话,需要客户端重新设置 Watcher。因此,如果你得到一个 watch 事件且想在将来的变化得到通知,必须新设置另一个 watch


2. 发往客户端

Watches 是异步发往客户端的,Zookeeper 提供一个顺序保证:在看到 watch 事件之前绝不会看到变化,这样不同客户端看到的是一致性的顺序。

在(导致 watch 事件被触发的)修改操作的成功返回码到达客户端之前,事件可能在去往客户端的路上,但是可能不会到达客户端。watch 事件是异步地发送给观察者(客户端)的。ZooKeeper 会保证次序:在收到 watch 事件之前,客户端不会看到已经为之设置观察的节点的改动。网络延迟或者其他因素可能会让不同的客户端在不同的时间收到 watch 事件和更新操作的返回码。这里的要点是:不同客户端看到的事情都有一致的次序。


3. 为数据设置 watch

节点有不同的改动方式。可以认为 ZooKeeper 维护 2 个观察列表:数据观察和子节点观察。其中 getData() 和 exists() 设置数据观,getChildren() 设置子节点观察。此外,还可以认为不同的返回数据有不同的观察。getData() 和 exists() 返回节点的数据,而 getChildren() 返回子节点列表。所以,setData() 将为 ZNode 触发数据观察。成功的 create() 将为新创建的节点触发数据观察,为其父节点触发子节点观察。成功的 delete() 将会为被删除的节点触发数据观察以及子节点观察(因为节点不能再有子节点了),为其父节点触发子节点观察。

观察维护在客户端连接到的 ZooKeeper 服务器中。这让观察的设置、维护和分发是轻量级的。客户端连接到新的服务器时,所有 session 事件将被触发。同服务器断开连接期间不会收到观察。客户端重新连接时,如果需要,先前已经注册的观察将被重新注册和触发。通常这都是透明的,有一种情况下 watch 事件将丢失:对还没有创建的节点设置存在观察,而在断开连接期间创建节点,然后删除


4. 时序性和一致性

watches 是在 Client 连接到 Zookeeper 服务端的本地维护,这可让 watches 成为轻量的,可维护的和派发的。当一个 Client 连接到新 Server,watch 将会触发任何 session 事件,断开连接后不能接收到。当客户端重连,先前注册的 watches 将会被重新注册并触发。


关于 watches,Zookeeper 维护这些保证:

(1) watches 和其他事件、watches 和异步恢复都是有序的。Zookeeper 客户端保证每件事都是有序派发。
(2) 客户端在看到新数据之前先看到 watch 事件。
(3) 对应更新顺序的 watches 事件顺序由 Zookeeper 服务所见。

code

WatchOne

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class WatchOne {
    // 定义常量
    private static final String CONNECTSTRING = "192.168.206.129:2181";
    private static final String PATH = "/nuist";
    private static final int SESSION_TIMEOUT = 50 * 1000;
    // 定义实例变量
    private ZooKeeper zk = null;

    // 以下为业务方法
    public ZooKeeper startZK() throws IOException {
        return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    triggerValue(PATH);
                } catch (KeeperException e) {
                    e.printstacktrace();
                } catch (InterruptedException e) {
                    e.printstacktrace();
                }
            }
        });
    }
    
    public String triggerValue(String path) throws KeeperException, InterruptedException {
        byte[] byteArray = zk.getData(path, false, new Stat());
        String retValue = new String(byteArray);
        System.out.println("triggerValue: " + retValue);
        return retValue;
    }

    public void stopZK() throws InterruptedException {
        if (zk != null) {
            zk.close();
        }
    }

    public void createZNode(String path, String nodeValue) throws KeeperException, InterruptedException {
        zk.create(path, nodeValue.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    public String getZNode(String path) throws KeeperException, InterruptedException {
        byte[] byteArray = zk.getData(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    triggerValue(path);
                } catch (KeeperException | InterruptedException e) {
                    e.printstacktrace();
                }
            }
        }, new Stat());
        return new String(byteArray);
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        WatchOne watchOne = new WatchOne();

        watchOne.setZk(watchOne.startZK());

        if (watchOne.getZk().exists(PATH, false) == null) {
            watchOne.createZNode(PATH, "TREE1101");
            System.out.println("----->: " + watchOne.getZNode(PATH));
            // 不能关闭连接:异步回调还没来,main线程不能停
            Thread.sleep(Long.MAX_VALUE);
        } else {
            System.out.println("I have znode");
        }
    }

    // setter---getter
    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

}

WatchMore

import java.io.IOException;

import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class WatchMore {
    private static final Logger logger = Logger.getLogger(WatchMore.class);
    // 定义常量
    private static final String CONNECTSTRING = "192.168.206.129:2181";
    private static final String PATH = "/watchmore";
    private static final int SESSION_TIMEOUT = 50 * 1000;
    // 定义实例变量
    private ZooKeeper zk = null;
    private String lastValue = "";

    // 以下为业务方法
    public ZooKeeper startZK() throws IOException {
        return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
            }
        });
    }

    public void stopZK() throws InterruptedException {
        if (zk != null) {
            zk.close();
        }
    }

    public void createZNode(String path, String nodeValue) throws KeeperException, InterruptedException {
        zk.create(path, nodeValue.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    public String getZNode(String path) throws KeeperException, InterruptedException {
        byte[] byteArray = zk.getData(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    triggerValue(path);
                } catch (KeeperException | InterruptedException e) {
                    e.printstacktrace();
                }
            }
        }, new Stat());

        return new String(byteArray);
    }

    public boolean triggerValue(String path) throws KeeperException, InterruptedException {
        byte[] byteArray = zk.getData(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    triggerValue(path); // 这是递归吗?
                } catch (KeeperException | InterruptedException e) {
                    e.printstacktrace();
                }
            }
        }, new Stat());

        String newValue = new String(byteArray);

        if (lastValue.equals(newValue)) {
            System.out.println("There is no change!");
            return false;
        } else {
            System.out.println("lastValue: " + lastValue + "\t" + "newValue: " + newValue);
            this.lastValue = newValue;
            return true;
        }
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        WatchMore watch = new WatchMore();

        watch.setZk(watch.startZK());

        if (watch.getZk().exists(PATH, false) == null) {
            String initValue = "1101";
            watch.setLastValue(initValue);
            watch.createZNode(PATH, initValue);
            System.out.println("initValue: " + watch.getZNode(PATH));
            Thread.sleep(Long.MAX_VALUE);
        } else {
            System.out.println("I have znode");
        }
    }

    // setter---getter
    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    public String getLastValue() {
        return lastValue;
    }

    public void setLastValue(String lastValue) {
        this.lastValue = lastValue;
    }

}

ZK 集群

  • initLimit 是 Zookeeper 用它来限定集群中的 Zookeeper 服务器连接到 leader 的时限。
  • synclimit 限制了 Follower 服务器与 leader 服务器之间请求和应答之间的时限。

服务器名称与地址:集群信息(服务器编号,服务器地址,LF通信端口,选举端口)

这个配置项的书写格式比较特殊:server.N=YYY:A:B。其中,N 表示服务器编号,YYY 表示服务器的 IP 地址,A 为 LF 通信端口,表示该服务器与集群中的 leader 交换的信息的端口。B 为选举端口,表示选举新 leader 时服务器间相互通信的端口(当 leader 挂掉时,其余服务器会相互通信,选择出新的 leader)。

一般来说,集群中每个服务器的 A 端口都是一样,每个服务器的 B 端口也是一样。下面是一个集群的例子:

server.0=233.34.9.144:2008:6008
server.1=233.34.9.145:2008:6008
server.2=233.34.9.146:2008:6008
server.3=233.34.9.147:2008:6008

伪分布式单机配置步骤:

leader、Follower 都能读,都能写。因为 Zookeeper 主要作用不是扩容,而是抗风险,高可用

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐