JUC部分并发类使用方式
下面介绍的是JUC包下一些线程安全类的一些简单使用和一些小demo。
Semaphore
信号量,即可以同时使用的线程数,tryrequire就是将信号量减一,release就是信号量+1,当等于0就会阻塞,大于零才会唤醒。
当需要控制线程访问数量,可以使用信号量来做控制,比较简单。
下面是使用信号量改进的数据库连接池
@Slf4j
public class SemaphoreConnection {
// 连接池对象数组
private Connection[] connections;
// 使用标记
private AtomicIntegerArray flagArrays;
// 线程池大小
private Integer poolSize;
/**
* 信号量
*/
Semaphore semaphore;
public SemaphoreConnection(){
this.poolSize = 5;
connections = new MarkConnection[5];
flagArrays = new AtomicIntegerArray(5);
for (int i = 0; i < connections.length; i++) {
connections[i] = new MarkConnection("连接" + i+1);
}
semaphore = new Semaphore(5);
}
// 连接池的初始化
public SemaphoreConnection(int poolSize) {
this.poolSize = poolSize;
connections = new MarkConnection[poolSize];
flagArrays = new AtomicIntegerArray(poolSize);
for (int i = 0; i < connections.length; i++) {
connections[i] = new MarkConnection("连接" + i);
}
semaphore = new Semaphore(poolSize);
}
// 向连接池中请求连接
public Connection getConnection(){
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true){
for (int i = 0; i < poolSize; i++) {
// 进行cas请求,如果请求失败就失败
if (flagArrays.compareAndSet(i,0,1)){
return connections[i];
}
}
}
}
/**
* 释放连接
* @param con
*/
public void releaseConnection(Connection con){
for (int i = 0; i < poolSize; i++) {
if (con == connections[i]){
// 将连接标识置为0即空间连接
flagArrays.set(i,0);
semaphore.release();
}
}
}
public static void main(String[] args) {
SemaphoreConnection myConnectionPoll = new SemaphoreConnection(3);
for (int i = 0; i < 10; i++) {
new Thread(() ->{
MarkConnection connection =(MarkConnection) myConnectionPoll.getConnection();
log.debug("获得锁{}",connection.getConName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
myConnectionPoll.releaseConnection(connection);
log.debug("释放锁{}",connection.getConName());
}).start();
}
}
}
CountDownLatch
就是一个用来计数的,来解决线程之间通信的问题。
比如我需要两个线程执行完毕后再去执行主线程,我们可以用join方法,但是我们同样可以使用这个类来控制。
通过一个count,当其被减为0时,调用await阻塞的线程就会唤醒。
// 一个简单的demo
@Slf4j
public class Test2 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
// for(int i = 0;i < 3;i++){
new Thread(()->{
log.debug("来啦");
try {
Thread.sleep(1000);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("干啦兄弟们");
}).start();
new Thread(()->{
log.debug("来啦");
try {
Thread.sleep(2000);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("干啦兄弟们");
}).start();
// }
countDownLatch.await();
log.debug("开吃了兄弟们");
}
}
同时可以作为微服务下的多线程调用,当我们需要多个微服务的结果时,我们可以使用多线程,调用多线程池使用多线程的方法远程调用,可以使用该类控制主线程等待线程池中的线程执行完后执行主线程。但是使用Future可以解决问题,所以还是建议使用future,使用callable方法。
CyclicBarrier
作为一个和CountDownLatch功能相似,但是它却是可以循环使用的,当其置为0时,唤醒线程后,就会恢复到初始值。可以继续使用,而CountDownLatch却是一次性的,需要重复创建线程。
CyclicBarrier主要是调用一个await方法,等待其他线程,当到达数量的线程完成后,就会唤醒被await方法阻塞的线程。
// demo
@Slf4j
public class Test3 {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
new Thread(()->{
log.debug("来啦");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("干啦兄弟们");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
log.debug("来啦");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("干啦兄弟们");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
log.debug("来了啊");
}
}
ConcurrentHashMap
保证了多线程下hashmap的安全。
jdk1.7时会产生死链,因为jdk1.7时多线程下是在链表头往里面加,而 不是1.8在链表尾加,所以会出现多线程下链表出现环,无法结束程序。
Concurrent类在多线程下的表现为
- 内部使用cas进行优化,可以支持较大的吞吐量
- 弱一致性
- 迭代时的弱一致性,当在迭代是被更新,迭代不会报错但是会表现出旧值
- size的弱一致性,会读到旧值
- 读取一致性
fast-fail快速失败就是当其他线程修改了,非安全容器就会让正在遍历的直接抛出异常。