如何"锁",使读写效率更高?

基于读写锁ReentrantReadWriteLock实现并发访问

Posted by MasterJen on November 21, 2018

Hey ReadAndWrite

Victory won’t come to me unless I go to it.–胜利是不会向我们走来的,我必须自己走向胜利.

有个需求,多线程并发访问某一临界资源,规定每次读1秒,写1秒,两个线程对其进行写操作,八个线程对其读操作,那么为了保证原子性,肯定会对其进行加锁,那么有多少种方式呢?

第一种 通过 重入锁 Lock 互斥锁 进行完成.

爱码如下:

public class TestSafeList {
    public static void main(String[] args) {
//        模拟多线程并发访问
//        创建线程池
        ExecutorService es = Executors.newFixedThreadPool(10);
        TestSafeList tsl = new TestSafeList();
        long start = System.currentTimeMillis();
        Runnable task1 = new Runnable() {
            @Override
            public void run() {
              tsl.setAge(10);
            }
        };
        Runnable task2 = new Runnable() {
            @Override
            public void run() {
                tsl.getAge();
            }
        };
//     进行 多线程写
        for (int i = 0; i <2 ; i++) {
            es.submit(task1);
        }
//      多线程读
        for (int i = 0; i <8 ; i++) {
            es.submit(task2);
        }
//        关闭线程
        es.shutdown();

        while (!es.isTerminated()){
            System.out.println("Running Execute....");
        }
        System.out.println(System.currentTimeMillis() - start);

    }

    private Lock locker = new ReentrantLock();

    private Integer age;

    public Integer getAge() {
        locker.lock();
        try {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return this.age;
        } finally {
            locker.unlock();
        }

    }

    public void setAge(Integer age) {
        locker.lock();
        try {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.age = age;
        } finally {
            locker.unlock();
        }

    }
}

测试如下:

    Running Execute....
    Running Execute....
    ....
    Running Execute....
    Running Execute....
    Running Execute....
    Running Execute....
    Running Execute....
    Running Execute....
    Running Execute....
    10262

大约 10 秒 左右 ,那么有什么好办法能够缩短时间吗? 当然有,因为我们的需求是 2 个写 ,8 个读,那么只保证 写写锁互斥就够了,读读锁可以是没有的.所以java 为我们提供一个ReentrantReadWriteLock 读写锁,该锁在jdk 1.5发布的,位于 java.util.concurrent.locks 的包下.

第二种: 读写锁:ReentrantReadWriteLock

爱码如下:

     public class TestSafeList1 {
         public static void main(String[] args) {
     
     //        模拟多线程并发访问
     //        创建线程池
             ExecutorService es = Executors.newFixedThreadPool(10);
             TestSafeList1 tsl = new TestSafeList1();
             long start = System.currentTimeMillis();
             Runnable task1 = new Runnable() {
                 @Override
                 public void run() {
                     tsl.setAge(10);
                 }
             };
             Runnable task2 = new Runnable() {
                 @Override
                 public void run() {
                     tsl.getAge();
                 }
             };
     //     进行 多线程写
             for (int i = 0; i <2 ; i++) {
                 es.submit(task1);
             }
     //      多线程读
             for (int i = 0; i <8 ; i++) {
                 es.submit(task2);
             }
     //        关闭线程
             es.shutdown();
     
             while (!es.isTerminated()){
                 System.out.println("Running Execute....");
             }
             System.out.println(System.currentTimeMillis() - start);
     
         }
     // 读写锁
        private ReentrantReadWriteLock rwl =  new ReentrantReadWriteLock();
     //    读锁
         private ReentrantReadWriteLock.ReadLock rl = rwl.readLock();
     //    写锁
         private ReentrantReadWriteLock.WriteLock wl = rwl.writeLock();
     
     
         private Integer age;
     
         public Integer getAge() {
             rl.lock();
             try {
                 try {
                     Thread.sleep(1000L);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
                 return age;
             } finally {
                 rl.unlock();
             }
     
         }
     
         public void setAge(Integer age) {
             wl.lock();
             try {
                 try {
                     Thread.sleep(1000L);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
                 this.age = age;
             } finally {
                 wl.unlock();
             }
         }
     }

结果如下:

    Running Execute....
    Running Execute....
    Running Execute....
    Running Execute....
    ...
    Running Execute....
    Running Execute....
    Running Execute....
    Running Execute....
    Running Execute....
    3047

根据结果可知 比第一种方式的 互斥锁效率高了3倍之多,就是因为 没有了读读互斥锁,所以效率提高了. 那效率这么高了,究竟还有没有更高的呢?当然,答案是有的,并且还没有加锁,线程安全.

第三种:使用ConcurrentLinkedQueue 队列 它实现了 Queue队列接口,在java.util.concurrent包下,发布于 jdk1.5版本.

  1 线程安全,可高效读写队列,高并发下性能最好的队列.
  2 无锁,通过CAS比较交换算法,修改的方法包括三个核心参数(V,E,N);
  3 V 要更新的变量,E 预期值,N 新值
  4 只有当 V==E 时 ,V=N ;否则表示已经被更新过了.则取消当前操作.

测试代码如下:

       Queue queue = new ConcurrentLinkedQueue();
        queue.offer("1");//添加
        queue.offer("2");// 添加
        queue.poll();// 删除
        queue.peek();// 得到 2

源码如下:

     public boolean offer(E e) {
     // 检查当前元素
            checkNotNull(e);
            // 创建节点
            final Node<E> newNode = new Node<E>(e);
        // 进行遍历 
            for (Node<E> t = tail, p = t;;) {
                Node<E> q = p.next;
                if (q == null) {
                    // p is last node
                    // 进行比较 值
                    if (p.casNext(null, newNode)) {
                        // Successful CAS is the linearization point
                        // for e to become an element of this queue,
                        // and for newNode to become "live".
                        //  进行CAS 算法交换
                        if (p != t) // hop two nodes at a time
                            casTail(t, newNode);  // Failure is OK.
                        return true;
                    }
                    // Lost CAS race to another thread; re-read next
                }
                else if (p == q)
                    // We have fallen off list.  If tail is unchanged, it
                    // will also be off-list, in which case we need to
                    // jump to head, from which all live nodes are always
                    // reachable.  Else the new tail is a better bet.
                    p = (t != (t = tail)) ? t : head;
                else
                    // Check for tail updates after two hops.
                    p = (p != t && t != (t = tail)) ? t : q;
            }     
            
            CAS 算法
            
            
private boolean casTail(Node<E> cmp, Node<E> val) {
//  预期值 当前值  新值   
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

以上就是 针对于读写的操作.