博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Java多线程】JDK1.5并发包API杂谈
阅读量:6038 次
发布时间:2019-06-20

本文共 38295 字,大约阅读时间需要 127 分钟。

并发与并行

并发

一个或多个处理器执行更多的任务(通过划分时间片来执行更多的任务),从逻辑上实现同时运行:

463931-20170731203846193-1988782437.png

如,N个并发请求在一个两核CPU上:

463931-20170731205134490-2134013996.png

并行

N个处理器分别同时执行N个任务,从物理上实现同时运行:

463931-20170731204107630-1808100771.png

线程互斥

阻塞地加锁,通过ReentrantLock.lock()阻塞地加锁

阻塞地加锁的意义,在于在多线程环境下,同一时刻只有一个线程执行加锁代码,其他线程阻塞在加锁代码之前。

463931-20170731221621849-103899047.png

ReentrantLock继承LockLock接口提供了这些方法:

463931-20170711230342087-620153029.png

ReentrantLocksynchronized既相似,又有所不同,比如:

  • ReentrantLock支持公平非公平加锁,synchronized只支持非公平加锁
  • ReentrantLock支持非阻塞尝试获取锁,synchronized并不支持
  • ReentrantLock阻塞获取锁支持响应中断,而synchronized获取锁阻塞时不响应中断
package com.nicchagil.exercies.reentrantlock.lock;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import java.util.logging.Logger;public class LockExercise {        private static Logger logger = Logger.getLogger(LockExercise.class.getName());    private static Lock lock = new ReentrantLock();    public static void main(String[] args) {        new Thread(new Runnable() {            @Override            public void run() {                lock.lock();                try {                    logger.info(Thread.currentThread().getName() + " run.");                                        try {                        TimeUnit.SECONDS.sleep(3);                    } catch (InterruptedException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                } finally {                    lock.unlock();                }            }        }).start();                new Thread(new Runnable() {            @Override            public void run() {                lock.lock();                try {                    logger.info(Thread.currentThread().getName() + " run.");                                        try {                        TimeUnit.SECONDS.sleep(3);                    } catch (InterruptedException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                } finally {                    lock.unlock();                }            }        }).start();            }}

阻塞地加锁,通过synchronized阻塞地加锁

463931-20170731221621849-103899047.png

package com.nicchagil.exercies.reentrantlock.lock;import java.util.concurrent.TimeUnit;import java.util.logging.Logger;public class SynchronizedExercise {        private static Logger logger = Logger.getLogger(SynchronizedExercise.class.getName());    private static Object obj = new Object();    public static void main(String[] args) {        new Thread(new Runnable() {            @Override            public void run() {                synchronized (obj) {                    logger.info(Thread.currentThread().getName() + " run.");                                        try {                        TimeUnit.SECONDS.sleep(3);                    } catch (InterruptedException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                }            }        }).start();                new Thread(new Runnable() {            @Override            public void run() {                synchronized (obj) {                    logger.info(Thread.currentThread().getName() + " run.");                                        try {                        TimeUnit.SECONDS.sleep(3);                    } catch (InterruptedException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                }            }        }).start();            }}

获取锁阻塞时能响应中断

ReentrantLock使用lockInterruptibly()阻塞获取锁时,能响应中断

package com.nicchagil.exercies.reentrantlock.interruptibly;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import java.util.logging.Logger;public class LockInterruptiblyExercise {        private static Logger logger = Logger.getLogger(LockInterruptiblyExercise.class.getName());    public static void main(String[] args) {        Lock lock = new ReentrantLock(); // 声明可重入锁                lock.lock(); // 阻塞获取锁        logger.info("阻塞获取锁");        try {                        Thread t1 = new Thread(new Runnable() {                @Override                public void run() {                    try {                        lock.lockInterruptibly(); // 尝试获取锁                    } catch (InterruptedException e) {                        logger.info(Thread.currentThread().getName() + "获取锁被打断");                    }                }            });            t1.start();                        try {                TimeUnit.SECONDS.sleep(3);            } catch (InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }                        t1.interrupt(); // 打断线程                        try {                TimeUnit.SECONDS.sleep(3);            } catch (InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }                    } finally {            lock.unlock(); // 释放锁            logger.info("释放锁");        }    }}

结果:

八月 01, 2017 1:59:33 下午 com.nicchagil.exercies.reentrantlock.interruptibly.LockInterruptiblyExercise main信息: 阻塞获取锁八月 01, 2017 1:59:36 下午 com.nicchagil.exercies.reentrantlock.interruptibly.LockInterruptiblyExercise$1 run信息: Thread-1获取锁被打断八月 01, 2017 1:59:39 下午 com.nicchagil.exercies.reentrantlock.interruptibly.LockInterruptiblyExercise main信息: 释放锁

synchronized阻塞获取锁时不响应中断

package com.nicchagil.exercies.reentrantlock.interruptibly;import java.util.concurrent.TimeUnit;import java.util.logging.Logger;public class SyncInterruptiblyExercise {        private static Logger logger = Logger.getLogger(SyncInterruptiblyExercise.class.getName());    private static Object obj = new Object();    /**     * 测试synchronized获取锁时被打断是否抛出InterruptedException     * 结果:     * 七月 12, 2017 9:30:42 下午 com.nicchagil.exercies.reentrantlock.interruptibly.SyncInterruptiblyExercise main     * 信息: 阻塞获取锁     * 七月 12, 2017 9:30:48 下午 com.nicchagil.exercies.reentrantlock.interruptibly.SyncInterruptiblyExercise main     * 信息: 释放锁     */    public static void main(String[] args) {        synchronized (obj) {            logger.info("阻塞获取锁");                        Thread t1 = new Thread(new Runnable() {                @Override                public void run() {                    try {                        synchronized (obj) {                                                    }                    } catch (Exception e) {                        logger.info(Thread.currentThread().getName() + "获取锁被打断");                    }                }            });            t1.start();                        try {                TimeUnit.SECONDS.sleep(3);            } catch (InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }                        t1.interrupt(); // 打断线程                        try {                TimeUnit.SECONDS.sleep(3);            } catch (InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }                        logger.info("释放锁");        }    }}

结果:

八月 01, 2017 2:01:11 下午 com.nicchagil.exercies.reentrantlock.interruptibly.SyncInterruptiblyExercise main信息: 阻塞获取锁八月 01, 2017 2:01:17 下午 com.nicchagil.exercies.reentrantlock.interruptibly.SyncInterruptiblyExercise main信息: 释放锁

读写锁,ReentrantReadWriteLock

加上写锁后,无论读锁还是写锁均堵塞:

package com.nicchagil.exercies.reentrantreadwritelock;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantReadWriteLock;import java.util.logging.Logger;public class ReentrantReadWriteLockWriteLockExercise {        private static Logger logger = Logger.getLogger(ReentrantReadWriteLockWriteLockExercise.class.getName());        private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();    public static void main(String[] args) {                /* 先加写锁 */        new Thread(new Runnable() {            @Override            public void run() {                reentrantReadWriteLock.writeLock().lock();                logger.info(Thread.currentThread().getName() + "加写锁");                try {                    TimeUnit.SECONDS.sleep(10);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    reentrantReadWriteLock.writeLock().unlock();                    logger.info(Thread.currentThread().getName() + "解写锁");                }            }        }).start();                try {            TimeUnit.SECONDS.sleep(1);        } catch (InterruptedException e1) {            e1.printStackTrace();        }                /* 然后加写锁 */        new Thread(new Runnable() {            @Override            public void run() {                reentrantReadWriteLock.writeLock().lock();                logger.info(Thread.currentThread().getName() + "加写锁");                try {                    TimeUnit.SECONDS.sleep(3);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    reentrantReadWriteLock.writeLock().unlock();                    logger.info(Thread.currentThread().getName() + "解写锁");                }            }        }).start();                /* 然后加读锁 */        new Thread(new Runnable() {            @Override            public void run() {                reentrantReadWriteLock.readLock().lock();                logger.info(Thread.currentThread().getName() + "加读锁");                try {                    TimeUnit.SECONDS.sleep(3);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    reentrantReadWriteLock.readLock().unlock();                    logger.info(Thread.currentThread().getName() + "解读锁");                }            }        }).start();            }}

结果:

八月 01, 2017 1:42:44 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockWriteLockExercise$1 run信息: Thread-1加写锁八月 01, 2017 1:42:54 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockWriteLockExercise$1 run信息: Thread-1解写锁八月 01, 2017 1:42:54 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockWriteLockExercise$2 run信息: Thread-2加写锁八月 01, 2017 1:42:57 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockWriteLockExercise$2 run信息: Thread-2解写锁八月 01, 2017 1:42:57 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockWriteLockExercise$3 run信息: Thread-3加读锁八月 01, 2017 1:43:00 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockWriteLockExercise$3 run信息: Thread-3解读锁

获取读锁后,再获取读锁不堵塞,但获取写锁堵塞:

package com.nicchagil.exercies.reentrantreadwritelock;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantReadWriteLock;import java.util.logging.Logger;public class ReentrantReadWriteLockReadLockExercise {        private static Logger logger = Logger.getLogger(ReentrantReadWriteLockReadLockExercise.class.getName());        private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();    public static void main(String[] args) {                /* 先加读锁 */        new Thread(new Runnable() {            @Override            public void run() {                reentrantReadWriteLock.readLock().lock();                logger.info(Thread.currentThread().getName() + "加读锁");                try {                    TimeUnit.SECONDS.sleep(10);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    reentrantReadWriteLock.readLock().unlock();                    logger.info(Thread.currentThread().getName() + "解读锁");                }            }        }).start();                try {            TimeUnit.SECONDS.sleep(1);        } catch (InterruptedException e1) {            e1.printStackTrace();        }                /* 然后加读锁 */        new Thread(new Runnable() {            @Override            public void run() {                reentrantReadWriteLock.readLock().lock();                logger.info(Thread.currentThread().getName() + "加读锁");                try {                    TimeUnit.SECONDS.sleep(3);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                } finally {                    reentrantReadWriteLock.readLock().unlock();                    logger.info(Thread.currentThread().getName() + "解读锁");                }            }        }).start();                /* 然后加写锁 */        new Thread(new Runnable() {            @Override            public void run() {                reentrantReadWriteLock.writeLock().lock();                logger.info(Thread.currentThread().getName() + "加写锁");                try {                    TimeUnit.SECONDS.sleep(3);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    reentrantReadWriteLock.writeLock().unlock();                    logger.info(Thread.currentThread().getName() + "解写锁");                }            }        }).start();                /* 然后加读锁 */        new Thread(new Runnable() {            @Override            public void run() {                reentrantReadWriteLock.readLock().lock();                logger.info(Thread.currentThread().getName() + "加读锁");                try {                    TimeUnit.SECONDS.sleep(3);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    reentrantReadWriteLock.readLock().unlock();                    logger.info(Thread.currentThread().getName() + "解读锁");                }            }        }).start();            }}

结果:

八月 01, 2017 1:44:06 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockReadLockExercise$1 run信息: Thread-1加读锁八月 01, 2017 1:44:07 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockReadLockExercise$2 run信息: Thread-2加读锁八月 01, 2017 1:44:10 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockReadLockExercise$2 run信息: Thread-2解读锁八月 01, 2017 1:44:16 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockReadLockExercise$1 run信息: Thread-1解读锁八月 01, 2017 1:44:16 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockReadLockExercise$3 run信息: Thread-3加写锁八月 01, 2017 1:44:19 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockReadLockExercise$3 run信息: Thread-3解写锁八月 01, 2017 1:44:19 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockReadLockExercise$4 run信息: Thread-4加读锁八月 01, 2017 1:44:22 下午 com.nicchagil.exercies.reentrantreadwritelock.ReentrantReadWriteLockReadLockExercise$4 run信息: Thread-4解读锁

阻塞与唤醒(线程间交互)

指定线程的阻塞与唤醒,LockSupport.park(Object blocker)

463931-20170731222627193-433174855.png

使用LockSupport.park()

package com.nicchagil.exercies.locksupportpart;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.LockSupport;import java.util.logging.Logger;public class PartUnpartExercise {        private static Logger logger = Logger.getLogger(PartUnpartExercise.class.getName());    public static void main(String[] args) {        Thread mainThread = Thread.currentThread();                /* 其他线程在30S后唤醒主线程 */        new Thread(new Runnable() {            @Override            public void run() {                try {                    TimeUnit.SECONDS.sleep(30);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }                                LockSupport.unpark(mainThread); // 唤醒                logger.info(Thread.currentThread().getName() + "唤醒" + mainThread.getName());            }        }).start();                logger.info(Thread.currentThread().getName() + "准备被阻塞");        LockSupport.park(); // 阻塞        logger.info(Thread.currentThread().getName() + "被唤醒,开始执行");    }}

使用LockSupport.park(Object blocker)

package com.nicchagil.exercies.locksupportpart;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.LockSupport;import java.util.logging.Logger;public class MyPartUnpartExercise {        private static Logger logger = Logger.getLogger(MyPartUnpartExercise.class.getName());    private static Object object = new Object();    public static void main(String[] args) {        Thread mainThread = Thread.currentThread();                /* 其他线程在30S后唤醒主线程 */        new Thread(new Runnable() {            @Override            public void run() {                try {                    TimeUnit.SECONDS.sleep(30);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }                                LockSupport.unpark(mainThread); // 唤醒                logger.info(Thread.currentThread().getName() + "唤醒" + mainThread.getName());            }        }).start();                logger.info(Thread.currentThread().getName() + "准备被阻塞");        LockSupport.park(object); // 阻塞        logger.info(Thread.currentThread().getName() + "被唤醒,开始执行");    }}

LockSupport.park()LockSupport.park(Object blocker)区别在于阻塞时是否有标识等待的对象,后者是JDK6添加的,可传入等待的对象。用jstack工具生成的线程快照的对比可见下图:

463931-20170715134140431-536591216.png

获得锁的线程阻塞和唤醒,Condition.await()、Condition.signal()或Object.wait()、Object.notify()

在获取锁的情况下,线程阻塞和唤醒可分别使用Condition.await()Condition.signal(),如果在没获得前下调用,会报异常java.lang.IllegalMonitorStateException

463931-20170731224346818-593710303.png

package com.nicchagil.exercies.condition;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import java.util.logging.Logger;public class ReentrantLockConditionExercise {        private static Logger logger = Logger.getLogger(ReentrantLockConditionExercise.class.getName());        private static volatile boolean flag = false;        public static void main(String[] args) {        Lock lock = new ReentrantLock();        Condition condition = lock.newCondition();                new Thread(new Runnable() {            @Override            public void run() {                lock.lock();                                try {                    while (!flag) {                        logger.info(Thread.currentThread().getName() + "继续等待(条件还不成熟)");                        condition.await(); // 等待其他线程改变当前线程需要的条件(会释放锁)                    }                    logger.info(Thread.currentThread().getName() + "继续业务(条件已成熟)");                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                } finally {                    lock.unlock();                }            }        }).start();                new Thread(new Runnable() {            @Override            public void run() {                lock.lock();                                try {                    try {                        TimeUnit.SECONDS.sleep(3);                    } catch (InterruptedException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                                        logger.info(Thread.currentThread().getName() + "开始改变数据");                    flag = true;                    condition.signal(); // 唤醒其他线程(释放锁)                    logger.info(Thread.currentThread().getName() + "改变数据完毕,并通知其它线");                } finally {                    lock.unlock();                }            }        }).start();            }}

当然,也可使用Object.wait()Object.notify()实现此功能。

463931-20170731224326490-1500805333.png

package com.nicchagil.exercies.condition.waitnotify;import java.util.concurrent.TimeUnit;import java.util.logging.Logger;import com.nicchagil.exercies.condition.ReentrantLockConditionExercise;public class WaitNotifyExercise {        /*     * 内部类,封装boolean(不直接用Boolean,因为唤醒前改变数值时使用“flag = true”会修改flag的对象,导致用没加锁的对象调用“notify()”从而报异常)     */    static class MyFlag {        private Boolean flag = false;        public Boolean getFlag() {            return flag;        }        public void setFlag(Boolean flag) {            this.flag = flag;        }    }    private static Logger logger = Logger.getLogger(ReentrantLockConditionExercise.class.getName());    private static volatile MyFlag myFlag = new MyFlag();        public static void main(String[] args) {                new Thread(new Runnable() {            @Override            public void run() {                synchronized (myFlag) {                    try {                        while (!myFlag.getFlag()) {                            logger.info(Thread.currentThread().getName() + "继续等待(条件还不成熟)");                            myFlag.wait(); // 等待其他线程改变当前线程需要的条件(会释放锁)                        }                        logger.info(Thread.currentThread().getName() + "继续业务(条件已成熟)");                    } catch (InterruptedException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                }            }        }).start();                new Thread(new Runnable() {            @Override            public void run() {                synchronized (myFlag) {                    try {                        TimeUnit.SECONDS.sleep(3);                    } catch (InterruptedException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                                        logger.info(Thread.currentThread().getName() + "开始改变数据");                    myFlag.setFlag(true);                    myFlag.notify(); // 唤醒其他线程(释放锁)                    logger.info(Thread.currentThread().getName() + "改变数据完毕,并通知其它线");                }            }        }).start();            }}

等待其它线程结束,CountDownLatch.countDown()、CountDownLatch.await()

常见场景,比如A、B、C三个业务逻辑,3个业务之间没有依赖,可以并行运行,3个业务都执行完毕后向前端反馈结果。

一个线程等待其他线程结束才继续运行,可以用CountDownLatch.countDown()CountDownLatch.await()CyclicBarrier.await()Thread.join()

当一个线程的业务执行完,使用CountDownLatch.countDown()减1个任务,在一个线程中使用CountDownLatch.await()等待任务数减至0:

463931-20170731225325224-1028964068.png

package com.nicchagil.exercies.countdownlatch;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.logging.Logger;public class CountDownLatchExercise {        private static Logger logger = Logger.getLogger(CountDownLatchExercise.class.getName());        private static CountDownLatch countDownLatch = new CountDownLatch(2);    public static void main(String[] args) throws InterruptedException {        logger.info(Thread.currentThread().getName() + " start..."); // 主任务开始                ExecutorService executorService = Executors.newCachedThreadPool();        executorService.execute(new Runnable() {            @Override            public void run() {                try {                    TimeUnit.SECONDS.sleep(3);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }                                logger.info(Thread.currentThread().getName() + " complete..."); // 子任务一完成                countDownLatch.countDown();            }        });                executorService.execute(new Runnable() {            @Override            public void run() {                try {                    TimeUnit.SECONDS.sleep(5);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }                                logger.info(Thread.currentThread().getName() + " complete..."); // 子任务二完成                countDownLatch.countDown();            }        });                countDownLatch.await();        logger.info(Thread.currentThread().getName() + " complete..."); // 主任务完成    }}

等待其它线程结束,CyclicBarrier.await()

各线程执行完毕都使用CyclicBarrier.await(),表示到达Barrier(屏障)。另外CyclicBarrierCountDownLatch的区别还有,前者可通过cyclicBarrier.reset()重置数值,可通过构造方式CyclicBarrier(int parties, Runnable barrierAction)声明当屏障要被越过时由最后到达屏障的线程执行barrierAction任务:

463931-20170731230840177-1218466695.png

package com.nicchagil.exercies.cyclicbarrier;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.logging.Logger;import com.nicchagil.exercies.countdownlatch.CountDownLatchExercise;public class CyclicBarrierExercise {        private static Logger logger = Logger.getLogger(CountDownLatchExercise.class.getName());        private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);    public static void main(String[] args) {        logger.info(Thread.currentThread().getName() + " start..."); // 主任务开始                ExecutorService executorService = Executors.newCachedThreadPool();        executorService.execute(new Runnable() {            @Override            public void run() {                try {                    TimeUnit.SECONDS.sleep(3);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }                                logger.info(Thread.currentThread().getName() + " complete..."); // 子任务一完成                try {                    cyclicBarrier.await();                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                } catch (BrokenBarrierException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }        });                executorService.execute(new Runnable() {            @Override            public void run() {                try {                    TimeUnit.SECONDS.sleep(5);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }                                logger.info(Thread.currentThread().getName() + " complete..."); // 子任务二完成                try {                    cyclicBarrier.await();                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                } catch (BrokenBarrierException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }        });                try {            cyclicBarrier.await();        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        } catch (BrokenBarrierException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        logger.info(Thread.currentThread().getName() + " complete..."); // 主任务完成    }}

等待join()的线程完成,Thread.join()

使用Thread.join()

463931-20170731232137849-431813635.png

package com.nicchagil.exercies.countdownlatch.joinimplement;import java.util.concurrent.TimeUnit;import java.util.logging.Logger;public class JoinExercise {        private static Logger logger = Logger.getLogger(JoinExercise.class.getName());        public static void main(String[] args) {        logger.info(Thread.currentThread().getName() + " start..."); // 主任务开始                Thread t1 = new Thread(new Runnable() {            @Override            public void run() {                try {                    TimeUnit.SECONDS.sleep(3);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }                                logger.info(Thread.currentThread().getName() + " complete..."); // 子任务一完成            }        });                Thread t2 = new Thread(new Runnable() {            @Override            public void run() {                try {                    TimeUnit.SECONDS.sleep(5);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }                                logger.info(Thread.currentThread().getName() + " complete..."); // 子任务二完成            }        });                t1.start();        t2.start();                /* 插入主线程,让主线程等待其完成 */        try {            t1.join();        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }                try {            t2.join();        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }                logger.info(Thread.currentThread().getName() + " complete..."); // 主任务完成    }}

线程睡眠,Thread.sleep(long millis)或TimeUnit.sleep(long timeout)

常用此俩方法可使线程睡眠,但不会释放锁。

使用Thread.sleep(long millis)

package com.nicchagil.exercies.threadsleep;import java.util.logging.Logger;public class ThreadSleep {        private static Logger logger = Logger.getLogger(ThreadSleep.class.getName());    public static void main(String[] args) {        logger.info("开始睡眠");                try {            Thread.sleep(3000);        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        logger.info("结束睡眠");    }}

使用TimeUnit.sleep(long timeout)

package com.nicchagil.exercies.threadsleep;import java.util.concurrent.TimeUnit;import java.util.logging.Logger;public class TimeUnitThreadSleep {        private static Logger logger = Logger.getLogger(TimeUnitThreadSleep.class.getName());    public static void main(String[] args) {        logger.info("开始睡眠");                try {            TimeUnit.SECONDS.sleep(3);        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        logger.info("结束睡眠");    }}

狭路相逢勇者胜,同一时间限制指定数量的线程访问,Semaphore

在多线程环境,某些资源是有限的,比如文件IO数据库连接,我们需要作流量控制,可以使用Semaphore.acquire()获取一个许可,Semaphore.release()释放一个许可:

package com.nicchagil.exercies.semaphore;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;import java.util.logging.Logger;public class SemaphoreExercise {        private static Logger logger = Logger.getLogger(SemaphoreExercise.class.getName());        private static Semaphore semaphore = new Semaphore(3); // 最多同时通过3个信号的信号量    public static void main(String[] args) {        ExecutorService executorService = Executors.newCachedThreadPool();                for (int i = 0; i <= 10; i++) {            executorService.execute(new Runnable() {                @Override                public void run() {                    try {                        semaphore.acquire(); // 获取一个信号                    } catch (InterruptedException e1) {                        // TODO Auto-generated catch block                        e1.printStackTrace();                    }                                        /* 睡眠3S */                    try {                        TimeUnit.SECONDS.sleep(3);                    } catch (InterruptedException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                                        logger.info(Thread.currentThread().getName() + " run...");                                        semaphore.release(); // 释放一个信号                }            });        }            }}

数据库连接作为受限资源,同时最多只放行3个线程:

463931-20170731220129411-954466284.png

也许你会说,我一开始声明受限的线程数量就可以了,比如启动3个线程数(如下图)。但是,并非所有情况均如你所愿,比如线程不是由你启动的,由Servlet容器启动的呢;再比如,在数据库访问前有部分业务操作,这些操作比访问数据库耗时些,多启动些线程能增大吞吐量。

463931-20170731220825630-1754454425.png

缓存线程,线程池,ExecutorService、Executors、ThreadPoolExecutor

将线程缓存起来重复利用,可以减低线程创建、销毁的成本,还可以对其进行管理。比如系统中线程的数量是有限的,不能无止境的创建。

线程池执行器,ThreadPoolExecutor

我们常用的Executors.newFixedThreadPool(int)Executors.newCachedThreadPool()都是基于ThreadPoolExecutor,所以,先讲后者。

构造方法ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)的参数分别为:

  1. corePoolSize,核心线程池线程的数量
  2. maximumPoolSize,总线程池线程的最大数量
  3. keepAliveTime,当总线程池中除了核心线程池的线程空闲时保持等待时间,超过此时间就回收此线程
  4. unit,keepAliveTime时间的单位
  5. workQueue,当提交的线程数超过核心线程池线程数量,线程在此队列中排队
  • 提交线程,优先在核心线程池中创建线程执行
  • 如果核心线程池已满,则在队列中排队待执行
  • 如果队列已满,则在总线程池创建线程执行
  • 如果总线程池也满了,则调用RejectedExecutionHandler拒绝执行处理器
package com.nicchagil.exercies.threadpool;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.logging.Logger;public class ThreadPoolExecutorExercise {        private static Logger logger = Logger.getLogger(ThreadPoolExecutorExercise.class.getName());    public static void main(String[] args) {        /* 核心线程池为3,最大线程池位6,链式堵塞队列长度为2 */        ExecutorService executorService = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue
(2)); for (int i = 0; i <= 10; i++) { try { executorService.execute(new Runnable() { @Override public void run() { logger.info(Thread.currentThread().getName() + "开始运行"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (Exception e) { logger.info("第几个线程提交失败:" + i); } } }}

结果如下:

七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run信息: pool-1-thread-1开始运行七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run信息: pool-1-thread-5开始运行七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run信息: pool-1-thread-2开始运行七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run信息: pool-1-thread-4开始运行七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run信息: pool-1-thread-3开始运行七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run信息: pool-1-thread-6开始运行七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise main信息: 第几个线程提交失败:8七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise main信息: 第几个线程提交失败:9七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise main信息: 第几个线程提交失败:10七月 16, 2017 1:42:59 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run信息: pool-1-thread-5开始运行七月 16, 2017 1:42:59 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run信息: pool-1-thread-2开始运行

用指定数量的线程执行任务,Executors.newFixedThreadPool(int)

Executors.newFixedThreadPool(int),实际上是new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()),可知:

  • 核心线程池、总线程池大小为nThreads
  • 总线程池空闲线程不等待(实际上因核心线程池、总线程池大小相等,总线程池也没有额外的线程了)
  • 使用链式堵塞队列,其最大容量为Integer.MAX_VALUE,可以视为无限吧(你提交2的31次方-1个任务试试?)
package com.nicchagil.exercies.threadpool;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.logging.Logger;public class NewFixedThreadPoolExercise {        private static Logger logger = Logger.getLogger(NewFixedThreadPoolExercise.class.getName());        public static void main(String[] args) {        // = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue
()) ExecutorService executorService = Executors.newFixedThreadPool(3); // = new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue
())) // Executors.newSingleThreadExecutor(); for (int i = 0; i <= 10; i++) { executorService.execute(new Runnable() { @Override public void run() { logger.info(Thread.currentThread().getName() + "开始运行"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } }}

用动态缓存的线程执行任务,Executors.newCachedThreadPool()

Executors.newCachedThreadPool(),实际上是new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()),可知:

  • 核心线程数为0,总线程数为Integer.MAX_VALUE,可视为无限吧
  • 总线程池空闲线程等待新任务60秒,超时回收线程
  • 使用同步队列。此队列特点为,无容量;总线程池空闲线程调用SynchronousQueue.poll(long timeout, TimeUnit unit)在指定时间内等待新任务,如果总线程池没有空闲线程,则在总线程池中创建新线程,而总线程池的容量又可视为无限的,所以提交任务的速度大于执行任务的速度,会创建大量线程,导致CPU耗尽,内存溢出。
package com.nicchagil.exercies.threadpool;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.logging.Logger;public class NewCachedThreadPoolExercise {        private static Logger logger = Logger.getLogger(NewCachedThreadPoolExercise.class.getName());        public static void main(String[] args) {        // = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue
()) ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i <= 10; i++) { executorService.execute(new Runnable() { @Override public void run() { logger.info(Thread.currentThread().getName() + "开始运行"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } }}

转载地址:http://qoohx.baihongyu.com/

你可能感兴趣的文章
CentOS 5.5 使用 EPEL 和 RPMForge 软件库
查看>>
Damien Katz弃Apache CouchDB,继以Couchbase Server
查看>>
Target runtime Apache Tomcat is not defined.错误解决方法
查看>>
某机字长为32位,存储容量为64MB,若按字节编址.它的寻址范围是多少?
查看>>
VC++ 监视文件(夹)
查看>>
【转】keyCode对照表及JS监听组合按键
查看>>
[Java开发之路](14)反射机制
查看>>
mac gentoo-prefix安装git svn
查看>>
浅尝异步IO
查看>>
C - Train Problem II——(HDU 1023 Catalan 数)
查看>>
Speak loudly
查看>>
iOS-在项目中引入RSA算法
查看>>
[译] 听说你想学 React.js ?
查看>>
gulp压缩合并js与css
查看>>
块级、内联、内联块级
查看>>
Predicate
查看>>
[面试题记录01]实现一个function sum达到一下目的
查看>>
这个季节的忧伤,点到为止
查看>>
mysql通过配置文件进行优化
查看>>
省级网站群建设关注点
查看>>