JAVA如何进行CAS
讲到java的队列时,讲到java中的CAS操作 回顾下java中的cas,主要采用compareAndSet方法,如AtomicReference中所使用的: AtomicRefrence.java
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
unsafe.java
public final native boolean compareAndSwapObject(Object o, long offset,
Object expected,
Object x);
- obj :包含要修改的字段对象;
- offset :字段在对象内的偏移量;
- expect : 字段的期望值;
- update : 如果该字段的值等于字段的期望值,用于更新字段的新值;
compareAndSwapObject是一个本地方法,调用的c++的实现
virtual jboolean compareAndSwapObject(::java::lang::Object *, jlong, ::java::lang::Object *, ::java::lang::Object *);
static inline bool
compareAndSwap (volatile jobject *addr, jobject old, jobject new_val)
{
jboolean result = false;
spinlock lock;
if ((result = (*addr == old)))
*addr = new_val;
return result;
}
jboolean
sun::misc::Unsafe::compareAndSwapObject (jobject obj, jlong offset,
jobject expect, jobject update)
{
jobject *addr = (jobject*)((char *) obj + offset);
return compareAndSwap (addr, expect, update);
}
这段代码主要做的是: 1、通过对象的首地址跟字段在对象内的偏移量来获取字段的地址 2、判断字段地址是否与我们期望的地址相同,如果相同即更新新的地址
在java中,地址相同说明两个对象相同。
注意到c++源码中使用的 spinlock,其实现:
class spinlock
{
static volatile obj_addr_t lock;
public:
spinlock ()
{
while (! compare_and_swap (&lock, 0, 1))
_Jv_ThreadYield ();
}
~spinlock ()
{
release_set (&lock, 0);
}
};
volatile obj_addr_t spinlock::lock;
volatile关键字让编译器不进行优化,从而lock值每次都从内存中读取。
C++如何进行CAS
通过前半文的了解,我们知道c 提供的函数 compare_and_swap
bool compare_and_swap ( int *memory_location, int expected_value, int new_value)
{
if (*memory_location == expected_value)
{
*memory_location = new_value;
return true;
}
return false;
}
(1)GGC对CAS支持 GCC4.1+版本中支持CAS原子操作。
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);
(2)Windows对CAS支持 Windows中使用Windows API支持CAS。
LONG InterlockedCompareExchange(
LONG volatile *Destination,
LONG ExChange,
LONG Comperand
);
(3)C11对CAS支持 C11 STL中atomic函数支持CAS并可以跨平台。
template< class T >
bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );
template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );
atomic_compare_exchange_weak当存储值时会发生失败情况,且返回false,所以通常采用while直至比对交换成功,相比使用aotmic_compare_exchange_strong会有更高的性能,而使用while也即是通常所说的自旋效果。
已有的c++ 无锁队列
1、生产级队列ConcurrencyQueue 这是我在生产中使用的,因为消费者可以采用wait方式监听队列消息进行消费,所以蛮方便的。
2、boost方案: boost提供了三种无锁方案,分别适用不同使用场景。 boost::lockfree::queue是支持多个生产者和多个消费者线程的无锁队列。 boost::lockfree::stack是支持多个生产者和多个消费者线程的无锁栈。 boost::lockfree::spsc_queue是仅支持单个生产者和单个消费者线程的无锁队列,比boost::lockfree::queue性能更好。 Boost无锁数据结构的API通过轻量级原子锁实现lock-free,不是真正意义的无锁。 Boost提供的queue可以设置初始容量,添加新元素时如果容量不够,则总容量自动增长;但对于无锁数据结构,添加新元素时如果容量不够,总容量不会自动增长。
因为boost太大了,所以生产比较少用
自己造轮子
当我们能把一样东西做出来,说明我们才是真正的了解,所以多造轮子对自己有帮助。
RingBuffer 环形队列
数据结构
我们采用数组的线性空间来实现环形接口,当数据到达尾部时将其转回到0的位置重写入。
环形结构的容量位置从数组q[0] 到 q[max - 1]。
head表示队列头,tail表示队列尾,当(tail + 1) % max 即表示队列已满
算法
%max 取余 可以 通过位运算 head & (max - 1),需要保证max是2的幂次方
RingBuffer实现
单消费者,单生产者
#pragma once
template<typename T>
class RingBuffer {
private:
unsigned int _size;
int _front;
int _tail;
T* _data;
public:
RingBuffer(unsigned int size) :_size(size),_front(0), _tail(0) {
_data = new T[size];
}
~RingBuffer() {
delete[]T;
_data = nullptr;
}
inline bool isEmpty() {
return _front == _tail;
}
inline bool isFull() {
return _front == (_tail + 1) % _size;
}
inline bool push(const T& v) {
if (isFull()) {
return false;
}
_data[_tail] = v;
_tail = (_tail + 1) % _size;
return true;
}
inline bool push(const T* v) {
return push(*v);
}
inline bool pop(T& v) {
if (isEmpty()) {
return false;
}
v = _data[_front];
_front = (_front + 1) % _size;
return true;
}
inline unsigned int front() {
return _front;
}
inline unsigned int tail() {
return _tail;
}
inline unsigned int size() {
return _size;
}
};
上文的head,tail并不是线程安全的,当多生产者多消费者,需要进行加锁,而这个时候,我们便可以使用atomic_compare_exchange_weak了,当然了,我们也可以使用c++11 提供的atomic
LockFreeQueue实现
LockFreeQueue.hpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdbool.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/mman.h>
#define SHM_NAME_LEN 128
#define MIN(a, b) ((a) > (b) ? (b) : (a))
#define IS_POT(x) ((x) && !((x) & ((x)-1)))
#define MEMORY_BARRIER __sync_synchronize()
template <class T>
class LockFreeQueue
{
protected:
typedef struct
{
int m_lock;
inline void spinlock_init()
{
m_lock = 0;
}
inline void spinlock_lock()
{
while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {}
}
inline void spinlock_unlock()
{
__sync_lock_release(&m_lock);
}
} spinlock_t;
public:
LockFreeQueue(unsigned int size, const char* name = NULL)
{
memset(shm_name, 0, sizeof(shm_name));
createQueue(name, size);
}
~LockFreeQueue()
{
if(shm_name[0] == 0)
{
delete [] m_buffer;
m_buffer = NULL;
}
else
{
if (munmap(m_buffer, m_size * sizeof(T)) == -1) {
perror("munmap");
}
if (shm_unlink(shm_name) == -1) {
perror("shm_unlink");
}
}
}
bool isFull()const
{
#ifdef USE_POT
return m_head == (m_tail + 1) & (m_size - 1);
#else
return m_head == (m_tail + 1) % m_size;
#endif
}
bool isEmpty()const
{
return m_head == m_tail;
}
unsigned int front()const
{
return m_head;
}
unsigned int tail()const
{
return m_tail;
}
bool push(const T& value)
{
#ifdef USE_LOCK
m_spinLock.spinlock_lock();
#endif
if(isFull())
{
#ifdef USE_LOCK
m_spinLock.spinlock_unlock();
#endif
return false;
}
memcpy(m_buffer + m_tail, &value, sizeof(T));
#ifdef USE_MB
MEMORY_BARRIER;
#endif
#ifdef USE_POT
m_tail = (m_tail + 1) & (m_size - 1);
#else
m_tail = (m_tail + 1) % m_size;
#endif
#ifdef USE_LOCK
m_spinLock.spinlock_unlock();
#endif
return true;
}
bool pop(T& value)
{
#ifdef USE_LOCK
m_spinLock.spinlock_lock();
#endif
if (isEmpty())
{
#ifdef USE_LOCK
m_spinLock.spinlock_unlock();
#endif
return false;
}
memcpy(&value, m_buffer + m_head, sizeof(T));
#ifdef USE_MB
MEMORY_BARRIER;
#endif
#ifdef USE_POT
m_head = (m_head + 1) & (m_size - 1);
#else
m_head = (m_head + 1) % m_size;
#endif
#ifdef USE_LOCK
m_spinLock.spinlock_unlock();
#endif
return true;
}
protected:
virtual void createQueue(const char* name, unsigned int size)
{
#ifdef USE_POT
if (!IS_POT(size))
{
size = roundup_pow_of_two(size);
}
#endif
m_size = size;
m_head = m_tail = 0;
if(name == NULL)
{
m_buffer = new T[m_size];
}
else
{
int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666);
if (shm_fd < 0)
{
perror("shm_open");
}
if (ftruncate(shm_fd, m_size * sizeof(T)) < 0)
{
perror("ftruncate");
close(shm_fd);
}
void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (addr == MAP_FAILED)
{
perror("mmap");
close(shm_fd);
}
if (close(shm_fd) == -1)
{
perror("close");
exit(1);
}
m_buffer = static_cast<T*>(addr);
memcpy(shm_name, name, SHM_NAME_LEN - 1);
}
#ifdef USE_LOCK
spinlock_init(m_lock);
#endif
}
inline unsigned int roundup_pow_of_two(size_t size)
{
size |= size >> 1;
size |= size >> 2;
size |= size >> 4;
size |= size >> 8;
size |= size >> 16;
size |= size >> 32;
return size + 1;
}
protected:
char shm_name[SHM_NAME_LEN];
volatile unsigned int m_head;
volatile unsigned int m_tail;
unsigned int m_size;
#ifdef USE_LOCK
spinlock_t m_spinLock;
#endif
T* m_buffer;
};
#define USE_LOCK 开启spinlock锁,多生产者多消费者场景 #define USE_MB 开启Memory Barrier #define USE_POT 开启队列大小的2的幂对齐
kfifo内核队列
kfifo是Linux内核的一个FIFO数据结构,采用环形循环队列的数据结构来实现,提供一个无边界的字节流服务,并且使用并行无锁编程技术,即单生产者单消费者场景下两个线程可以并发操作,不需要任何加锁行为就可以保证kfifo线程安全。
kfifo的实现
struct kfifo
{
unsigned char *buffer;
unsigned int size;
unsigned int in;
unsigned int out;
spinlock_t *lock;
};
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
struct kfifo *fifo;
BUG_ON(!is_power_of_2(size));
fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
if (!fifo)
return ERR_PTR(-ENOMEM);
fifo->buffer = buffer;
fifo->size = size;
fifo->in = fifo->out = 0;
fifo->lock = lock;
return fifo;
}
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
unsigned char *buffer;
struct kfifo *ret;
if (!is_power_of_2(size))
{
BUG_ON(size > 0x80000000);
size = roundup_pow_of_two(size);
}
buffer = kmalloc(size, gfp_mask);
if (!buffer)
return ERR_PTR(-ENOMEM);
ret = kfifo_init(buffer, size, gfp_mask, lock);
if (IS_ERR(ret))
kfree(buffer);
return ret;
}
void kfifo_free(struct kfifo *fifo)
{
kfree(fifo->buffer);
kfree(fifo);
}
static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_put(fifo, buffer, len);
spin_unlock_irqrestore(fifo->lock, flags);
return ret;
}
static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_get(fifo, buffer, len);
if (fifo->in == fifo->out)
fifo->in = fifo->out = 0;
spin_unlock_irqrestore(fifo->lock, flags);
return ret;
}
unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->size - fifo->in + fifo->out);
smp_mb();
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
memcpy(fifo->buffer, buffer + l, len - l);
smp_wmb();
fifo->in += len;
return len;
}
unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->in - fifo->out);
smp_rmb();
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
memcpy(buffer + l, fifo->buffer, len - l);
smp_mb();
fifo->out += len;
return len;
}
static inline void __kfifo_reset(struct kfifo *fifo)
{
fifo->in = fifo->out = 0;
}
static inline void kfifo_reset(struct kfifo *fifo)
{
unsigned long flags;
spin_lock_irqsave(fifo->lock, flags);
__kfifo_reset(fifo);
spin_unlock_irqrestore(fifo->lock, flags);
}
static inline unsigned int __kfifo_len(struct kfifo *fifo)
{
return fifo->in - fifo->out;
}
static inline unsigned int kfifo_len(struct kfifo *fifo)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_len(fifo);
spin_unlock_irqrestore(fifo->lock, flags);
return ret;
}
总结: Linux内核中有spin_lock、spin_lock_irq和spin_lock_irqsave保证同步。
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
preempt_disable();
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{
local_irq_disable();
preempt_disable();
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
spin_lock比spin_lock_irq速度快,但并不是线程安全的。spin_lock_irq增加调用local_irq_disable函数,即禁止本地中断,是线程安全的,既禁止本地中断,又禁止内核抢占。 spin_lock_irqsave是基于spin_lock_irq实现的一个辅助接口,在进入和离开临界区后,不会改变中断的开启、关闭状态。 如果自旋锁在中断处理函数中被用到,在获取自旋锁前需要关闭本地中断,spin_lock_irqsave实现如下: A、保存本地中断状态; B、关闭本地中断; C、获取自旋锁。 解锁时通过 spin_unlock_irqrestore完成释放锁、恢复本地中断到原来状态等工作。 (3)线性代码结构 代码中没有任何if-else分支来判断是否有足够的空间存放数据,kfifo每次入队或出队只是简单的 +len 判断剩余空间,并没有对kfifo->size 进行取模运算,所以kfifo->in和kfifo->out总是一直增大,直到unsigned in超过最大值时绕回到0这一起始端,但始终满足:kfifo->in - kfifo->out <= kfifo->size。 (4)使用Memory Barrier mb():适用于多处理器和单处理器的内存屏障。 rmb():适用于多处理器和单处理器的读内存屏障。 wmb():适用于多处理器和单处理器的写内存屏障。 smp_mb():适用于多处理器的内存屏障。 smp_rmb():适用于多处理器的读内存屏障。 smp_wmb():适用于多处理器的写内存屏障。 Memory Barrier使用场景如下: A、实现同步原语(synchronization primitives) B、实现无锁数据结构(lock-free data structures) C、驱动程序 程序在运行时内存实际访问顺序和程序代码编写的访问顺序不一定一致,即内存乱序访问。内存乱序访问行为出现是为了提升程序运行时的性能。内存乱序访问主要发生在两个阶段: A、编译时,编译器优化导致内存乱序访问(指令重排)。 B、运行时,多CPU间交互引起内存乱序访问。 Memory Barrier能够让CPU或编译器在内存访问上有序。Memory barrier前的内存访问操作必定先于其后的完成。Memory Barrier包括两类: A、编译器Memory Barrier。 B、CPU Memory Barrier。 通常,编译器和CPU引起内存乱序访问不会带来问题,但如果程序逻辑的正确性依赖于内存访问顺序,内存乱序访问会带来逻辑上的错误。 在编译时,编译器对代码做出优化时可能改变实际执行指令的顺序(如GCC的O2或O3都会改变实际执行指令的顺序)。 在运行时,CPU虽然会乱序执行指令,但在单个CPU上,硬件能够保证程序执行时所有的内存访问操作都是按程序代码编写的顺序执行的,Memory Barrier没有必要使用(不考虑编译器优化)。为了更快执行指令,CPU采取流水线的执行方式,编译器在编译代码时为了使指令更适合CPU的流水线执行方式以及多CPU执行,原本指令就会出现乱序的情况。在乱序执行时,CPU真正执行指令的顺序由可用的输入数据决定,而非程序员编写的顺序。
|