执行速度结果:
- 传统互斥量加锁方式 < no lock不加锁的方式 < 原子函数方式
正文如下:
最近编码需要实现多线程环境下的计数器操作,统计相关事件的次数。下面是一些学习心得和体会。不敢妄称原创,基本是学习笔记。遇到相关的引用,我会致谢。
当然我们知道,count++这种操作不是原子的。一个自加操作,本质是分成三步的:
1 从缓存取到寄存器
2 在寄存器加1
3 存入缓存。
mov eax,dword ptr [a]
add eax,1
mov dword ptr [a],eax
由于时序的因素,多个线程操作同一个全局变量,会出现问题。这也是并发编程的难点。在目前多核条件下,这种困境会越来越彰显出来。
最简单的处理办法就是加锁保护,这也是我最初的解决方案。看下面的代码:
pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&count_lock);
global_int++;
pthread_mutex_unlock(&count_lock);
linux 变量 : pthread_mutex_t
linux 函数 : pthread_mutex_lock; pthread_mutex_unlock
后来在网上查找资料,找到了__sync_fetch_and_add系列的命令
__sync_fetch_and_add系列一共有十二个函数,有加/减/与/或/异或/等函数的原子性操作函数,
__snyc_fetch_and_add : 先fetch然后自加,返回的是自加以前的值
__snyc_add_and_fetch : 先自加然后返回,返回的是自加以后的值 (参照 ++i 和 i++)
__snyc_fetch_and_add的一个简单使用
int count = 4;
__sync_fetch_and_add(&count,1); // __sync_fetch_and_add(&count,1) == 4
cout<<count<<endl; //--->count=5
对于多线程对全局变量进行自加,我们就再也不用理线程锁了。
下面这行代码,和上面被pthread_mutex保护的那行代码作用是一样的,而且也是线程安全的。
__sync_fetch_and_add( &global_int,1 );
下面是这群函数的全家福,大家看名字就知道是这些函数是干啥的了。
//在用gcc编译的时候要加上选项 -march=i686
type __sync_fetch_and_add (type *ptr,type value,...);
type __sync_fetch_and_sub (type *ptr,...);
type __sync_fetch_and_or (type *ptr,...);
type __sync_fetch_and_and (type *ptr,...);
type __sync_fetch_and_xor (type *ptr,...);
type __sync_fetch_and_nand (type *ptr,...);
type __sync_add_and_fetch (type *ptr,...);
type __sync_sub_and_fetch (type *ptr,...);
type __sync_or_and_fetch (type *ptr,...);
type __sync_and_and_fetch (type *ptr,...);
type __sync_xor_and_fetch (type *ptr,...);
type __sync_nand_and_fetch (type *ptr,...);
__sync_fetch_and_add,速度是线程锁的6~7倍
type可以是1,2,3或者8字节长度的int类型,即
int8_t
uint8_tint16_t
uint16_tint32_t
uint32_tint64_t
uint64_t
后面的可扩展参数(...)用来指出哪些变量需要memory barrier,因为目前gcc实现的是full barrier(类似于linux kernel 中的mb(),表示这个操作之前的所有内存操作不会被重排序到这个操作之后),所以可以略掉这个参数。
恩.再找个帖子学习学习.linux无锁化编程--__sync_fetch_and_add系列原子操作函数_风吹过的时光的博客-CSDN博客
有一个概念叫过无锁化编程, 知道linux支持的哪些操作是具有原子特性的是理解和设计无锁化编程算法的基础
除了上面提到的12个外 还有4个可以实现互斥锁的功能
//以下两个函数提供原子的比较和交换,如果*ptr = oldValue,就将newValue写入*ptr
//第一个函数在相等并写入的情况下返回true
//第二个函数返回操作之前的值
bool __sync_bool_compare_and_swap(type* ptr,type oldValue,type newValue,....);
type __sync_val_compare_and_swap(type* ptr,....);
//将*ptr设为value并返回*ptr操作之前的值
type __sync_lock_test_and_set(type *ptr,....);
//置*ptr为0
void __sync_lock_release(type* ptr,....);
__sync_synchronize(...)
//作用 : 发出一个full barrier
/*关于memory barrier,cpu会对我们的指令进行排序,一般说来会提高程序的效率,但有时候可能造成我们不希望得到的结果,举一个例子,比如我们有一个硬件设备,它有4个寄存器,当你发出一个操作指令的时候,一个寄存器存的是你的操作指令(比如READ),两个寄存器存的是参数(比如是地址和size),最后一个寄存器是控制寄存器,在所有的参数都设置好之后向其发出指令,设备开始读取参数,执行命令,程序可能如下:*/
write1(dev.register_size,size);
write1(dev.register_addr,addr);
write1(dev.register_cmd,Read);
write1(dev.register_control,GO);
/*如果最后一条write1被换到了前几条语句之前,那么肯定不是我们所期望的,这时候我们可以在最后一条语句之前加入一个memory barrier,强制cpu执行完前面的写入以后再执行最后一条:*/
write1(dev.register_size,Read);
__sync_synchronize();
write1(dev.register_control,GO);
//memory barrier有几种类型:
//acquire barrier : 不允许将barrier之后的内存读取指令移到barrier之前(linux kernel中的wmb())
//release barrier : 不允许将barrier之前的内存读取指令移到barrier之后 (linux kernel中的rmb())
//full barrier : 以上两种barrier的合集(linux kernel中的mb())
//好吧,说实话这个函数的说明基本没看懂
最后从网上找一个代码写一写:无锁编程实战演练_风吹过的时光的博客-CSDN博客
测试场景:假设有一个应用:现在有一个全局变量,用来计数,再创建10个线程并发执行,每个线程中循环对这个全局变量进行++操作(i++),循环加2000000次。
所以很容易知道,这必然会涉及到并发互斥操作。下面通过三种方式[传统互斥量加锁方式,no lock不加锁的方式,原子函数方式]来实现这种并发操作。并对比出其在效率上的不同之处。
这里先贴上代码,共5个文件:2个用于做时间统计的文件:timer.h timer.cpp。这两个文件是临时封装的,只用来计时,可以不必细看。
//timer.h 用于计时
#ifndef TIMER_H_
#define TIMER_H_
#include <sys/time.h>
class Timer
{
public:
Timer();
Timer(const Timer& t) = delete;
~Timer();
void start();
void stop();
void reset();
double costTime();
private:
struct timeval t1;
struct timeval t2;
bool b1,b2;
};
#endif
//timer.cpp
#include "timer.h"
#include <iostream>
using namespace std;
Timer::Timer():b1(false),b2(false)
{
}
Timer::~Timer()
{
}
void Timer::start()
{
gettimeofday(&t1,NULL);
b1 = true;
b2 = false;
}
void Timer::stop()
{
gettimeofday(&t2,NULL);
b2 = true;
}
void Timer::reset()
{
b1 = false;
b2 = false;
}
double Timer::costTime()
{
if (!b1)
{
cout<<"error,do not call function start()"<<endl;
cout<<"the right sequence : start() ..... stop() costTime()"<<endl;
return 0;
}
if (!b2)
{
cout<<"error,do not call function stop()"<<endl;
cout<<"the right sequence : start() ..... stop() costTime()"<<endl;
return 0;
}
size_t sec = t2.tv_sec - t1.tv_sec;
double usec = t2.tv_usec - t1.tv_usec;
if (sec < 0)
{
cout<<"error,call stop() before start()"<<endl;
cout<<"the right sequence : start() ..... stop() costTime()"<<endl;
return 0;
}
if (usec < 0)
{
usec += 1000000;
--sec;
if (sec < 0)
{
cout<<"error,call stop() before start()"<<endl;
cout<<"the right sequence : start() ..... stop() costTime()"<<endl;
return 0;
}
}
return sec + usec * 1.0 / 1000000;
}
//thread_function.h -->多线程要调用的函数
#ifndef THREAD_FUNCTION_H_
#define THREAD_FUNCTION_H_
void* thread_lock_execFunc(void* arg);
void* thread_nolock_execFunc(void* arg);
void* thread_atom_execFunc(void* arg);
#endif
//thread_function.cpp
#include "thread_function.h"
#include "lock.h"
#include <pthread.h>
#include <unistd.h>
extern volatile int count;
struct LOCK;
void* thread_lock_execFunc(void* arg)
{
for (int i = 0; i < 2000000; ++i)
{
pthread_mutex_lock(reinterpret_cast<pthread_mutex_t*>(arg));
++count;
pthread_mutex_unlock(reinterpret_cast<pthread_mutex_t*>(arg));
}
return NULL;
}
void* thread_nolock_execFunc(void* arg)
{
LOCK* pLock = reinterpret_cast<LOCK*>(arg);
for (int i = 0; i < 2000000; ++i)
{
while(!(__sync_bool_compare_and_swap(&(pLock->mutex),pLock->use,1)))
{
usleep(100000);
}
++count;
__sync_bool_compare_and_swap(&(pLock->mutex),pLock->unUse,0);
}
return NULL;
}
void* thread_atom_execFunc(void* arg)
{
for (int i = 0; i < 2000000; ++i)
{
__sync_fetch_and_add(&count,1);
}
return NULL;
}
//lock.h --->给mainnolock.cpp使用的类
#ifndef LOCK_H_
#define LOCK_H_
struct LOCK
{
int mutex;
int use;
int unUse;
LOCK() : mutex(0),use(0),unUse(1)
{
}
};
#endif
//mainlock.cpp 使用mutex加锁方式的多线程
#include <iostream>
#include <pthread.h>
#include <iomanip>
#include "timer.h"
#include "thread_function.h"
using namespace std;
pthread_mutex_t mutex_lock;
volatile int count = 0;
int main( int argc,char** argv)
{
pthread_mutex_init(&mutex_lock,NULL);
Timer timer;
timer.start();
/*test thread begin*/
pthread_t thread_ids[10];
for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i)
{
pthread_create(&thread_ids[i],NULL,thread_lock_execFunc,&mutex_lock);
}
for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i)
{
pthread_join(thread_ids[i],NULL);
}
/*test thread end*/
timer.stop();
cout<<setiosflags(ios::fixed)<<setprecision(4)<<"lock cost["<<timer.costTime()<<"]second"<<endl;
return 0;
}
//main_nolock.cpp 使用__sync_compare_and_swap的多线程
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <iomanip>
#include "timer.h"
#include "thread_function.h"
#include "lock.h"
using namespace std;
volatile int count = 0;
int main(int argc,char** argv)
{
LOCK lock;
Timer timer;
timer.start();
/*test thread begin*/
pthread_t thread_ids[10];
for (int i = 0; i < sizeof(thread_ids) / sizeof(pthread_t); ++i)
{
pthread_create(&thread_ids[i],thread_nolock_execFunc,&lock);
}
for (int i = 0; i < sizeof(thread_ids) / sizeof(pthread_t); ++i)
{
pthread_join(thread_ids[i],NULL);
}
/*test thread end*/
timer.stop();
cout<<setiosflags(ios::fixed)<<setprecision(4)<<"nolock cost["<<timer.costTime()<<"]\n";
return 0;
}
//main_atomic.cpp 使用__sync_fetch_and_add的多线程
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<iomanip>
#include "timer.h"
#include "thread_function.h"
using namespace std;
volatile int count = 0;
int main(int argc,char** argv)
{
Timer timer;
timer.start();
/*pthread begin*/
pthread_t thread_ids[10];
for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i)
{
pthread_create(&thread_ids[i],thread_atom_execFunc,NULL);
}
for (int i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); ++i)
{
pthread_join(thread_ids[i],NULL);
}
/*pthread end*/
timer.stop();
cout<<setiosflags(ios::fixed)<<setprecision(4)<<"atomic cost["<<timer.costTime()<<"]\n";
return 0;
}
//makefile
CC = g++
CFLAGS = -g -lpthread -std=c++11
OBJS_LOCK = main_lock.o timer.o thread_function.o
OBJS_UNLOCK = main_nolock.o timer.o thread_function.o
OBJS_ATOMICLOCK = main_atomic.o timer.o thread_function.o
INC = timer.h thread_function.h lock.h
lock : $(OBJS_LOCK) $(INC)
$(CC) -o mainlock $(OBJS_LOCK) $(CFLAGS)
rm *.o
nolock : $(OBJS_UNLOCK) $(INC)
$(CC) -o mainnolock $(OBJS_UNLOCK) $(CFLAGS)
rm *.o
atomiclock : $(OBJS_ATOMICLOCK) $(INC)
$(CC) -o mainatomic $(OBJS_ATOMICLOCK) $(CFLAGS)
main_lock.o : main_lock.cpp
$(CC) -c main_lock.cpp $(CFLAGS)
main_nolock.o : main_nolock.cpp
$(CC) -c main_nolock.cpp $(CFLAGS)
main_atomic.o : main_atomic.cpp
$(CC) -c main_atomic.cpp $(CFLAGS)
timer.o : timer.cpp
$(CC) -c timer.cpp $(CFLAGS)
thread_function.o : thread_function.cpp
$(CC) -c thread_function.cpp $(CFLAGS)
clean:
rm *.o
执行makefile
make lock
make nolock
make atomiclock
然后生成3个可执行文件
运行这3个可执行文件:
另外:针对main_nolock.cpp而言,作者提到了一个现象
在thread_function.cpp中,随着一下代码的改变,运行时间会有变化
while (!(__sync_bool_compare_and_swap (&mutex,lock,1) ));
while (!(__sync_bool_compare_and_swap (&mutex,1) )) usleep(1);
while (!(__sync_bool_compare_and_swap (&mutex,1) ))usleep(10);
while (!(__sync_bool_compare_and_swap (&mutex,1) ))usleep(100);
while (!(__sync_bool_compare_and_swap (&mutex,1) ))usleep(1000);
while (!(__sync_bool_compare_and_swap (&mutex,1) ))usleep(10000);
while (!(__sync_bool_compare_and_swap (&mutex,1) ))usleep(100000);
执行时间的关系是 : T(;)<T(1)<T(10)<T(100)<T(1000)<T(10000)>T(100000)
通过编程测试及测试得出结论:
1、如果是想用全局变量来做统计操作。而又不得不考虑多线程间的互斥访问的话,最好使用编译器支持的原子操作函数。再满足互斥访问的前提下,编程最简单,效率最高。
2、lock-free,无锁编程方式确实能够比传统加锁方式效率高。所以在高并发程序中采用无锁编程的方式可以进一步提高程序效率。但是得对无锁方式有足够熟悉的了解,不然效率反而会更低而且容易出错。(比如在某些情况下main_nolock比main_lock的效率还要低)
在学习一个无锁化编程的分析帖子 关于CPU编程—无锁编程_风吹过的时光的博客-CSDN博客
Lock-free 算法通常比基于锁的算法要好:
- 从其定义来看,它们是 wait-free 的,可以确保线程永远不会阻塞。
- 状态转变是原子性的,以至于在任何点失败都不会恶化数据结构。
- 因为线程永远不会阻塞,所以当同步的细粒度是单一原子写或比较交换时,它们通常可以带来更高的吞吐量。
- 在某些情况下,lock-free 算法会有更少的同步写操作(比如 Interlocked 操作),因此纯粹从性能来看,它可能更便宜。
但是 lock-freedom 并不是万能药。下面是一些很明显的不利因素:
- 乐观的并发使用会对 hot data structures 导致 livelock。
- 代码需要大量困难的测试。通常其正确性取决于对目标机器内存模型的正确解释。
- 基于众多原因,lock-free 代码很难编写和维护。
无锁编程主要是使用原子操作替代锁来实现对共享资源的访问保护,举个例子,要对某个整数变量进行加1操作的话,用锁保护操作的代码如下:
int a = 0;
Lock();
a+= 1;
Unlock();
如果对上述代码反编译可以发现 a+=1;被翻译成了以下三条汇编指令:
mov eax,eax
如果在单核系统中,由于在上述三条指令的任何一条执行完后都可能发生任务切换,比如执行完第1条指令后就发生了任务切换,这时如果有其他任务来对a进行操作的话,当任务切换回来后,将继续对a进行操作,很可能出现不可预测的结果,因此上述三条指令必须使用锁来保护,以使这段时间内其他任务无法对a进行操作。
需要注意的是,在多核系统中,因为多个CPU核在物理上是并行的,可能发生同时写的现象;所以必须保证一个CPU核在对共享内存进行写操作时,其他CPU核不能写这块内存。因此在多核系统中和单核有区别,即使只有一条指令,也需要要加锁保护。
如果使用原子操作来实现上述加1操作的话,例如使用VC里的InterlockedIncrement来操作的话,那么对a的加1操作需要以下语句
InterlockedIncrement (&a);
这条语句最终的实际加1操作会被翻译成以下一条带lock前缀的汇编指令:
lock xadd dword ptr [ecx],eax
使用原子操作时,在进行实际的写操作时,使用了lock指令,这样就可以阻止其他任务写这块内存,避免出现数据竞争现象。原子操作速度比锁快,一般要快一倍以上。
使用lock前缀的指令实际上在系统中是使用了内存栅障(memory barrier),当原子操作在进行时,其他任务都不能对内存操作,会影响其他任务的执行。因此这种原子操作实际上属于一种激烈竞争的锁,不过由于它的操作时间很快,因此可以看成是一种极细粒度锁。
在无锁(Lock-free)编程环境中,主要使用的原子操作为CAS(Compare and Swap)操作,在VC里对应的操作为InterlockedCompareExchange或者InterlockedCompareExchangeAcquire;如果是64位的操作,需要使用InterlockedCompareExchange64或者InterlockedCompareExchangeAcquire64。使用这种原子操作替代锁的最大的一个好处是它是非阻塞的。
比较项目 |
无锁编程 |
分布式编程 |
|
1 |
加速比性能 |
取决于竞争方式,除非也采用分布式竞争,否则不如分布式锁竞争的性能 |
加速比和CPU核数成正比关系,接近于单核多任务时的性能 |
2 |
实现的功能 |
有限 |
不受限制 |
3 |
程序员掌握难易程度 |
难度太高,过于复杂,普通程序员无法掌握,目前世界上只有少数几个人掌握。 |
和单核时代的数据结构算法难度差不多,普通程序员可以掌握 |
4 |
现有软件的移植 |
使用无锁算法后,以往的算法需要废弃掉,无法复用 |
可以继承已有的算法,在已有程序基础上重构即可。 |
从上表的四个方面的综合比较可以看出,无锁编程的实用价值是远远不如分布式编程的,因此分布式编程比无锁编程更适合多核CPU系统。
可在分布计算机系统的几台计算机上同时协调执行的程序设计方法,分布式程序设计的主要特征是分布和通信。采用分布式程序设计方法设计程序时,一个程序由若干个可独立执行的程序模块组成。这些程序模块分布于一个分布式计算机系统的几台计算机上同时执行。分布在各台计算机上的程序模块是相互关联的,它们在执行中需要交换数据,即通信。只有通过通信,各程序模块才能协调地完成一个共同的计算任务。采用分布式程序设计方法解决计算问题时,必须提供用以进行分布式程序设计的语言和设计相应的分布式算法。分布式程序设计语言与常用的各种程序设计语言的主要区别,在于它具有程序分布和通信的功能。因此,分布式程序设计语言,往往可以由一种程序设计语言增加分布和通信的功能而构成。分布式算法和适用于多处理器系统的并行算法,都具有并行执行的特点,但它们是有区别的。设计分布式算法时,必须保证实现算法的各程序模块间不会有公共变量,它们只能通过通信来交换数据。此外,设计分布式算法时,往往需要考虑坚定性,即当系统中几台计算机失效时,算法仍是有效的。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。