您现在的位置是:主页 > news > 陕西高端品牌网站建设/志鸿优化网
陕西高端品牌网站建设/志鸿优化网
admin2025/5/5 18:42:05【news】
简介陕西高端品牌网站建设,志鸿优化网,自学网站建设靠谱吗,下载一个百度导航高并发学习(三) 线程安全的集合ArrayListHashSetHashMap常用辅助类CountDownLatchCyclicBarrierSemaphore读写锁阻塞队列线程池三种创建方式7大参数4种拒绝策略线程池原理线程安全的集合 ArrayList List list new ArrayList();线程不安全 会发生Concu…
高并发学习(三)
- 线程安全的集合
- ArrayList
- HashSet
- HashMap
- 常用辅助类
- CountDownLatch
- CyclicBarrier
- Semaphore
- 读写锁
- 阻塞队列
- 线程池
- 三种创建方式
- 7大参数
- 4种拒绝策略
- 线程池原理
线程安全的集合
ArrayList
List list = new ArrayList();线程不安全 会发生ConcurrentModificationException异常
解决方案:
1、vector线程安全。List list = new Vector<>();
2、使用工具类,将ArrayList转为线程安全的List list = Collections.synchronizedList(new ArrayList<>());
3、使用JUC 下的CopyOnWriteArrayList。该List采用写时加锁然后复制,实现线程安全
package UnSafeDataStructure;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;public class ArrayListDemo {public static void main(String[] args) {List<Integer> list = Collections.synchronizedList(new ArrayList<>());for (int i = 0; i < 50; i++) {new Thread(()->{list.add((int) (Math.random() * 1000 ));System.out.println(list);}).start();}}
}
HashSet
Set set = new HashSet<>();线程不安全 会发生ConcurrentModificationException异常
解决方案:
1、 使用JUC包下的CopyOnWriteArraySet 。 Set set = new CopyOnWriteArraySet<>();
2、 使用Collections工具类 ,syn方法返回一个线程安全的集合。Set set = Collections.synchronizedSet(new HashSet<>);
package UnSafeDataStructure;import java.util.Collections;
import java.util.HashSet;
import java.util.Set;public class HashSetDemo {public static void main(String[] args) {Set<Integer> set = Collections.synchronizedSet(new HashSet<>());for (int i = 0; i < 50; i++) {new Thread(()->{set.add( (int) (Math.random() * 1000 ) );try {System.out.println(set);
// System.out.println(Thread.currentThread().getName()+" "+set);Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}, i +" ").start();}}
}
HashMap
Map<Integer,Integer> map = new HashMap<>();线程不安全,
解决方案:
1、 使用JUC包下的ConcurrentHashMap,Map<Integer,Integer> map = new ConcurrentHashMap<>();
2、 使用工具类转换。Map<Integer,Integer> map = Collections.synchronizedMap(new HashMap<>());
package UnSafeDataStructure;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;public class HashMapDemo {static int time []= {10,20,30,40,50,60,70,80,90,100};public static void main(String[] args) {Map<Integer,Integer> map = Collections.synchronizedMap(new HashMap<>());for (int i = 0; i < 50; i++) {new Thread(()->{map.put((int) (Math.random() * 1000 ) , (int) (Math.random() * 1000 ));/*若增加打印语句,或者使用线程休眠。则可能由于延时使得线程得以先后执行*/
// try {
// Thread.sleep((int) (Math.random() * 100 ));
// } catch (InterruptedException e) {
// e.printStackTrace();
// }//System.out.println( Thread.currentThread().getName());System.out.println(map);}, "" + i).start();}}
}
常用辅助类
CountDownLatch
CountDownLatch
官网例子:
class Driver { // ...void main() throws InterruptedException {CountDownLatch startSignal = new CountDownLatch(1);CountDownLatch doneSignal = new CountDownLatch(N);for (int i = 0; i < N; ++i) // create and start threadsnew Thread(new Worker(startSignal, doneSignal)).start();doSomethingElse(); // don't let run yetstartSignal.countDown(); // let all threads proceeddoSomethingElse();doneSignal.await(); // wait for all to finish}}class Worker implements Runnable {private final CountDownLatch startSignal;private final CountDownLatch doneSignal;Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {this.startSignal = startSignal;this.doneSignal = doneSignal;}public void run() {try {startSignal.await();doWork();doneSignal.countDown();} catch (InterruptedException ex) {} // return;}void doWork() { ... }}
package SynAid;import java.util.concurrent.CountDownLatch;public class LatchDemo {public static void main(String[] args) throws InterruptedException {//1、创建计数器CountDownLatch downLatch = new CountDownLatch(6);for (int i = 0; i < 5; i++) {new Thread(()->{System.out.println("【线程"+Thread.currentThread().getName()+"】 让计数器减1");downLatch.countDown();}, ""+i).start();}//2、若计数器不为0 则会在这里阻塞downLatch.await();System.out.println("计数器已减至0");}
}
CyclicBarrier
官网例子
class Solver {final int N;final float[][] data;final CyclicBarrier barrier;class Worker implements Runnable {int myRow;Worker(int row) { myRow = row; }public void run() {while (!done()) {processRow(myRow);try {barrier.await();} catch (InterruptedException ex) {return;} catch (BrokenBarrierException ex) {return;}}}}public Solver(float[][] matrix) {data = matrix;N = matrix.length;Runnable barrierAction = () -> mergeRows(...);barrier = new CyclicBarrier(N, barrierAction);List<Thread> threads = new ArrayList<>(N);for (int i = 0; i < N; i++) {Thread thread = new Thread(new Worker(i));threads.add(thread);thread.start();}// wait until donefor (Thread thread : threads)thread.join();}}
package SynAid;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo {public static void main(String[] args) {// 若没有达到设置的parties,Runnable方法不执行CyclicBarrier barrier = new CyclicBarrier(6, () -> {System.out.println("计数器到达6");});for (int i = 0; i < 5; i++) {new Thread(()->{try {System.out.println("【线程"+Thread.currentThread().getName()+"】 让计数器加1");barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}, ""+i).start();}}
}
Semaphore
acquire请求获取permit
release释放掉获取的permit。
未得到permit的线程只能等待
package SynAid;import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;public class SemaphoreDemo {public static void main(String[] args) {//创建2个permitSemaphore semaphore = new Semaphore(2);for (int i = 0; i < 4; i++) {final int tag = i;new Thread(()->{try {//请求获取permitsemaphore.acquire();System.out.println("【线程"+Thread.currentThread().getName()+"】获取到信号量");TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("【线程"+Thread.currentThread().getName()+"】释放信号量");//释放掉持有的permitsemaphore.release();}}, "" +tag ).start();}}
}
读写锁
读写锁的特点是:
(1) 读的时候允许多个线程同时读。由读锁(也叫共享锁)控制。
(2)写的时候只允许一个线程写。由写锁(也叫独享锁)控制。
package RWLock;import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class RWLockDemo {public static void main(String[] args) {Data data = new Data();for(int i = 0; i < 4; i ++){new Thread(()->{data.write();data.read();}, i + "").start();}}
}class Data{private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();public void write(){Lock lock = readWriteLock.writeLock();lock.lock();try {System.out.println("【线程"+Thread.currentThread().getName()+"】 正在写");}finally {System.out.println("【线程"+Thread.currentThread().getName()+"】 写完毕");lock.unlock();}}public void read(){Lock lock = readWriteLock.readLock();lock.lock();try {System.out.println("【线程"+Thread.currentThread().getName()+"】 正在读");}finally {lock.unlock();}}
}
执行结果
【线程0】 正在写
【线程0】 写完毕
【线程3】 正在写
【线程3】 写完毕
【线程1】 正在写
【线程1】 写完毕
【线程2】 正在写
【线程2】 写完毕
【线程1】 正在读
【线程2】 正在读
【线程0】 正在读
【线程3】 正在读
阻塞队列
package BQueueDemo;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;public class BlockQueueDemo {/*抛出异常的增加元素和获取元素增加超出容量:Exception in thread "main" java.lang.IllegalStateException: Queue full删除为空:Exception in thread "main" java.util.NoSuchElementException*/public static void test1(){BlockingQueue<Integer> bq = new LinkedBlockingQueue<>(3);bq.add(1);bq.add(2);bq.add(3);
// bq.add(5); //Exception in thread "main" java.lang.IllegalStateException: Queue fullbq.remove();bq.remove();bq.remove();
// bq.remove(); //Exception in thread "main" java.util.NoSuchElementException}/*不抛出异常的增加和删除方法*/public static void test2(){BlockingQueue<Integer> bq = new ArrayBlockingQueue<>(3);bq.offer(1);bq.offer(2);bq.offer(3);System.out.println(bq.offer(4));//falsebq.poll( );bq.poll( );bq.poll( );try {System.out.println(bq.poll(1, TimeUnit.SECONDS) ); //null 等待超时后返回null} catch (InterruptedException e) {e.printStackTrace();}}/*** 不带超时的阻塞*/public static void test3(){BlockingQueue<Integer> bq = new ArrayBlockingQueue<>(3);try {bq.put(1);bq.put(2);bq.put(3);bq.put(5); //会一直阻塞 InterruptedException;} catch (InterruptedException e) {e.printStackTrace();}try {bq.take();bq.take();bq.take();bq.take(); //会一直阻塞 InterruptedException;} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {// test1();
// test2();test3();}
}
线程池
线程池的优点:
- 响应快。无需等待线程创建即可从线程池中返回线程立即执行。
- 节约资源。通过重复利用已经创建的线程,降低了每次new线程、销毁线程的时间开销。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,也会降低系统的稳定性。使用线程可以进行统一的分配,调优和监控。
三种创建方式
Executors工具类有三种创建线程的方式:
1、 只有单个线程的线程池。ExecutorService threadPool = Executors.newSingleThreadExecutor();
2、 固定线程个数的线程池。 ExecutorService threadPool = Executors.newFixedThreadPool(5);
3、 动态个数线程的线程池。ExecutorService threadPool = Executors.newCachedThreadPool();
本质都是调用ThreadPoolExecutor的构造方法
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory)
package ThreadPool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ThreePool {public static void main(String[] args) {// ExecutorService threadPool = Executors.newSingleThreadExecutor();
// ExecutorService threadPool = Executors.newFixedThreadPool(5);//coreThread:0, maxThread:Integer.MAX_VALUE,//需要多少线程就分配多少线程 理论上最大线程数为 Integer.MAX_VALUE 实际上收CPU核数限制//本机测试 是85ExecutorService threadPool = Executors.newCachedThreadPool();for (int i = 0; i < 100; i++) {threadPool.execute(()->System.out.println("【"+ Thread.currentThread().getName()+"】"+"单个线程的线程池!"));}System.out.println("CPU核数:"+Runtime.getRuntime().availableProcessors());}}
new LinkedBlockingQueue()构造方法默认长度为:Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
阿里开发手册中明确规定:
7大参数
7大参数:public ThreadPoolExecutor(int corePoolSize, 池中核心线程数(相当于银行实际服务窗口个数)int maximumPoolSize, 池中最大线程个数(相当于银行最大能开放服务窗口个数)long keepAliveTime, 线程等待最大时间 当线程池数量超过corePoolSize,多余的空闲线程存活时间(超过corePoolSize的空闲线程会在多久内销毁)TimeUnit unit, 等待时间的单位BlockingQueue<Runnable> workQueue, 阻塞队列(相当于银行的等候区)ThreadFactory threadFactory, 线程工厂RejectedExecutionHandler handler 拒绝策略,当客户数量大于 maximumPoolSize + workQueue的长度时对于新增客户的策略,默认为 new AbortPolicy();,抛出异常)
package ThreadPool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*当线程个数大于8个时,会抛出异常:java.util.concurrent.RejectedExecutionException*/
public class ThreadPoolDemo {public static void main(String[] args) {ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory() ,new ThreadPoolExecutor.AbortPolicy());for (int i = 0; i < 9; i++) {poolExecutor.execute(()->System.out.println("【"+ Thread.currentThread().getName()+"】"+"单个线程的线程池!"));}}
}
4种拒绝策略
AbortPolicy 队列满时不处理后来的任务,直接抛出异常
CallerRunsPolicy “调用者运行”调节机制 。比如队列满时,多出的回退给main线程
DiscardPolicy 队列满了,会直接丢掉后来的任务,不会抛出异常
DiscardOldestPolicy 队列满了,会和最早的竞争
线程池原理
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// Packing and unpacking ctlprivate static int runStateOf(int c) { return c & ~COUNT_MASK; }private static int workerCountOf(int c) { return c & COUNT_MASK; } //返回线程池中的任务个数private static int ctlOf(int rs, int wc) { return rs | wc; } //public final int get() { return value;}public void execute(Runnable command) {if (command == null) //如果任务为空 抛出空指针异常throw new NullPointerException();//ctl中保存线程池当前的一些状态信息int c = ctl.get();/**1、如果线程池任务数小于orePoolSize,会调用addWorker(command, true)新建一个线程,并将该任务添加到线程中,然后启动线程执行任务*/if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}/**2、如果线程池中任务数量大于等于corePoolSize通过isRunning()方法判断线程池的状态 线程池处于Running状态且阻塞队列未满,该任务才会被加入*/if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();/*通过isRunning(recheck)再次获取线程池状态,如果线程池不是Running状态就需要通过remove(command)从阻塞队列中移除任务,并尝试判断线程是否全部执行完毕,同时执行拒绝策略。*/if (! isRunning(recheck) && remove(command))reject(command);//如果线程池的任务个数为0 则新建一个空线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}/**3、通过addWorker(command, false)创建一个线程,并将任务添加到该线程中。然后,启动该线从而执行任务。addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容*/else if (!addWorker(command, false))reject(command);}
实际上就是进行是哪个判断:
首先核心线程池是否已满。
然后阻塞队列是否已满
最后最大线程池个数是否达到。