本文深入探讨Java并发编程的核心机制,重点解析线程池工作原理、锁优化策略、并发容器使用技巧,通过完整的实战案例展示如何构建高性能、高可用的Java并发应用。
![图片[1]-Java并发编程深度解析:线程池原理与性能优化实战](https://blogimg.vcvcc.cc/2025/11/20251123142928983-1024x768.png?imageView2/0/format/webp/q/75)
一、Java内存模型与并发基础
1.1 JMM内存模型核心原理
/**
* Java内存模型实战演示
* 展示可见性、原子性、有序性问题及解决方案
*/
public class JMMDemo {
// 共享变量 - 存在可见性问题
private static boolean stopRequested = false;
private static volatile boolean volatileStopRequested = false; // 使用volatile解决可见性
// 非原子操作共享变量
private static int count = 0;
private static final AtomicInteger atomicCount = new AtomicInteger(0); // 原子类解决方案
public static void main(String[] args) throws InterruptedException {
// 演示可见性问题
demonstrateVisibilityProblem();
// 演示原子性问题
demonstrateAtomicityProblem();
// 演示有序性问题
demonstrateOrderingProblem();
}
/**
* 可见性问题演示
*/
private static void demonstrateVisibilityProblem() throws InterruptedException {
System.out.println("=== 可见性问题演示 ===");
// 问题代码 - 可能永远无法停止
Thread visibilityThread = new Thread(() -> {
int i = 0;
while (!stopRequested) {
i++;
// 如果循环体内有同步操作,可能会刷新工作内存
// Thread.yield();
}
System.out.println("可见性测试线程结束,循环次数: " + i);
});
visibilityThread.start();
Thread.sleep(1000);
stopRequested = true;
System.out.println("已设置停止标志");
visibilityThread.join(2000);
if (visibilityThread.isAlive()) {
System.out.println("警告:线程仍未停止,存在可见性问题!");
visibilityThread.interrupt();
}
// 使用volatile解决的版本
Thread volatileThread = new Thread(() -> {
int i = 0;
while (!volatileStopRequested) {
i++;
}
System.out.println("Volatile解决方案 - 线程结束,循环次数: " + i);
});
volatileThread.start();
Thread.sleep(100);
volatileStopRequested = true;
volatileThread.join();
}
/**
* 原子性问题演示
*/
private static void demonstrateAtomicityProblem() throws InterruptedException {
System.out.println("\n=== 原子性问题演示 ===");
int threadCount = 10;
int iterations = 10000;
// 非原子操作线程
Thread[] nonAtomicThreads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
nonAtomicThreads[i] = new Thread(() -> {
for (int j = 0; j < iterations; j++) {
count++; // 非原子操作
}
});
}
// 原子操作线程
Thread[] atomicThreads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
atomicThreads[i] = new Thread(() -> {
for (int j = 0; j < iterations; j++) {
atomicCount.incrementAndGet(); // 原子操作
}
});
}
// 启动并等待非原子操作线程
long startTime = System.currentTimeMillis();
for (Thread t : nonAtomicThreads) t.start();
for (Thread t : nonAtomicThreads) t.join();
long nonAtomicTime = System.currentTimeMillis() - startTime;
// 启动并等待原子操作线程
startTime = System.currentTimeMillis();
for (Thread t : atomicThreads) t.start();
for (Thread t : atomicThreads) t.join();
long atomicTime = System.currentTimeMillis() - startTime;
System.out.println("非原子操作结果: " + count + " (期望: " + (threadCount * iterations) + ")");
System.out.println("原子操作结果: " + atomicCount.get() + " (期望: " + (threadCount * iterations) + ")");
System.out.println("非原子操作耗时: " + nonAtomicTime + "ms");
System.out.println("原子操作耗时: " + atomicTime + "ms");
}
/**
* 有序性问题演示
*/
private static void demonstrateOrderingProblem() {
System.out.println("\n=== 有序性问题演示 ===");
// 指令重排序可能导致的奇怪行为
for (int i = 0; i < 100000; i++) {
ReorderingDemo demo = new ReorderingDemo();
if (demo.testReordering()) {
System.out.println("检测到指令重排序!循环次数: " + i);
break;
}
}
}
/**
* 指令重排序演示类
*/
static class ReorderingDemo {
private int x = 0;
private int y = 0;
private volatile boolean ready = false; // 使用volatile防止重排序
public void writer() {
x = 42;
y = 50;
ready = true; // volatile写,建立happens-before关系
}
public void reader() {
if (ready) {
if (x == 0 && y == 50) {
// 正常情况下不应该出现这种情况
System.out.println("异常情况: x=" + x + ", y=" + y);
}
}
}
public boolean testReordering() {
Thread writer = new Thread(this::writer);
Thread reader = new Thread(this::reader);
reader.start();
writer.start();
try {
writer.join();
reader.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return x == 0 && y == 50;
}
}
}
二、线程池深度解析与性能优化
2.1 线程池核心架构与实现
/**
* 高级线程池配置与监控框架
* 支持动态参数调整、运行监控、拒绝策略定制
*/
public class AdvancedThreadPool {
// 线程池状态监控
private final ThreadPoolMonitor monitor;
// 核心线程池
private final ThreadPoolExecutor executor;
// 配置信息
private final ThreadPoolConfig config;
public AdvancedThreadPool(ThreadPoolConfig config) {
this.config = config;
this.monitor = new ThreadPoolMonitor();
this.executor = createThreadPool(config);
}
/**
* 创建优化的线程池
*/
private ThreadPoolExecutor createThreadPool(ThreadPoolConfig config) {
// 自定义线程工厂
CustomThreadFactory threadFactory = new CustomThreadFactory(
config.getThreadNamePrefix(),
config.isDaemon()
);
// 自定义拒绝策略
RejectedExecutionHandler rejectionHandler = new CustomRejectionPolicy();
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
config.getCorePoolSize(),
config.getMaximumPoolSize(),
config.getKeepAliveTime(),
config.getUnit(),
config.getWorkQueue(),
threadFactory,
rejectionHandler
);
// 高级配置
if (config.isAllowCoreThreadTimeout()) {
executor.allowCoreThreadTimeOut(true);
}
// 预启动核心线程
if (config.isPrestartCoreThreads()) {
executor.prestartAllCoreThreads();
}
return executor;
}
/**
* 执行任务
*/
public <T> Future<T> submit(Callable<T> task) {
monitor.recordTaskSubmit();
return executor.submit(task);
}
public Future<?> submit(Runnable task) {
monitor.recordTaskSubmit();
return executor.submit(task);
}
/**
* 动态调整线程池参数
*/
public void resize(int corePoolSize, int maximumPoolSize) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || corePoolSize > maximumPoolSize) {
throw new IllegalArgumentException("Invalid pool size parameters");
}
executor.setCorePoolSize(corePoolSize);
executor.setMaximumPoolSize(maximumPoolSize);
System.out.printf("线程池已调整: corePoolSize=%d, maximumPoolSize=%d%n",
corePoolSize, maximumPoolSize);
}
/**
* 优雅关闭
*/
public void gracefulShutdown() {
System.out.println("开始优雅关闭线程池...");
// 停止接受新任务
executor.shutdown();
try {
// 等待现有任务完成
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.out.println("强制关闭未完成的任务...");
// 强制关闭
executor.shutdownNow();
// 再次等待
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未能正常关闭");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("线程池已关闭");
}
/**
* 获取监控信息
*/
public ThreadPoolStats getStats() {
return monitor.collectStats(executor);
}
/**
* 线程池配置类
*/
public static class ThreadPoolConfig {
private final int corePoolSize;
private final int maximumPoolSize;
private final long keepAliveTime;
private final TimeUnit unit;
private final BlockingQueue<Runnable> workQueue;
private final String threadNamePrefix;
private final boolean daemon;
private final boolean allowCoreThreadTimeout;
private final boolean prestartCoreThreads;
private ThreadPoolConfig(Builder builder) {
this.corePoolSize = builder.corePoolSize;
this.maximumPoolSize = builder.maximumPoolSize;
this.keepAliveTime = builder.keepAliveTime;
this.unit = builder.unit;
this.workQueue = builder.workQueue;
this.threadNamePrefix = builder.threadNamePrefix;
this.daemon = builder.daemon;
this.allowCoreThreadTimeout = builder.allowCoreThreadTimeout;
this.prestartCoreThreads = builder.prestartCoreThreads;
}
// Getter方法
public int getCorePoolSize() { return corePoolSize; }
public int getMaximumPoolSize() { return maximumPoolSize; }
public long getKeepAliveTime() { return keepAliveTime; }
public TimeUnit getUnit() { return unit; }
public BlockingQueue<Runnable> getWorkQueue() { return workQueue; }
public String getThreadNamePrefix() { return threadNamePrefix; }
public boolean isDaemon() { return daemon; }
public boolean isAllowCoreThreadTimeout() { return allowCoreThreadTimeout; }
public boolean isPrestartCoreThreads() { return prestartCoreThreads; }
/**
* 建造者模式
*/
public static class Builder {
private int corePoolSize = Runtime.getRuntime().availableProcessors();
private int maximumPoolSize = Runtime.getRuntime().availableProcessors() * 2;
private long keepAliveTime = 60L;
private TimeUnit unit = TimeUnit.SECONDS;
private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(1000);
private String threadNamePrefix = "advanced-pool";
private boolean daemon = false;
private boolean allowCoreThreadTimeout = false;
private boolean prestartCoreThreads = false;
public Builder corePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
return this;
}
public Builder maximumPoolSize(int maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
return this;
}
public Builder keepAliveTime(long keepAliveTime, TimeUnit unit) {
this.keepAliveTime = keepAliveTime;
this.unit = unit;
return this;
}
public Builder workQueue(BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
return this;
}
public Builder threadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
return this;
}
public Builder daemon(boolean daemon) {
this.daemon = daemon;
return this;
}
public Builder allowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
this.allowCoreThreadTimeout = allowCoreThreadTimeout;
return this;
}
public Builder prestartCoreThreads(boolean prestartCoreThreads) {
this.prestartCoreThreads = prestartCoreThreads;
return this;
}
public ThreadPoolConfig build() {
return new ThreadPoolConfig(this);
}
}
}
/**
* 自定义线程工厂
*/
private static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final boolean daemon;
public CustomThreadFactory(String namePrefix, boolean daemon) {
this.namePrefix = namePrefix + "-thread-";
this.daemon = daemon;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(daemon);
t.setPriority(Thread.NORM_PRIORITY);
// 设置异常处理器
t.setUncaughtExceptionHandler(new ThreadUncaughtExceptionHandler());
return t;
}
}
/**
* 线程异常处理器
*/
private static class ThreadUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.err.printf("线程 %s 执行异常: %s%n", t.getName(), e.getMessage());
// 这里可以添加日志记录、告警等逻辑
}
}
/**
* 自定义拒绝策略
*/
private class CustomRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录拒绝的任务
monitor.recordTaskRejection();
if (!executor.isShutdown()) {
System.out.println("任务被拒绝,尝试重新放入队列...");
try {
// 尝试重新放入队列,等待一段时间
if (!executor.getQueue().offer(r, 1, TimeUnit.SECONDS)) {
System.out.println("重新放入队列失败,执行拒绝处理...");
// 执行降级策略
executeFallback(r);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("重新放入队列被中断,执行拒绝处理...");
executeFallback(r);
}
}
}
private void executeFallback(Runnable r) {
// 在调用者线程中直接执行
System.out.println("在调用者线程中执行被拒绝的任务");
r.run();
}
}
/**
* 线程池监控器
*/
private class ThreadPoolMonitor {
private final AtomicLong submittedTasks = new AtomicLong(0);
private final AtomicLong completedTasks = new AtomicLong(0);
private final AtomicLong rejectedTasks = new AtomicLong(0);
private final AtomicLong failedTasks = new AtomicLong(0);
public void recordTaskSubmit() {
submittedTasks.incrementAndGet();
}
public void recordTaskRejection() {
rejectedTasks.incrementAndGet();
}
public ThreadPoolStats collectStats(ThreadPoolExecutor executor) {
return new ThreadPoolStats(
executor.getPoolSize(),
executor.getCorePoolSize(),
executor.getMaximumPoolSize(),
executor.getActiveCount(),
executor.getCompletedTaskCount(),
executor.getTaskCount(),
executor.getQueue().size(),
submittedTasks.get(),
completedTasks.get(),
rejectedTasks.get(),
failedTasks.get()
);
}
}
/**
* 线程池统计信息
*/
public static class ThreadPoolStats {
private final int poolSize;
private final int corePoolSize;
private final int maximumPoolSize;
private final int activeCount;
private final long completedTaskCount;
private final long taskCount;
private final int queueSize;
private final long submittedTasks;
private final long completedTasks;
private final long rejectedTasks;
private final long failedTasks;
public ThreadPoolStats(int poolSize, int corePoolSize, int maximumPoolSize,
int activeCount, long completedTaskCount, long taskCount,
int queueSize, long submittedTasks, long completedTasks,
long rejectedTasks, long failedTasks) {
this.poolSize = poolSize;
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.activeCount = activeCount;
this.completedTaskCount = completedTaskCount;
this.taskCount = taskCount;
this.queueSize = queueSize;
this.submittedTasks = submittedTasks;
this.completedTasks = completedTasks;
this.rejectedTasks = rejectedTasks;
this.failedTasks = failedTasks;
}
@Override
public String toString() {
return String.format(
"ThreadPoolStats{poolSize=%d, corePoolSize=%d, maximumPoolSize=%d, " +
"activeCount=%d, completedTaskCount=%d, taskCount=%d, queueSize=%d, " +
"submittedTasks=%d, completedTasks=%d, rejectedTasks=%d, failedTasks=%d}",
poolSize, corePoolSize, maximumPoolSize, activeCount, completedTaskCount,
taskCount, queueSize, submittedTasks, completedTasks, rejectedTasks, failedTasks
);
}
}
}
2.2 线程池实战应用示例
/**
* 线程池实战演示
* 展示不同场景下的线程池配置和使用
*/
public class ThreadPoolDemo {
public static void main(String[] args) throws Exception {
System.out.println("=== Java线程池实战演示 ===\n");
// 1. CPU密集型任务线程池配置
demonstrateCpuIntensivePool();
// 2. IO密集型任务线程池配置
demonstrateIoIntensivePool();
// 3. 定时任务线程池
demonstrateScheduledPool();
// 4. 自定义线程池实战
demonstrateCustomThreadPool();
}
/**
* CPU密集型任务线程池演示
*/
private static void demonstrateCpuIntensivePool() throws InterruptedException {
System.out.println("1. CPU密集型任务线程池配置");
int processorCount = Runtime.getRuntime().availableProcessors();
System.out.println("可用处理器数量: " + processorCount);
// 适合CPU密集型任务的配置
AdvancedThreadPool.ThreadPoolConfig config =
new AdvancedThreadPool.ThreadPoolConfig.Builder()
.corePoolSize(processorCount)
.maximumPoolSize(processorCount) // 固定大小,避免上下文切换
.keepAliveTime(0, TimeUnit.SECONDS) // 核心线程不会超时
.workQueue(new LinkedBlockingQueue<>(1000))
.threadNamePrefix("cpu-intensive")
.prestartCoreThreads(true)
.build();
AdvancedThreadPool cpuPool = new AdvancedThreadPool(config);
// 执行CPU密集型任务
List<Future<Long>> futures = new ArrayList<>();
int taskCount = 20;
System.out.println("提交 " + taskCount + " 个CPU密集型任务...");
long startTime = System.currentTimeMillis();
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
Future<Long> future = cpuPool.submit(() -> {
System.out.println("CPU任务 " + taskId + " 开始执行,线程: " + Thread.currentThread().getName());
// 模拟CPU密集型计算
long result = 0;
for (int j = 0; j < 100000000; j++) {
result += Math.sqrt(j) * Math.sin(j);
}
System.out.println("CPU任务 " + taskId + " 完成,结果: " + result);
return result;
});
futures.add(future);
}
// 等待所有任务完成
for (Future<Long> future : futures) {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
long duration = System.currentTimeMillis() - startTime;
System.out.println("CPU密集型任务总耗时: " + duration + "ms");
System.out.println("线程池状态: " + cpuPool.getStats());
cpuPool.gracefulShutdown();
}
/**
* IO密集型任务线程池演示
*/
private static void demonstrateIoIntensivePool() throws InterruptedException {
System.out.println("\n2. IO密集型任务线程池配置");
int processorCount = Runtime.getRuntime().availableProcessors();
// 适合IO密集型任务的配置
AdvancedThreadPool.ThreadPoolConfig config =
new AdvancedThreadPool.ThreadPoolConfig.Builder()
.corePoolSize(processorCount * 2)
.maximumPoolSize(processorCount * 4) // 更大的线程数
.keepAliveTime(30, TimeUnit.SECONDS)
.workQueue(new LinkedBlockingQueue<>(500))
.threadNamePrefix("io-intensive")
.allowCoreThreadTimeout(true)
.build();
AdvancedThreadPool ioPool = new AdvancedThreadPool(config);
// 执行IO密集型任务
int taskCount = 50;
CountDownLatch latch = new CountDownLatch(taskCount);
System.out.println("提交 " + taskCount + " 个IO密集型任务...");
long startTime = System.currentTimeMillis();
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
ioPool.submit(() -> {
try {
System.out.println("IO任务 " + taskId + " 开始执行,线程: " + Thread.currentThread().getName());
// 模拟IO操作(网络请求、文件读写等)
Thread.sleep(1000 + (int)(Math.random() * 2000));
System.out.println("IO任务 " + taskId + " 完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
return null;
});
}
// 等待所有任务完成
latch.await();
long duration = System.currentTimeMillis() - startTime;
System.out.println("IO密集型任务总耗时: " + duration + "ms");
System.out.println("线程池状态: " + ioPool.getStats());
ioPool.gracefulShutdown();
}
/**
* 定时任务线程池演示
*/
private static void demonstrateScheduledPool() throws InterruptedException {
System.out.println("\n3. 定时任务线程池演示");
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(4);
System.out.println("开始定时任务演示...");
// 固定速率执行
scheduledPool.scheduleAtFixedRate(() -> {
System.out.println("固定速率任务执行 - " + new Date());
}, 0, 2, TimeUnit.SECONDS);
// 固定延迟执行
scheduledPool.scheduleWithFixedDelay(() -> {
System.out.println("固定延迟任务执行 - " + new Date());
try {
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 0, 3, TimeUnit.SECONDS);
// 单次延迟执行
scheduledPool.schedule(() -> {
System.out.println("单次延迟任务执行 - " + new Date());
}, 5, TimeUnit.SECONDS);
// 运行一段时间后关闭
Thread.sleep(15000);
System.out.println("停止定时任务...");
scheduledPool.shutdown();
scheduledPool.awaitTermination(5, TimeUnit.SECONDS);
}
/**
* 自定义线程池实战演示
*/
private static void demonstrateCustomThreadPool() throws Exception {
System.out.println("\n4. 自定义线程池高级功能演示");
// 创建自定义线程池
AdvancedThreadPool.ThreadPoolConfig config =
new AdvancedThreadPool.ThreadPoolConfig.Builder()
.corePoolSize(2)
.maximumPoolSize(4)
.keepAliveTime(10, TimeUnit.SECONDS)
.workQueue(new ArrayBlockingQueue<>(2)) // 小队列,快速触发拒绝策略
.threadNamePrefix("custom-pool")
.prestartCoreThreads(true)
.build();
AdvancedThreadPool customPool = new AdvancedThreadPool(config);
// 监控线程 - 定期打印线程池状态
Thread monitorThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(2000);
System.out.println("监控 - " + customPool.getStats());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
monitorThread.setDaemon(true);
monitorThread.start();
// 提交大量任务,触发拒绝策略
int overloadTaskCount = 20;
System.out.println("提交 " + overloadTaskCount + " 个任务(测试拒绝策略)...");
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < overloadTaskCount; i++) {
final int taskId = i;
try {
Future<Integer> future = customPool.submit(() -> {
System.out.println("任务 " + taskId + " 开始执行");
Thread.sleep(3000);
System.out.println("任务 " + taskId + " 完成");
return taskId;
});
futures.add(future);
} catch (Exception e) {
System.out.println("任务 " + taskId + " 提交异常: " + e.getMessage());
}
}
// 动态调整线程池大小
Thread.sleep(5000);
System.out.println("\n动态调整线程池大小...");
customPool.resize(4, 8);
// 等待剩余任务完成
for (Future<Integer> future : futures) {
try {
future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("任务执行异常: " + e.getMessage());
}
}
System.out.println("最终线程池状态: " + customPool.getStats());
customPool.gracefulShutdown();
monitorThread.interrupt();
}
}
三、并发容器与锁优化策略
3.1 高性能并发容器实战
/**
* 并发容器性能对比与最佳实践
*/
public class ConcurrentCollectionDemo {
public static void main(String[] args) throws Exception {
System.out.println("=== 并发容器性能对比 ===\n");
// 1. ConcurrentHashMap vs HashMap
demonstrateConcurrentMap();
// 2. CopyOnWriteArrayList vs ArrayList
demonstrateCopyOnWriteList();
// 3. BlockingQueue 实战
demonstrateBlockingQueue();
// 4. 并发计数器性能对比
demonstrateConcurrentCounters();
}
/**
* ConcurrentHashMap 性能演示
*/
private static void demonstrateConcurrentMap() throws InterruptedException {
System.out.println("1. ConcurrentHashMap vs HashMap 性能对比");
int elementCount = 100000;
int threadCount = 10;
// ConcurrentHashMap 测试
Map<Integer, String> concurrentMap = new ConcurrentHashMap<>();
long concurrentTime = testMapPerformance(concurrentMap, elementCount, threadCount);
// HashMap 测试(需要同步)
Map<Integer, String> synchronizedMap = Collections.synchronizedMap(new HashMap<>());
long synchronizedTime = testMapPerformance(synchronizedMap, elementCount, threadCount);
System.out.println("ConcurrentHashMap 耗时: " + concurrentTime + "ms");
System.out.println("SynchronizedHashMap 耗时: " + synchronizedTime + "ms");
System.out.println("性能提升: " +
String.format("%.2f", (double)(synchronizedTime - concurrentTime) / synchronizedTime * 100) + "%");
}
private static long testMapPerformance(Map<Integer, String> map, int elementCount, int threadCount)
throws InterruptedException {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(threadCount);
// 准备线程
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
Thread thread = new Thread(() -> {
try {
startLatch.await();
// 每个线程处理一部分数据
int start = threadId * (elementCount / threadCount);
int end = (threadId + 1) * (elementCount / threadCount);
for (int j = start; j < end; j++) {
map.put(j, "value_" + j);
map.get(j % 1000); // 混合读写操作
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
});
threads.add(thread);
}
// 启动测试
for (Thread thread : threads) {
thread.start();
}
long startTime = System.currentTimeMillis();
startLatch.countDown(); // 同时开始
endLatch.await(); // 等待所有线程完成
long endTime = System.currentTimeMillis();
return endTime - startTime;
}
/**
* CopyOnWriteArrayList 演示
*/
private static void demonstrateCopyOnWriteList() throws InterruptedException {
System.out.println("\n2. CopyOnWriteArrayList 适用场景演示");
// 读多写少的场景
List<String> cowList = new CopyOnWriteArrayList<>();
// 初始化数据
for (int i = 0; i < 1000; i++) {
cowList.add("item_" + i);
}
int readerCount = 10;
int writerCount = 2;
CountDownLatch latch = new CountDownLatch(readerCount + writerCount);
// 创建读者线程
for (int i = 0; i < readerCount; i++) {
final int readerId = i;
new Thread(() -> {
try {
for (int j = 0; j < 1000; j++) {
// 频繁读取
String item = cowList.get(j % cowList.size());
if (j % 200 == 0) {
System.out.println("读者 " + readerId + " 读取: " + item);
}
}
} finally {
latch.countDown();
}
}).start();
}
// 创建写者线程
for (int i = 0; i < writerCount; i++) {
final int writerId = i;
new Thread(() -> {
try {
for (int j = 0; j < 10; j++) {
// 少量写入
String newItem = "new_item_" + writerId + "_" + j;
cowList.add(newItem);
System.out.println("写者 " + writerId + " 添加: " + newItem);
Thread.sleep(100); // 模拟写入间隔
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}).start();
}
latch.await();
System.out.println("最终列表大小: " + cowList.size());
}
/**
* BlockingQueue 实战演示
*/
private static void demonstrateBlockingQueue() throws InterruptedException {
System.out.println("\n3. BlockingQueue 生产者消费者模式");
BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
int producerCount = 3;
int consumerCount = 2;
CountDownLatch completionLatch = new CountDownLatch(producerCount + consumerCount);
// 创建生产者
for (int i = 0; i < producerCount; i++) {
final int producerId = i;
new Thread(() -> {
try {
for (int j = 0; j < 20; j++) {
String item = "产品-" + producerId + "-" + j;
queue.put(item); // 阻塞直到有空间
System.out.println("生产者 " + producerId + " 生产: " + item);
Thread.sleep(50);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
completionLatch.countDown();
}
}).start();
}
// 创建消费者
for (int i = 0; i < consumerCount; i++) {
final int consumerId = i;
new Thread(() -> {
try {
for (int j = 0; j < 30; j++) {
String item = queue.take(); // 阻塞直到有元素
System.out.println("消费者 " + consumerId + " 消费: " + item);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
completionLatch.countDown();
}
}).start();
}
// 等待一段时间后结束
Thread.sleep(5000);
System.out.println("队列剩余元素: " + queue.size());
}
/**
* 并发计数器性能对比
*/
private static void demonstrateConcurrentCounters() throws InterruptedException {
System.out.println("\n4. 并发计数器性能对比");
int threadCount = 20;
int incrementCount = 100000;
// AtomicLong 测试
AtomicLong atomicLong = new AtomicLong();
long atomicTime = testCounterPerformance(atomicLong::incrementAndGet, threadCount, incrementCount);
// LongAdder 测试(高并发场景更优)
LongAdder longAdder = new LongAdder();
long adderTime = testCounterPerformance(longAdder::increment, threadCount, incrementCount);
// synchronized 测试
long[] synchronizedCounter = {0};
Object lock = new Object();
long syncTime = testCounterPerformance(() -> {
synchronized (lock) {
synchronizedCounter[0]++;
}
}, threadCount, incrementCount);
System.out.println("AtomicLong 耗时: " + atomicTime + "ms, 结果: " + atomicLong.get());
System.out.println("LongAdder 耗时: " + adderTime + "ms, 结果: " + longAdder.sum());
System.out.println("Synchronized 耗时: " + syncTime + "ms, 结果: " + synchronizedCounter[0]);
}
private static long testCounterPerformance(Runnable incrementAction, int threadCount, int totalIncrements)
throws InterruptedException {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(threadCount);
int incrementsPerThread = totalIncrements / threadCount;
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
startLatch.await();
for (int j = 0; j < incrementsPerThread; j++) {
incrementAction.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
}).start();
}
long startTime = System.currentTimeMillis();
startLatch.countDown();
endLatch.await();
long endTime = System.currentTimeMillis();
return endTime - startTime;
}
}
四、锁优化与并发设计模式
4.1 锁优化高级技巧
/**
* 锁优化与并发设计模式实战
*/
public class LockOptimizationDemo {
public static void main(String[] args) throws Exception {
System.out.println("=== 锁优化与并发设计模式 ===\n");
// 1. 锁分段技术
demonstrateLockStriping();
// 2. 读写锁优化
demonstrateReadWriteLock();
// 3. 无锁编程
demonstrateLockFreeProgramming();
// 4. CompletableFuture 异步编程
demonstrateCompletableFuture();
}
/**
* 锁分段技术演示
*/
private static void demonstrateLockStriping() throws InterruptedException {
System.out.println("1. 锁分段技术 (Lock Striping)");
int segmentCount = 16;
int threadCount = 32;
int operationsPerThread = 10000;
// 自定义锁分段Map
StripedLockMap<String, String> stripedMap = new StripedLockMap<>(segmentCount);
CountDownLatch latch = new CountDownLatch(threadCount);
long startTime = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
Random random = new Random();
for (int j = 0; j < operationsPerThread; j++) {
String key = "key_" + (random.nextInt(1000));
String value = "value_" + threadId + "_" + j;
if (j % 3 == 0) {
// 写操作
stripedMap.put(key, value);
} else {
// 读操作
stripedMap.get(key);
}
}
} finally {
latch.countDown();
}
}).start();
}
latch.await();
long duration = System.currentTimeMillis() - startTime;
System.out.println("锁分段Map操作完成,耗时: " + duration + "ms");
System.out.println("Map大小: " + stripedMap.size());
}
/**
* 读写锁优化演示
*/
private static void demonstrateReadWriteLock() throws InterruptedException {
System.out.println("\n2. 读写锁优化");
ReadWriteLockCache cache = new ReadWriteLockCache();
int readerCount = 10;
int writerCount = 3;
CountDownLatch latch = new CountDownLatch(readerCount + writerCount);
// 创建读者线程
for (int i = 0; i < readerCount; i++) {
final int readerId = i;
new Thread(() -> {
try {
Random random = new Random();
for (int j = 0; j < 100; j++) {
String key = "data_" + (random.nextInt(50));
String value = cache.get(key);
if (j % 20 == 0) {
System.out.println("读者 " + readerId + " 读取 " + key + ": " + value);
}
Thread.sleep(10);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}).start();
}
// 创建写者线程
for (int i = 0; i < writerCount; i++) {
final int writerId = i;
new Thread(() -> {
try {
for (int j = 0; j < 20; j++) {
String key = "data_" + (j % 30);
String value = "value_" + writerId + "_" + j;
cache.put(key, value);
System.out.println("写者 " + writerId + " 写入 " + key + ": " + value);
Thread.sleep(50);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}).start();
}
latch.await();
System.out.println("缓存最终大小: " + cache.size());
}
/**
* 无锁编程演示
*/
private static void demonstrateLockFreeProgramming() throws InterruptedException {
System.out.println("\n3. 无锁编程 (Lock-Free)");
LockFreeStack<Integer> stack = new LockFreeStack<>();
int threadCount = 10;
int operationsPerThread = 1000;
CountDownLatch latch = new CountDownLatch(threadCount * 2);
long startTime = System.currentTimeMillis();
// 生产者线程
for (int i = 0; i < threadCount; i++) {
final int producerId = i;
new Thread(() -> {
try {
for (int j = 0; j < operationsPerThread; j++) {
stack.push(producerId * 1000 + j);
}
} finally {
latch.countDown();
}
}).start();
}
// 消费者线程
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
for (int j = 0; j < operationsPerThread; j++) {
Integer value = stack.pop();
if (value != null && j % 200 == 0) {
System.out.println("消费: " + value);
}
}
} finally {
latch.countDown();
}
}).start();
}
latch.await();
long duration = System.currentTimeMillis() - startTime;
System.out.println("无锁栈操作完成,耗时: " + duration + "ms");
System.out.println("栈剩余元素: " + stack.size());
}
/**
* CompletableFuture 异步编程
*/
private static void demonstrateCompletableFuture() throws Exception {
System.out.println("\n4. CompletableFuture 异步编程");
// 模拟异步任务执行
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
return "任务1结果";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
return "任务2结果";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(800);
return "任务3结果";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 组合多个任务
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
// 处理完成结果
CompletableFuture<String> resultFuture = combinedFuture.thenApply(v -> {
try {
String result1 = future1.get();
String result2 = future2.get();
String result3 = future3.get();
return String.format("所有任务完成: %s, %s, %s", result1, result2, result3);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// 异常处理
CompletableFuture<String> handledFuture = resultFuture.exceptionally(ex -> {
System.out.println("任务执行异常: " + ex.getMessage());
return "异常处理结果";
});
String finalResult = handledFuture.get();
System.out.println("最终结果: " + finalResult);
// 流式编程示例
CompletableFuture.supplyAsync(() -> {
System.out.println("开始复杂业务流程...");
return 10;
}).thenApplyAsync(result -> {
System.out.println("第一步处理: " + result);
return result * 2;
}).thenApplyAsync(result -> {
System.out.println("第二步处理: " + result);
return result + 5;
}).thenAcceptAsync(finalResult2 -> {
System.out.println("最终结果: " + finalResult2);
}).get();
}
/**
* 锁分段Map实现
*/
static class StripedLockMap<K, V> {
private final int segments;
private final List<Map<K, V>> maps;
private final List<Object> locks;
public StripedLockMap(int segments) {
this.segments = segments;
this.maps = new ArrayList<>(segments);
this.locks = new ArrayList<>(segments);
for (int i = 0; i < segments; i++) {
maps.add(new HashMap<>());
locks.add(new Object());
}
}
private int segment(K key) {
return Math.abs(key.hashCode()) % segments;
}
public V put(K key, V value) {
int segment = segment(key);
synchronized (locks.get(segment)) {
return maps.get(segment).put(key, value);
}
}
public V get(K key) {
int segment = segment(key);
synchronized (locks.get(segment)) {
return maps.get(segment).get(key);
}
}
public int size() {
int size = 0;
for (int i = 0; i < segments; i++) {
synchronized (locks.get(i)) {
size += maps.get(i).size();
}
}
return size;
}
}
/**
* 读写锁缓存实现
*/
static class ReadWriteLockCache {
private final Map<String, String> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public String get(String key) {
lock.readLock().lock();
try {
return cache.get(key);
} finally {
lock.readLock().unlock();
}
}
public void put(String key, String value) {
lock.writeLock().lock();
try {
cache.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
public int size() {
lock.readLock().lock();
try {
return cache.size();
} finally {
lock.readLock().unlock();
}
}
}
/**
* 无锁栈实现
*/
static class LockFreeStack<E> {
private static class Node<E> {
final E item;
Node<E> next;
Node(E item) {
this.item = item;
}
}
private final AtomicReference<Node<E>> top = new AtomicReference<>();
public void push(E item) {
Node<E> newHead = new Node<>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null) {
return null;
}
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}
public int size() {
int count = 0;
for (Node<E> node = top.get(); node != null; node = node.next) {
count++;
}
return count;
}
}
}
总结
通过本文的深度解析和实战演示,我们掌握了Java并发编程的核心要点:
- 内存模型理解 – JMM的可见性、原子性、有序性原理
- 线程池优化 – 合理的参数配置、监控和动态调整
- 并发容器选择 – 根据场景选择合适的并发集合
- 锁优化策略 – 锁分段、读写锁、无锁编程等高级技巧
- 异步编程 – CompletableFuture等现代并发工具的使用
这些技术能够帮助构建高性能、高可用的Java并发应用,在实际项目中显著提升系统性能。
© 版权声明
THE END














暂无评论内容