Hey ReadAndWrite
Victory won’t come to me unless I go to it.–胜利是不会向我们走来的,我必须自己走向胜利.
有个需求,多线程并发访问某一临界资源,规定每次读1秒,写1秒,两个线程对其进行写操作,八个线程对其读操作,那么为了保证原子性,肯定会对其进行加锁,那么有多少种方式呢?
第一种 通过 重入锁 Lock 互斥锁 进行完成.
爱码如下:
public class TestSafeList {
public static void main(String[] args) {
// 模拟多线程并发访问
// 创建线程池
ExecutorService es = Executors.newFixedThreadPool(10);
TestSafeList tsl = new TestSafeList();
long start = System.currentTimeMillis();
Runnable task1 = new Runnable() {
@Override
public void run() {
tsl.setAge(10);
}
};
Runnable task2 = new Runnable() {
@Override
public void run() {
tsl.getAge();
}
};
// 进行 多线程写
for (int i = 0; i <2 ; i++) {
es.submit(task1);
}
// 多线程读
for (int i = 0; i <8 ; i++) {
es.submit(task2);
}
// 关闭线程
es.shutdown();
while (!es.isTerminated()){
System.out.println("Running Execute....");
}
System.out.println(System.currentTimeMillis() - start);
}
private Lock locker = new ReentrantLock();
private Integer age;
public Integer getAge() {
locker.lock();
try {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return this.age;
} finally {
locker.unlock();
}
}
public void setAge(Integer age) {
locker.lock();
try {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.age = age;
} finally {
locker.unlock();
}
}
}
测试如下:
Running Execute....
Running Execute....
....
Running Execute....
Running Execute....
Running Execute....
Running Execute....
Running Execute....
Running Execute....
Running Execute....
10262
大约 10 秒 左右 ,那么有什么好办法能够缩短时间吗? 当然有,因为我们的需求是 2 个写 ,8 个读,那么只保证 写写锁互斥就够了,读读锁可以是没有的.所以java 为我们提供一个ReentrantReadWriteLock 读写锁,该锁在jdk 1.5发布的,位于 java.util.concurrent.locks 的包下.
第二种: 读写锁:ReentrantReadWriteLock
爱码如下:
public class TestSafeList1 {
public static void main(String[] args) {
// 模拟多线程并发访问
// 创建线程池
ExecutorService es = Executors.newFixedThreadPool(10);
TestSafeList1 tsl = new TestSafeList1();
long start = System.currentTimeMillis();
Runnable task1 = new Runnable() {
@Override
public void run() {
tsl.setAge(10);
}
};
Runnable task2 = new Runnable() {
@Override
public void run() {
tsl.getAge();
}
};
// 进行 多线程写
for (int i = 0; i <2 ; i++) {
es.submit(task1);
}
// 多线程读
for (int i = 0; i <8 ; i++) {
es.submit(task2);
}
// 关闭线程
es.shutdown();
while (!es.isTerminated()){
System.out.println("Running Execute....");
}
System.out.println(System.currentTimeMillis() - start);
}
// 读写锁
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
// 读锁
private ReentrantReadWriteLock.ReadLock rl = rwl.readLock();
// 写锁
private ReentrantReadWriteLock.WriteLock wl = rwl.writeLock();
private Integer age;
public Integer getAge() {
rl.lock();
try {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return age;
} finally {
rl.unlock();
}
}
public void setAge(Integer age) {
wl.lock();
try {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.age = age;
} finally {
wl.unlock();
}
}
}
结果如下:
Running Execute....
Running Execute....
Running Execute....
Running Execute....
...
Running Execute....
Running Execute....
Running Execute....
Running Execute....
Running Execute....
3047
根据结果可知 比第一种方式的 互斥锁效率高了3倍之多,就是因为 没有了读读互斥锁,所以效率提高了. 那效率这么高了,究竟还有没有更高的呢?当然,答案是有的,并且还没有加锁,线程安全.
第三种:使用ConcurrentLinkedQueue 队列 它实现了 Queue队列接口,在java.util.concurrent包下,发布于 jdk1.5版本.
1 线程安全,可高效读写队列,高并发下性能最好的队列.
2 无锁,通过CAS比较交换算法,修改的方法包括三个核心参数(V,E,N);
3 V 要更新的变量,E 预期值,N 新值
4 只有当 V==E 时 ,V=N ;否则表示已经被更新过了.则取消当前操作.
测试代码如下:
Queue queue = new ConcurrentLinkedQueue();
queue.offer("1");//添加
queue.offer("2");// 添加
queue.poll();// 删除
queue.peek();// 得到 2
源码如下:
public boolean offer(E e) {
// 检查当前元素
checkNotNull(e);
// 创建节点
final Node<E> newNode = new Node<E>(e);
// 进行遍历
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
// 进行比较 值
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
// 进行CAS 算法交换
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
CAS 算法
private boolean casTail(Node<E> cmp, Node<E> val) {
// 预期值 当前值 新值
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
以上就是 针对于读写的操作.