synchronized锁只能解决单个进程下多线程问题,而针对多个进程(比如多个用户参与秒杀)的情况,是无法解决的,所以需要依赖第三方独立中间件来实现分布式锁。zookeeper是负责分布式协调,所以可以实现分布式锁。
依赖zookeeper特性实现分布式锁:
- 同目录下节点唯一:可以依赖zookeeper的特性之一:同一节点下只能创建一个名字相同的节点;基于zookeeper这一特性可以实现分布式锁,多个线程都去zookeeper同一节点下(比如/lock)创建名字相同的子节点(比如/lock/concurrent-lock),那么只可能有一个线程创建成功,创建成功则代表抢占到锁,然后执行业务逻辑,执行完成则删除节点/lock/concurrent-lock;创建失败的线程则抢占锁失败,对抢占到锁的节点建立监听,一旦当前抢占到锁的节点释放了锁,其他所有建立监听的客户端都会去再次抢占锁。但是会有惊群效应问题,一个客户端释放了锁,其他所有客户端都会再次去竞争,然而只有一个客户端可以竞争到锁,浪费性能。
- 临时有序节点:针对惊群效应问题,可以采用zookeeper另一个特性,在同一节点下创建临时有序节点,哪个客户端创建的节点序号最小,则代表抢占到锁;没有抢占到锁的线程,都去监听自己创建的节点的上一个节点,如果上一个节点释放了锁,那么当前节点就可以获取到锁去执行业务逻辑,这样根据节点序号顺序获取锁,就可以有效避免惊群效应。
注意:zk实现分布式锁,对并发量的要求不能太高(源于一致性算法的设计),如果并发量不太高,对数据一致性要求较高的场景可以使用此技术。
除此之外,redis基于setNX + 超时时间,通过LUA脚本保证原子性也可以实现分布式锁,并发量高的场景可以使用redis实现分布式锁,但是不能对数据一致性要求太高。
Curator实现分布式锁
// 基于临时有序节点实现分布式锁
InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/lock");
try {
lock.acquire(); //抢占分布式锁资源,阻塞方法
// 需要分布式锁的业务代码
Order order = orderServie.getone(orderId);
order.setStock(order.getStock - 1);
orderService.update(order);
} finally {
lock.release();//释放锁
}
zookeeper实现分布式锁源码分析
zookeeper实现分布式锁,用到了容器节点和临时有序节点;创建一个容器节点作为目录,利用其特性,当容器节点下面没有子节点时会被标注并自动删除;在容器节点下创建临时有序节点,根据每个客户端进程创建的有序节点的序号来觉得抢占锁顺序,序号最小的进程优先获得锁。
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
this(client, path, "lock-", 1, driver);
}
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
this.threadData = Maps.newConcurrentMap();
this.basePath = PathUtils.validatePath(path); //lock目录
this.internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
acquire抢占锁
public void acquire() throws Exception {
if (!this.internalLock(-1L, (TimeUnit)null)) {
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception {
Thread currentThread = Thread.currentThread();
// 判断当前线程是否已经获得过锁(考虑重入锁情况,避免死锁)
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
if (lockData != null) {
lockData.lockCount.incrementAndGet();
return true;
} else {
/**
* 1、去zookeeper server 创建临时有序节点
* 2、判断创建的节点的序号是否是所有节点中最小的,如果是,则获取锁并返回
* 3、如果没有获取锁则等待
*/
String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
if (lockPath != null) {
InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
this.threadData.put(currentThread, newLockData);
return true;
} else {
return false;
}
}
}
attemptLock试图获得锁
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
long startMillis = System.currentTimeMillis();
Long millisToWait = unit != null ? unit.toMillis(time) : null;
byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while(!isDone) {
isDone = true;
try {
// 创建一个临时有序节点,返回节点名称
ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
// 通过循环方式获取锁
hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
} catch (KeeperException.NoNodeException var14) {
if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
throw var14;
}
isDone = false;
}
}
return hasTheLock ? ourPath : null;
}
createsTheLock创建父级容器节点和临时有序节点
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
String ourPath;
// 创建父级容器节点,然后创建临时有序节点
if (lockNodeBytes != null) {
ourPath = (String)((ACLBackgroundpathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);
} else {
ourPath = (String)((ACLBackgroundpathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);
}
return ourPath;
}
internalLockLoop内部循环获得锁
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
if (this.revocable.get() != null) {
((Backgroundpathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
}
// 如果连接是启动状态,并且没有获得锁,就不断循环去获取锁
while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
// 对节点列表进行排序
List<String> children = this.getSortedChildren();
// 截取创建节点的序列号
String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
// 比较序列号是否是最小的
PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
if (predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
String prevIoUsSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
((Backgroundpathable)this.client.getData().usingWatcher(this.watcher)).forPath(prevIoUsSequencePath);
if (millisToWait == null) {
this.wait();
} else {
millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait > 0L) {
this.wait(millisToWait);
} else {
doDelete = true;
break;
}
}
} catch (KeeperException.NoNodeException var19) {
}
}
}
}
} catch (Exception var21) {
ThreadUtils.checkInterrupted(var21);
doDelete = true;
throw var21;
} finally {
if (doDelete) {
this.deleteOurPath(ourPath);
}
}
return haveTheLock;
}
getsTheLock比较当前节点序列号是否最小
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
int ourIndex = children.indexOf(sequenceNodeName); //根据节点名称获取索引
validateOurIndex(sequenceNodeName, ourIndex); //判断索引的有效性
boolean getsTheLock = ourIndex < maxLeases; //判断节点索引是否最小,比如当前节点索引为0,则是最小,maxLeases表示最大可出租多少锁,在这里是1
String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
release释放锁
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
} else {
int newLockCount = lockData.lockCount.decrementAndGet(); //针对重入锁,减少重入次数
if (newLockCount <= 0) {
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
} else { //重入锁次数变成0,释放锁
try {
this.internals.releaseLock(lockData.lockPath);
} finally {
this.threadData.remove(currentThread);
}
}
}
}
}
releaseLock
final void releaseLock(String lockPath) throws Exception {
this.client.removeWatchers();
this.revocable.set((Object)null);
this.deleteOurPath(lockPath);
}
// 删除节点
private void deleteOurPath(String ourPath) throws Exception {
try {
((ChildrenDeletable)this.client.delete().guaranteed()).forPath(ourPath);
} catch (KeeperException.NoNodeException var3) {}
}
zookeeper实现leader选举
也可以利用本文开头的zookeeper节点特性去实现leader选举,创建一个容器节点(如/leader),然后在容器节点下创建临时有序节点,谁的节点序列号最小,那么就是leader节点
zookeeper基于临时有序节点特性封装了API:
leaderLatch使用
maven依赖
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core</artifactId>
<version>5.0.0-beta</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>2.5.3</version>
</dependency>
springboot项目代码
@SpringBootApplication
public class ZkleaderApplication {
public static void main(String[] args) {
SpringApplication.run(ZkleaderApplication.class);
}
}
// 实现定时任务的触发器
public class ZkSchedulerfactorybean extends Schedulerfactorybean {
private leaderLatch leaderLatch;
private final String leader_PATH = "/leader";
private CuratorFramework getClient() {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString("192.168.221.128:2181").connectionTimeoutMs(2000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.sessionTimeoutMs(20000).build();
curatorFramework.start();//启动
return curatorFramework;
}
public ZkSchedulerfactorybean() throws Exception {
this.setAutoStartup(false); //引用启动时不会自动开启定时任务
leaderLatch = new leaderLatch(getClient(), leader_PATH);
// 当leader节点改变时,触发监听
leaderLatch.addListener(new leaderLatchListener() {
@Override
public void isleader() {
System.out.println(Thread.currentThread().getName() + "成为了leader节点");
setAutoStartup(true);
start();
}
@Override
public void notleader() {
System.out.println(Thread.currentThread().getName() + "竞选leader失败");
setAutoStartup(false);
stop();
}
});
leaderLatch.start();
}
@Override
protected void startScheduler(Scheduler scheduler, int startupDelay) throws SchedulerException {
if (this.isAutoStartup()) {
super.startScheduler(scheduler, startupDelay);
}
}
@Override
public void destroy() throws SchedulerException {
Closeableutils.closeQuietly(leaderLatch);
super.destroy();
}
}
public class MyQuartzJobBean extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("开始执行任务");
System.out.println("当前执行的系统时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
}
@Configuration
public class QuartzConfiguration {
@Bean
public ZkSchedulerfactorybean schedulerfactorybean(JobDetail jobDetail, Trigger trigger) throws Exception {
ZkSchedulerfactorybean zkSchedulerfactorybean = new ZkSchedulerfactorybean();
zkSchedulerfactorybean.setJobDetails(jobDetail);
zkSchedulerfactorybean.setTriggers(trigger);
return zkSchedulerfactorybean;
}
@Bean
public JobDetail jobDetail() {
return JobBuilder.newJob(MyQuartzJobBean.class).storeDurably().build();
}
@Bean
public Trigger trigger() {
SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
//一秒执行一次,重复执行
.withIntervalInSeconds(1).repeatForever();
return TriggerBuilder.newTrigger().forJob(jobDetail()).withSchedule(simpleScheduleBuilder).build();
}
}
springboot项目配置
启动两个项目,代表两个节点,两个应用同时启动,哪个应用在zookeeper目录下生成的节点序列号小,那么就抢占到锁,代表leader节点,执行定时任务。当一个应用停止,另一个应用会自动切换成leader节点,定时任务会自动执行。
leaderSelector使用
底层使用InterProcessMutex,与leaderLatch的区别是,leader释放领导权之后,还可以继续参与竞争。
public class leaderSelectorDemo extends leaderSelectorListenerAdapter implements Closeable {
private final String leader_PATH = "/leader";
private leaderSelector leaderSelector;
private String nodeName;
private CuratorFramework getClient() {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString("192.168.221.128:2181").connectionTimeoutMs(2000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.sessionTimeoutMs(20000).build();
curatorFramework.start();//启动
return curatorFramework;
}
public leaderSelectorDemo(String nodeName) {
leaderSelector = new leaderSelector(getClient(), leader_PATH, this);
leaderSelector.autoRequeue(); //自动发起选举,重新参与竞争
this.nodeName = nodeName;
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
public void start() {
leaderSelector.start();
}
@Override
public void takeleadership(CuratorFramework curatorFramework) throws Exception {
System.out.println(nodeName + "竞争成为leader");
}
public static void main(String[] args) {
for (int i = 1; i <= 10; i++) {
leaderSelectorDemo leaderSelectorDemo = new leaderSelectorDemo("node-" + i);
leaderSelectorDemo.start();
}
}
}
原文地址:https://www.jb51.cc/wenti/3281673.html
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。