博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
CAS原子操作实现无锁及性能分析
阅读量:4290 次
发布时间:2019-05-27

本文共 11306 字,大约阅读时间需要 37 分钟。

原文地址:http://blog.csdn.net/chen19870707/article/details/41083183

CAS原子操作实现无锁及性能分析

 

  • Author:Echo Chen(陈斌)

  • Email:chenb19870707@gmail.com

  • Blog:

  • Date:Nov 13th, 2014

    最近在研究nginx的自旋锁的时候,又见到了GCC CAS原子操作,于是决定动手分析下CAS实现的无锁到底性能如何,网上关于CAS实现无锁的文章很多,但少有研究这种无锁的性能提升的文章,这里就以实验结果和我自己的理解逐步展开。

  • 1.什么是CAS原子操作

    在研究无锁之前,我们需要首先了解一下CAS原子操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。

    大家应该还记得操作系统里面关于“原子操作”的概念,一个操作是原子的(atomic),如果这个操作所处的层(layer)的更高层不能发现其内部实现与结构。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。有了这个原子操作这个保证我们就可以实现无锁了。

    CAS原子操作在中的代码描述如下:

    1: int compare_and_swap(int* reg, int oldval, int newval)
    2: {
    3:   ATOMIC();
    4:   int old_reg_val = *reg;
    5:   if (old_reg_val == oldval)
    6:      *reg = newval;
    7:   END_ATOMIC();
    8:   return old_reg_val;
    9: }
  • 也就是检查内存*reg里的值是不是oldval,如果是的话,则对其赋值newval。上面的代码总是返回old_reg_value,调用者如果需要知道是否更新成功还需要做进一步判断,为了方便,它可以变种为直接返回是否更新成功,如下:

  • 1: bool compare_and_swap (int *accum, int *dest, int newval)
    2: {
    3:   if ( *accum == *dest ) {
    4:       *dest = newval;
    5:       return true;
    6:   }
    7:   return false;
    8: }

    除了CAS还有以下原子操作:

    • ,一般用来对变量做 +1 的原子操作。
      1: << atomic >>
      2: function FetchAndAdd(address location, int inc) {
      3:     int value := *location
      4:     *location := value + inc
      5:     return value
      6: }
       
      ,写值到某个内存位置并传回其旧值。汇编指令BST。
      1: #define LOCKED 1
      2: 
      3: int TestAndSet(int* lockPtr) {
      4:     int oldValue;
      5: 
      6:     // Start of atomic segment
      7:     // The following statements should be interpreted as pseudocode for
      8:     // illustrative purposes only.
      9:     // Traditional compilation of this code will not guarantee atomicity, the
      10:     // use of shared memory (i.e. not-cached values), protection from compiler
      11:     // optimization, or other required properties.
      12:     oldValue = *lockPtr;
      13:     *lockPtr = LOCKED;
      14:     // End of atomic segment
      15: 
      16:     return oldValue;
      17: }
       
    • ,用来实现多核环境下互斥锁,
      1: boolean locked := false // shared lock variable
      2: procedure EnterCritical() {
      3:   do {
      4:     while (locked == true) skip // spin until lock seems free
      5:   } while TestAndSet(locked) // actual atomic locking
      6: }

    2.CAS 在各个平台下的实现

     

    2.1 Linux GCC 支持的 CAS

    GCC4.1+版本中支持CAS的原子操作(完整的原子操作可参看)

    1: bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
    2: type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

    2.2  Windows支持的CAS

    在Windows下,你可以使用下面的Windows API来完成CAS:(完整的Windows原子操作可参看MSDN的)

  • 1: InterlockedCompareExchange ( __inout LONG volatile *Target,
    2:                                 __in LONG Exchange,
    3:                                 __in LONG Comperand);

    2.3  C++ 11支持的CAS

    C++11中的STL中的atomic类的函数可以让你跨平台。(完整的C++11的原子操作可参看 )

    1: template< class T >
    2: bool atomic_compare_exchange_weak( std::atomic
    * obj,
    3:                                    T* expected, T desired );
    4: template< class T >
    5: bool atomic_compare_exchange_weak( volatile std::atomic
    * obj,
    6:                                    T* expected, T desired );
     
  • 3.CAS原子操作实现无锁的性能分析

     

    • 3.1测试方法描述

     
             这里由于只是比较性能,所以采用很简单的方式,创建10个线程并发执行,每个线程中循环对全局变量count进行++操作(i++),循环加2000000次,这必然会涉及到并发互斥操作,在同一台机器上分析 加普通互斥锁、CAS实现的无锁、Fetch And Add实现的无锁消耗的时间,然后进行分析。

     

    3.2 加普通互斥锁代码

    1: #include 
    2: #include 
    3: #include 
    4: #include 
    5: #include "timer.h"
    6: 
    7: pthread_mutex_t mutex_lock;
    8: static volatile int count = 0;
    9: void *test_func(void *arg)
    10: {
    11:         int i = 0;
    12:         for(i = 0; i < 2000000; i++)
    13:         {
    14:                 pthread_mutex_lock(&mutex_lock);
    15:                 count++;
    16:                 pthread_mutex_unlock(&mutex_lock);
    17:         }
    18:         return NULL;
    19: }
    20: 
    21: int main(int argc, const char *argv[])
    22: {
    23:     Timer timer; // 为了计时,临时封装的一个类Timer。
    24:     timer.Start();    // 计时开始
    25:     pthread_mutex_init(&mutex_lock, NULL);
    26:     pthread_t thread_ids[10];
    27:     int i = 0;
    28:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
    29:     {
    30:         pthread_create(&thread_ids[i], NULL, test_func, NULL);
    31:     }
    32: 
    33:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
    34:     {
    35:         pthread_join(thread_ids[i], NULL);
    36:     }
    37: 
    38:     timer.Stop();// 计时结束
    39:     timer.Cost_time();// 打印花费时间
    40:     printf("结果:count = %d\n",count);
    41: 
    42:     return 0;
    43: }

    注:Timer类仅作统计时间用,其实现在文章最后给出。

     

    3.2 CAS实现的无锁

     

    1: #include 
    2: #include 
    3: #include 
    4: #include 
    5: #include 
    6: #include "timer.h"
    7: 
    8: int mutex = 0;
    9: int lock = 0;
    10: int unlock = 1;
    11: 
    12: static volatile int count = 0;
    13: void *test_func(void *arg)
    14: {
    15:         int i = 0;
    16:         for(i = 0; i < 2000000; i++)
    17:     {
    18:         while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);
    19:          count++;
    20:          __sync_bool_compare_and_swap (&mutex, unlock, 0);
    21:         }
    22:         return NULL;
    23: }
    24: 
    25: int main(int argc, const char *argv[])
    26: {
    27:     Timer timer;
    28:     timer.Start();
    29:     pthread_t thread_ids[10];
    30:     int i = 0;
    31: 
    32:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
    33:     {
    34:             pthread_create(&thread_ids[i], NULL, test_func, NULL);
    35:     }
    36: 
    37:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
    38:     {
    39:             pthread_join(thread_ids[i], NULL);
    40:     }
    41: 
    42:     timer.Stop();
    43:     timer.Cost_time();
    44:     printf("结果:count = %d\n",count);
    45: 
    46:     return 0;
    47: }
    48: 

     

    3.4 Fetch And Add 原子操作

     
    1: #include 
    2: #include 
    3: #include 
    4: #include 
    5: #include 
    6: #include "timer.h"
    7: 
    8: static volatile int count = 0;
    9: void *test_func(void *arg)
    10: {
    11:         int i = 0;
    12:         for(i = 0; i < 2000000; i++)
    13:         {
    14:             __sync_fetch_and_add(&count, 1);
    15:         }
    16:         return NULL;
    17: }
    18: 
    19: int main(int argc, const char *argv[])
    20: {
    21:     Timer timer;
    22:     timer.Start();
    23:     pthread_t thread_ids[10];
    24:     int i = 0;
    25: 
    26:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
    27:             pthread_create(&thread_ids[i], NULL, test_func, NULL);
    28:     }
    29: 
    30:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
    31:             pthread_join(thread_ids[i], NULL);
    32:     }
    33: 
    34:     timer.Stop();
    35:     timer.Cost_time();
    36:     printf("结果:count = %d\n",count);
    37:     return 0;
    38: }
    39: 
     

    4 实验结果和分析

    在同一台机器上,各运行以上3份代码10次,并统计平均值,其结果如下:(单位微秒)

    由此可见,无锁操作在性能上远远优于加锁操作,消耗时间仅为加锁操作的1/3左右,无锁编程方式确实能够比传统加锁方式效率高,经上面测试可以发现,可以快到3倍左右。所以在极力推荐在高并发程序中采用无锁编程的方式可以进一步提高程序效率。

    5.时间统计类Timer

    timer.h

    1: #ifndef TIMER_H
    2: #define TIMER_H
    3: 
    4: #include 
    5: class Timer
    6: {
    7: public:   
    8:     Timer();
    9:     // 开始计时时间
    10:     void Start();
    11:     // 终止计时时间
    12:     void Stop();
    13:     // 重新设定
    14:     void Reset();
    15:     // 耗时时间
    16:     void Cost_time();
    17: private:
    18:     struct timeval t1;
    19:     struct timeval t2;
    20:     bool b1,b2;
    21: };
    22: #endif

    timer.cpp

    1: #include "timer.h"
    2: #include 
    3: 
    4: Timer::Timer()
    5: {
    6:     b1 = false;
    7:     b2 = false;
    8: }
    9: void Timer::Start()
    10: {
    11:     gettimeofday(&t1,NULL); 
    12:     b1 = true;
    13:     b2 = false;
    14: }
    15: 
    16: void Timer::Stop()
    17: {   
    18:     if (b1 == true)
    19:     {
    20:         gettimeofday(&t2,NULL); 
    21:         b2 = true;
    22:     }
    23: }
    24: 
    25: void Timer::Reset()
    26: {   
    27:     b1 = false;
    28:     b2 = false;
    29: }
    30: 
    31: void Timer::Cost_time()
    32: {
    33:     if (b1 == false)
    34:     {
    35:         printf("计时出错,应该先执行Start(),然后执行Stop(),再来执行Cost_time()");
    36:         return ;
    37:     }
    38:     else if (b2 == false)
    39:     {
    40:         printf("计时出错,应该执行完Stop(),再来执行Cost_time()");
    41:         return ;
    42:     }
    43:     else
    44:     {
    45:         int usec,sec;
    46:         bool borrow = false;
    47:         if (t2.tv_usec > t1.tv_usec)
    48:         {
    49:             usec = t2.tv_usec - t1.tv_usec;
    50:         }
    51:         else
    52:         {
    53:             borrow = true;
    54:             usec = t2.tv_usec+1000000 - t1.tv_usec;
    55:         }
    56: 
    57:         if (borrow)
    58:         {
    59:             sec = t2.tv_sec-1 - t1.tv_sec;
    60:         }
    61:         else
    62:         {
    63:             sec = t2.tv_sec - t1.tv_sec;
    64:         }
    65:         printf("花费时间:%d秒 %d微秒\n",sec,usec);
    66:     }
    67: }
    68: 
  • 6.参考

  • 1.      
  •  2.
  • -

  • Echo Chen:

       -

7
 
1

我的同类文章

  • 2014-11-18
  • 2014-10-08
  • 2014-10-11
  • 2014-09-26
猜你在找
查看评论
9楼 
2015-09-22 22:01发表 
多谢分享,学习了~
8楼 
2015-01-16 14:51发表 
楼主文章不错~
7楼 
2014-12-22 16:03发表 
学习了
6楼 
2014-12-02 17:09发表 
pthread mutex implementation also makes use of CAS under the hood. The major difference, with your test, is that pthread mutex would make a system call to block calling thread until the lock is available, while your CAS implementation makes a fixed-time sleep.
Re: 
2014-12-05 10:37发表 
回复fubar:usleep is also a system call, is't it?
why the version which used usleep is much faster then the mutex one?
Re: 
2014-12-04 19:45发表 
回复fubar:Thanks for your remarks! I'll learn more about it.
Re: 
2014-12-04 18:26发表 
这篇文章,包括很多人对无锁的理解都是非常错误的,这里其实使用了spin锁,实际上也是基于锁的。
另外fubar说的很对,CAS本身没什么神秘的到处都在用。无锁只是说基于CAS来实现,因为它被证明是充分必要条件,而其他原子操作要不不够,要不像ll/sc更强不被广泛支持,两者不是等同的概念。
5楼 
2014-11-26 16:22发表 
学习了,谢谢!
另外有个疑问,CAS实现的无锁,如果拿不到锁就sleep 100ms,,如果时间改小一点会不会有所提高呢?毕竟如果有一次拿到就浪费了100ms了。。
Re: 
2014-12-18 22:09发表 
回复scgywx:有一种极端情况:若是它能拿到锁的时候永远都在sleep咋办。。。
Re: 
2014-11-26 17:37发表 
回复scgywx:应该会,你可以测测,我没试
4楼 
2014-11-25 09:38发表 
能够用C++ 11特性, 别使用pthread, 写个线程安全的计数器
在linux下测试没找到pthread
3楼 
2014-11-17 12:36发表 
CAS没有LL-SC那么灵活强大
Re: 
2015-09-22 23:16发表 
回复zenny_chen:LL-SC 很强大么,把 一个CAS操作分开来做,还要考虑到期间硬件中断和缓存行被修改的问题,貌似store-conditional 失败率更高
Re: 
2015-09-23 20:44发表 
回复workrun:LL-SC可不是将一个CAS分开来。LL-SC比起CAS来有两个优势:
LL/SC has two advantages over CAS when designing a load-store architecture: reads and writes are separate instructions, as required by the design philosophy (and pipeline architecture); and both instructions can be performed using only two registers (address and value), fitting naturally into LSA encoding schemes. CAS, on the other hand, requires three registers (address, old value, new value) and a dependency between the value read and the value written. x86, being a CISC architecture, does not have this constraint; though modern chips may well translate a CAS instruction into separate LL/SC micro-operations internally.
Re: 
2015-09-24 13:07发表 
回复zenny_chen:难道它不是把读写操作分开来的么,读取-监视-条件写,可以看一下这篇http://blogs.msdn.com/b/oldnewthing/archive/2013/09/13/10448736.aspx,
请求CPU监视一个内存地址依赖于CPU自己的实现。但要记住一件事情,CPU在同一时间只能监视一个内存地址,并且这个时间是很短暂的。如果你的代码被抢占了或者在load-link后有一个硬件中断到来,那么你的store-conditional将会失败,因为CPU因为硬件中断而分心了,完全忘记了你要求它监视的内存地址(即使CPU成功的记住了它,也不会记太久,因为硬件中断几乎都会执行自己的load-link指令,因此会替换成它自己要求监视的内存地址)。
另外,CPU可能会有点懒,在监视时并不监视内存地址,而是监视cache line,如果有人修改了一个不同的内存位置,但是刚好跟要被监视的内存地址在同一个cache line里,store-conditional操作也会失败,即使它事实上可以成功完成。ARM架构的CPU是太懒了,以至于任何向同一块2048字节写入的操作都会导致store-conditional失败。
Re: 
2014-11-30 21:28发表 
回复zenny_chen:X86系列的CPU是没有LL-SC指令的
Re: 
2014-11-17 15:23发表 
回复zenny_chen:我要研究下,谢谢共享知识
Re: 
2014-12-01 10:33发表 
回复chen19870707:呼呼,推荐阅读:http://en.wikipedia.org/wiki/Non-blocking_algorithm
其中,see also里的Load-Link/Store-Conditional就是LL-SC
2楼 
2014-11-16 08:05发表 
谢谢分享 学习了 `(*∩_∩*)′
1楼 
2014-11-14 17:59发表   
您的文章已被推荐到博客首页和个人页侧边栏推荐文章,感谢您的分享。
你可能感兴趣的文章
idea在搭建ssm框架时mybatis整合问题 无法找到mapper
查看>>
java设计基本原则----单一职责原则
查看>>
HashMap的实现
查看>>
互斥锁 synchronized分析
查看>>
java等待-通知机制 synchronized和waity()的使用实践
查看>>
win10 Docke安装mysql8.0
查看>>
docker 启动已经停止的容器
查看>>
order by 排序原理及性能优化
查看>>
Lock重入锁
查看>>
docker安装 rabbitMq
查看>>
git 常用命令 入门
查看>>
linux安装docker
查看>>
关闭selinx nginx无法使用代理
查看>>
shell 脚本部署项目
查看>>
spring cloud zuul网关上传大文件
查看>>
springboot+mybatis日志显示SQL
查看>>
工作流中文乱码问题解决
查看>>
maven打包本地依赖包
查看>>
spring boot jpa 实现拦截器
查看>>
jenkins + maven+ gitlab 自动化部署
查看>>