java并发

Posted by Xsp on September 7, 2018

马士兵老师高并发编程系列 学习: https://www.bilibili.com/video/av11076511/

源代码:https://github.com/EduMoral/edu/blob/master/concurrent/src/yxxy/

001

public class T {

	private int count = 10;
	private Object o = new Object();

	public void m() {
		synchronized(o) { //任何线程要执行下面的代码,必须先拿到o的锁
			count--;
			System.out.println(Thread.currentThread().getName() + " count = " + count);
		}
	}
}

这里 Object o = new Object()在堆内存创建一个对象,o 则是指向这个对象的引用,synchronized(o) 会对 o 这个对象加锁,注意这个锁是加载 堆内存的对象里面的,不是 o 这个引用。synchronized 加的是互斥锁。

002

synchronized(this) { //任何线程要执行下面的代码,必须先拿到o的锁
    count--;
    System.out.println(Thread.currentThread().getName() + " count = " + count);
}

也可以这样写,this 表示对当前对象加锁。

004

public synchronized static void m() { //这里等同于synchronized(yxxy.c_004.T.class)
    count--;
    System.out.println(Thread.currentThread().getName() + " count = " + count);
}

public static void mm() {
    synchronized(T.class) { //考虑一下这里写synchronized(this)是否可以?
        count --;
    }
}

对于synchronized static方法是对类对象加锁,相当于 synchronized(T.class)

005

public class T implements Runnable {
    static CountDownLatch c = new CountDownLatch(1);
    private int count = 0;

    public static void main(String[] args) throws InterruptedException {
        T t = new T();
        for (int i = 0; i < 500; i++) {
            new Thread(t, "THREAD" + i).start();
        }

        c.await();

        System.out.println(t.count);
    }

    public /*synchronized*/ void run() {
        count++;
        c.countDown();
        System.out.println(Thread.currentThread().getName() + " count = " + count);
    }
}

以上代码的问题:

  1. count++; 不是原子操作,所以 main 函数最终输出的值可能不是 500,可能比500 小(不容易出现…)。可以对 count 加volatile解决。
  2. run 方法没有同步,所以 count++ 后的值,可能和 run里输出的count值不一致,比如三个线程可能同时执行了 count++, 然后就都输出 3。解决就是对 run 加 synchronized 同步。

007

// 同步和非同步方法同时调用

public class T {
    public static void main(String[] args) {
        T t = new T();

      /*new Thread(()->t.m1(), "t1").start();
      new Thread(()->t.m2(), "t2").start();*/

        // 学习了,这种引用方法的方式
        new Thread(t::m1, "t1").start();
        new Thread(t::m2, "t2").start();

      /*
      new Thread(new Runnable() {
         @Override
         public void run() {
            t.m1();
         }
      });
      */
    }

    public synchronized void m1() {
        System.out.println(Thread.currentThread().getName() + " m1 start...");

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + " m1 end");
    }

    public void m2() {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + " m2 ");
    }
}

t::m1 是一种lambda表达式的简写java8语法,https://docs.oracle.com/javase/tutorial/java/javaOO/methodreferences.html

008

/**
 * 对业务写方法加锁
 * 对业务读方法不加锁
 * 容易产生脏读问题(dirtyRead)
 */


import java.util.concurrent.TimeUnit;

public class Account {
    String name;
    double balance;

    public static void main(String[] args) {
        Account a = new Account();
        new Thread(() -> a.set("zhangsan", 100.0)).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(a.getBalance("zhangsan"));

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(a.getBalance("zhangsan"));
    }

    public synchronized void set(String name, double balance) {
        this.name = name;
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        this.balance = balance;
    }

    public /*synchronized*/ double getBalance(String name) {
        return this.balance;
    }
}

读的时候也要注意加锁。可以用读写锁 ReadWriteLock

009

synchronized获得的锁是可重入的

那和 reentrantlock 的区别?

活锁?

线程通信的两种方式:

  1. 共享内存
  2. 线程之间互相发消息

010

// 子类调用父类的同步方法
public class T {
    synchronized void m() {
        System.out.println("m start");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this);
        System.out.println("m end");
    }

    public static void main(String[] args) {
        new TT().m();
    }

}

class TT extends T {
    @Override
    synchronized void m() {
        System.out.println("child m start");
        System.out.println(this);
        super.m();
        System.out.println("child m end");
    }
}

这里也是可以的。注意这里的两个 synchronized 加锁的对象都是 this ,是 TT 对象。为啥?跟 T 对象没关系吗?

011

public class T {
    int count = 0;

    public static void main(String[] args) {
        T t = new T();
        Runnable r = new Runnable() {

            @Override
            public void run() {
                t.m();
            }

        };
        new Thread(r, "t1").start();

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(r, "t2").start();
    }

    synchronized void m() {
        System.out.println(Thread.currentThread().getName() + " start");
        while (true) {
            count++;
            System.out.println(Thread.currentThread().getName() + " count = " + count);
            try {
                TimeUnit.SECONDS.sleep(1);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if (count == 5) {
                int i = 1 / 0; //此处抛出异常,锁将被释放,要想不被释放,可以在这里进行catch,然后让循环继续
            }
        }
    }
}

synchronized锁 可以被 异常 释放。可以加上try catch。修改如下:

try {
    int i = 1 / 0; //此处抛出异常,锁将被释放,要想不被释放,可以在这里进行catch,然后让循环继续
} catch (Exception e) {
    e.printStackTrace();
    continue;
}

012

public class T {
    int i = 0;
    boolean running = true;

    public static void main(String[] args) {
        T t = new T();

        new Thread(t::m, "t1").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t.running = false;
    }

    void m() {
        while (running) {
            i++;
//            System.out.println(1);
        }

    }
}

以上代码,因为 running 变量没有加 volatile,所以 m 方法执行后会死循环。

但是while循环里加上一句 System.out.println 则不会死循环了?因为 System.out.println 是比较耗时的操作。

一种解释:当while循环里的操作的耗时非常短的时候,短时间内while执行很多次,触发JIT编译,对代码优化,优化后,读的就是缓存的值。

  • https://t.zsxq.com/aiei2rn
  • https://www.bilibili.com/video/av11076511 60:00

013

public class T {
    volatile int count = 0;
    public static void main(String[] args) {
        T t = new T();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(t::m, "thread-" + i));
        }

        threads.forEach((o) -> o.start());
        threads.forEach((o) -> {
            try {
                o.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(t.count);
    }

    void m() {
        for (int i = 0; i < 10000; i++) {
            count++;
        }
    }
}

volatile 虽然可以被认为是 轻量级的 synchronized,但是 volatile 确实功能没有synchronized 多。对于如上代码,写的情况,用volatile就不行。

volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized。

volatile保证了可见性,但是不保证原子性。

volatile 能保证线程每次读的时候是最新的值,但是当写的时候,可能又覆盖了其他线程写的值。

上面的代码,如果不加volatile ,理论上应该 count 值更小。

http://www.hollischuang.com/archives/2648

015

public class T {
    AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) {
        T t = new T();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(t::m, "thread-" + i));
        }

        threads.forEach((o) -> o.start());
        threads.forEach((o) -> {
            try {
                o.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(t.count);
    }

    void m() {
        for (int i = 0; i < 10000; i++) {
            if (count.get() < 1000) {
                count.incrementAndGet(); //count++
            }
        }
    }
}

AtomXXX类本身方法都是原子性的,但不能保证多个方法连续调用是原子性的。如上代码,m 循环里 get 和 incrementAndGet 都是原子操作,但是这么个组合操作就不了。(怎么构造出这样的问题?读 & 写)

注意这些原子类对应的操作也是可以保证可见性的。

017

public class T {
    Object o = new Object();
    public static void main(String[] args) {
        T t = new T();
        //启动第一个线程
        new Thread(t::m, "t1").start();
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //创建第二个线程
        Thread t2 = new Thread(t::m, "t2");
        // 锁对象发生改变,所以t2线程得以执行,如果注释掉这句话,线程2将永远得不到执行机会
        t.o = new Object();
        t2.start();
    }

    void m() {
        synchronized (o) {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            }
        }
    }
}

修改加锁对象,以获得锁。。。

018

public class T {
    String s1 = "Hello";
    String s2 = "Hello";

    void m1() {
        synchronized (s1) {
        }
    }

    void m2() {
        synchronized (s2) {
        }
    }
}

s1 , s2 用的都是常量池的同一个对象。

同理还有 Integer.valueOf() 缓存

019

实现一个容器,提供两个方法,add,size。写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束。

public class MyContainer1 {

    List lists = new ArrayList();
    public static void main(String[] args) {
        MyContainer1 c = new MyContainer1();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                c.add(new Object());
                System.out.println("add " + i);

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();

        new Thread(() -> {
            while (true) {
                if (c.size() == 5) {
                    break;
                }
            }
            System.out.println("t2 结束");
        }, "t2").start();
    }

    public void add(Object o) {
        lists.add(o);
    }

    public int size() {
        return lists.size();
    }
}

实现1,无法实现功能,因为编译器优化,上面的代码 while (true) 会一直死循环。

实现2:

// 添加volatile,使t2能够得到通知
volatile List lists = new ArrayList();

也可以在while(true) 循环体里加一段 System.out.println(c.size()); ,可以阻止编译器优化,也能达到目的。

public class MyContainer3 {

    //添加volatile,使t2能够得到通知
    volatile List lists = new ArrayList();

    public static void main(String[] args) {
        MyContainer3 c = new MyContainer3();

        final Object lock = new Object();

        new Thread(() -> {
            synchronized (lock) {
                System.out.println("t2启动");
                if (c.size() != 5) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("t2 结束");
            }

        }, "t2").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        new Thread(() -> {
            System.out.println("t1启动");
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                    c.add(new Object());
                    System.out.println("add " + i);

                    if (c.size() == 5) {
                        lock.notify();
                    }

                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "t1").start();
    }

    public void add(Object o) {
        lists.add(o);
    }

    public int size() {
        return lists.size();
    }
}

实现3:使用 wait/notify。如上代码。

注意这两个方法需要在 synchronized 方法或者 synchronized 块儿中才能调用 否则会报错 java.lang.IllegalMonitorStateException,注意wait会立即释放锁,但是notify不会,等到 synchronized 语句块执行完了,才会释放。

另外,由于notify 不会立即释放锁,所以 上面的代码,t1 会等t2执行完了才输出。

实现4:

if(c.size() != 5) {
    try {
        lock.wait();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
System.out.println("t2 结束");
//通知t1继续执行
lock.notify();

// --------------------------------------
if(c.size() == 5) {
    lock.notify();
    //释放锁,让t2得以执行
    try {
        lock.wait();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

如上代码所示,既然 notify 不能立即释放锁,那就 再用一个 wait。但是这种实现很不优雅。

实现5,CountDownLatch:

public class MyContainer5 {

    // 添加volatile,使t2能够得到通知
    volatile List lists = new ArrayList();

    public static void main(String[] args) {
        MyContainer5 c = new MyContainer5();

        CountDownLatch latch = new CountDownLatch(1);

        new Thread(() -> {
            System.out.println("t2启动");
            if (c.size() != 5) {
                try {
                    latch.await();

                    //也可以指定等待时间
                    //latch.await(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("t2 结束");

        }, "t2").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        new Thread(() -> {
            System.out.println("t1启动");
            for (int i = 0; i < 10; i++) {
                c.add(new Object());
                System.out.println("add " + i);

                if (c.size() == 5) {
                    // 打开门闩,让t2得以执行
                    latch.countDown();
                }

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }, "t1").start();

    }

    public void add(Object o) {
        lists.add(o);
    }

    public int size() {
        return lists.size();
    }
}

实现6,cyclicBarrier:

public static void main(String[] args) {
    MyContainer5 c = new MyContainer5();

    CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    new Thread(() -> {
        System.out.println("t2启动");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("t2 结束");

    }, "t2").start();

    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e1) {
        e1.printStackTrace();
    }

    new Thread(() -> {
        System.out.println("t1启动");
        for (int i = 0; i < 10; i++) {
            c.add(new Object());
            System.out.println("add " + i);
            if (c.size() == 5) {
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }, "t1").start();
}

实现7,semaphore :

public static void main(String[] args) {
    MyContainer5 c = new MyContainer5();

    Semaphore semaphore = new Semaphore(1);
    new Thread(() -> {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("t1启动");
        for (int i = 0; i < 10; i++) {
            c.add(new Object());
            System.out.println("add " + i);

            if (c.size() == 5) {
                semaphore.release();
            }

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }, "t1").start();

    new Thread(() -> {
        System.out.println("t2启动");
        try {
            TimeUnit.SECONDS.sleep(1);
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("t2 结束");

        semaphore.release();
    }, "t2").start();

}

semaphore 是用来 限制线程数的,这里如果使用semaphore 来满足要求的话,需要先启动t1,即让t1先获得许可。

当不涉及同步,只是涉及线程通信的时候,用synchronized + wait/notify就显得太重了。这时应该考虑countdownlatch/cyclicbarrier/semaphore

countdownlatch 内部实现用的还是

/**
* The synchronization state.
*/
private volatile int state;

020

ReentrantLock

用法1:

public class ReentrantLock2 {
    Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        ReentrantLock2 rl = new ReentrantLock2();
        new Thread(rl::m1).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(rl::m2).start();
    }

    void m1() {
        try {
            lock.lock(); //synchronized(this)
            for (int i = 0; i < 10; i++) {
                TimeUnit.SECONDS.sleep(1);

                System.out.println(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    void m2() {
        lock.lock();
        System.out.println("m2 ...");
        lock.unlock();
    }
}

可重入锁,注意这个加锁需要手动释放。

和 synchronized区别?

  • 使用 synchronized 锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放。
  • ReentrantLock 可以尝试锁 tryLock。

用法2

// tryLock
public class ReentrantLock3 {
    Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        ReentrantLock3 rl = new ReentrantLock3();
        new Thread(rl::m1).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(rl::m2).start();
    }

    void m1() {
        try {
            lock.lock();
            for (int i = 0; i < 10; i++) {
                TimeUnit.SECONDS.sleep(1);

                System.out.println(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行
     * 可以根据tryLock的返回值来判定是否锁定
     * 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中
     */
    void m2() {
      /*
      boolean locked = lock.tryLock();
      System.out.println("m2 ..." + locked);
      if(locked) lock.unlock();
      */

        boolean locked = false;

        try {
            locked = lock.tryLock(5, TimeUnit.SECONDS);
            System.out.println("m2 ..." + locked);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (locked) lock.unlock();
        }

    }
}

用法3:lockInterruptibly

可中断锁。

public class ReentrantLock4 {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();

        Thread t1 = new Thread(() -> {
            try {
                lock.lock();
                System.out.println("t1 start");
                TimeUnit.SECONDS.sleep(5);
                System.out.println("t1 end");
            } catch (InterruptedException e) {
                System.out.println("interrupted!");
            } finally {
                lock.unlock();
            }
        });
        t1.start();

        Thread t2 = new Thread(() -> {
            boolean locked = false;
            try {
                lock.lockInterruptibly(); //可以对interrupt()方法做出响应
                System.out.println("t2 start");
                TimeUnit.SECONDS.sleep(5);
                System.out.println("t2 end");
            } catch (InterruptedException e) {
                System.out.println("interrupted!");
            } finally {
                // 注意检查一下,否则报错 IllegalMonitorStateException
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        });
        t2.start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t2.interrupt(); //打断线程2的等待
    }
}

用法4,ReentrantLock还可以指定为公平锁

非公平锁,即锁释放后,所有的线程共同竞争锁,效率更高,所以默认是非公平锁。

public class ReentrantLock5 extends Thread {

    // 参数为true表示为公平锁,请对比输出结果
    private static ReentrantLock lock = new ReentrantLock(true);

    public static void main(String[] args) {
        ReentrantLock5 rl = new ReentrantLock5();
        Thread th1 = new Thread(rl);
        Thread th2 = new Thread(rl);
        th1.start();
        th2.start();
    }

    public void run() {
        for (int i = 0; i < 100; i++) {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + "获得锁");
            } finally {
                lock.unlock();
            }
        }
    }
}

以上代码输出,th1 和 th2 交替获得锁。

021

面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用

实现1:使用wait和notify/notifyAll来实现

public class MyContainer1<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10;
    private int count = 0;

    public static void main(String[] args) {
        MyContainer1<String> c = new MyContainer1<>();
        //启动消费者线程
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 5; j++) System.out.println(c.get());
            }, "c" + i).start();
        }

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //启动生产者线程
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 25; j++) c.put(Thread.currentThread().getName() + " " + j);
            }, "p" + i).start();
        }
    }

    public synchronized void put(T t) {
        while (lists.size() == MAX) { //想想为什么用while而不是用if?
            try {
                this.wait(); //effective java
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        lists.add(t);
        ++count;
        this.notifyAll(); //通知消费者线程进行消费
    }

    public synchronized T get() {
        T t = null;
        while (lists.size() == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        t = lists.removeFirst();
        count--;
        this.notifyAll(); //通知生产者进行生产
        return t;
    }
}

需要注意的点:

  1. wait/notify 需要给方法加锁。

  2. 判断条件需要用 while ,不能用if。因为唤醒的时候,只有一个线程获得锁并继续往下运行,对于其他线程继续在wait等,如果等到获得锁的时候,继续往下运行,可能条件已经不满足了。

  3. notifyAll 通知后,其他线程还是要竞争锁。

实现2,ReentrantLock,Lock / Condition:

ReentrantLock 可以添加多个条件。

public class MyContainer2<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10;

    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();

    public static void main(String[] args) {
        MyContainer2<String> c = new MyContainer2<>();
        //启动消费者线程
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    System.out.println(c.get());
                }
            }, "c" + i).start();
        }

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //启动生产者线程
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 25; j++) {
                    c.put(Thread.currentThread().getName() + " " + j);
                }
            }, "p" + i).start();
        }
    }

    public void put(T t) {
        try {
            lock.lock();
            while (lists.size() == MAX) {
                producer.await();
            }

            lists.add(t);
            ++count;
            consumer.signalAll(); //通知消费者线程进行消费
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public T get() {
        T t = null;
        try {
            lock.lock();
            while (lists.size() == 0) {
                consumer.await();
            }
            t = lists.removeFirst();
            count--;
            producer.signalAll(); //通知生产者进行生产
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return t;
    }
}

022

ThreadLocal线程局部变量

ThreadLocal是使用空间换时间,synchronized是使用时间换空间

比如在hibernate中session就存在与ThreadLocal中,避免synchronized的使用

public class ThreadLocal2 {

    static ThreadLocal<Person> tl = new ThreadLocal<>();

    public static void main(String[] args) {

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(tl.get());
        }).start();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            tl.set(new Person());
        }).start();
    }

    static class Person {
        String name = "zhangsan";
    }
}

023

参考一个比较详细的:https://blog.csdn.net/cselmu9/article/details/51366946

/**
 * 线程安全的单例模式:
 * http://www.cnblogs.com/xudong-bupt/p/3433643.html
 */

import java.util.Arrays;

public class Singleton {

    private Singleton() {
        System.out.println("single");
    }

    public static Singleton getSingle() {
        return Inner.s;
    }

    public static void main(String[] args) {
        Thread[] ths = new Thread[200];
        for (int i = 0; i < ths.length; i++) {
            ths[i] = new Thread(() -> {
                System.out.println(Singleton.getSingle());
            });
        }
        Arrays.asList(ths).forEach(o -> o.start());
    }

    private static class Inner {
        private static Singleton s = new Singleton();
    }

}

024

/**
 * 有N张火车票,每张票都有一个编号
 * 同时有10个窗口对外售票
 * 请写一个模拟程序
 * <p>
 * 分析下面的程序可能会产生哪些问题?
 * 重复销售?超量销售?
 */
import java.util.ArrayList;
import java.util.List;

public class TicketSeller1 {
    static List<String> tickets = new ArrayList<>();

    static {
        for (int i = 0; i < 10000; i++) tickets.add("票编号:" + i);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                while (tickets.size() > 0) {
                    System.out.println("销售了--" + tickets.remove(0));
                }
            }).start();
        }
    }
}

以上代码会出现重复销售的问题,因为 remove 操作不是原子的,在remove某个元素的过程中,某个元素还没从List中移除,其他线程也来操作了。

改进1:用vector

但是 以下这段代码还是会有问题。

while (tickets.size() > 0) {
	System.out.println("销售了--" + tickets.remove(0));
}

改进2:synchronized 锁

主要是 把 tickets.size() 和 tickets.remove() 这两个操作放到一个同步块。

while (true) {
    synchronized (tickets) {
        if (tickets.size() <= 0) break;

        try {
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("销售了--" + tickets.remove(0));
    }
}

改进3:synchronized并发性不高,改用并发容器 ConcurrentLinkedQueue

static Queue<String> tickets = new ConcurrentLinkedQueue<>();
static {
    for (int i = 0; i < 1000; i++) tickets.add("票 编号:" + i);
}
public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            while (true) {
                String s = tickets.poll();
                if (s == null) break;
                else System.out.println("销售了--" + s);
            }
        }).start();
    }
}

poll 内部是CAS操作,性能高一些。虽然这个也能有上面说的问题,但是 就算s==null了,poll() 也不会出错,顶多就是再 取一次null值。

https://github.com/EduMoral/edu/blob/master/concurrent/src/yxxy/c_024/readme.txt

025

并发容器

ConcurrentHashMap

问题:

  1. 分段锁为什么分16

  2. 1.8 用CAS 替代了分段锁

02 ConcurrentSkipListMap

高并发且排序

CopyOnWriteArrayList

写时复制,读效率高,写效率低。

ConcurrentLinkedQueue

链表实现的队列 无界队列 并发加锁

LinkedBlockingQueue

阻塞式队列