C# 多线程编程学习笔记
📚 目录
🚀 多线程基础概念
什么是多线程?
多线程是指在一个程序中同时运行多个执行流程,让程序能够并发处理多个任务。就像餐厅里有多个服务员同时为不同桌的客人服务一样。
为什么需要多线程?
提高程序响应性:UI线程不会被耗时操作阻塞
充分利用多核CPU:让多个CPU核心同时工作
提高程序吞吐量:并行处理多个任务
🧵 Thread 基础操作
创建和启动线程
using System;
using System.Threading;
class Program
{
static void Main()
{
// 创建线程
Thread thread = new Thread(DoWork);
// 启动线程
thread.Start();
Console.WriteLine("主线程继续执行其他工作...");
// 主线程也做一些工作
for (int i = 0; i < 5; i++)
{
Console.WriteLine($"主线程: {i}");
Thread.Sleep(1000);
}
}
static void DoWork()
{
for (int i = 0; i < 5; i++)
{
Console.WriteLine($"工作线程: {i}");
Thread.Sleep(800);
}
}
}
Join() 方法详解
基础 Join() 用法
Join()
方法让当前线程等待另一个线程完成,就像排队等前面的人办完事再轮到自己。
static void Main()
{
Thread workerThread = new Thread(DoWork);
workerThread.Start();
Console.WriteLine("等待工作线程完成...");
// 等待工作线程完成
workerThread.Join();
Console.WriteLine("工作线程已完成,主线程继续执行");
}
带超时参数的 Join()
Join(int millisecondsTimeout)
允许设置最长等待时间,避免无限等待:
static void Main()
{
Thread longRunningThread = new Thread(LongRunningTask);
longRunningThread.Start();
Console.WriteLine("等待线程完成,最多等待3秒...");
// 最多等待3秒
bool completed = longRunningThread.Join(3000);
if (completed)
{
Console.WriteLine("线程在3秒内完成了");
}
else
{
Console.WriteLine("线程超过3秒还没完成,不再等待");
// 可以选择中断线程
longRunningThread.Interrupt();
}
}
static void LongRunningTask()
{
try
{
// 模拟长时间运行的任务
Thread.Sleep(5000); // 睡眠5秒
Console.WriteLine("长时间任务完成");
}
catch (ThreadInterruptedException)
{
Console.WriteLine("线程被中断");
}
}
Interrupt() 方法
Interrupt()
用于中断正在等待的线程(如 Sleep、Wait、Join 等状态):
static void Main()
{
Thread workerThread = new Thread(InterruptibleWork);
workerThread.Start();
// 让工作线程运行2秒
Thread.Sleep(2000);
Console.WriteLine("中断工作线程...");
workerThread.Interrupt();
workerThread.Join();
Console.WriteLine("程序结束");
}
static void InterruptibleWork()
{
try
{
Console.WriteLine("开始长时间工作...");
// 这个长时间的睡眠会被中断
Thread.Sleep(10000);
Console.WriteLine("工作完成"); // 这行不会执行
}
catch (ThreadInterruptedException)
{
Console.WriteLine("工作被中断了");
}
}
Sleep() 的重要性
对于CPU密集型操作,适当使用 Sleep()
可以:
让其他线程有机会执行
降低CPU使用率
避免饿死其他线程
static void CpuIntensiveWork()
{
for (int i = 0; i < 1000000; i++)
{
// CPU密集型计算
double result = Math.Sqrt(i * i + 1);
// 每处理1000次就休息1毫秒,让其他线程有机会运行
if (i % 1000 == 0)
{
Thread.Sleep(1);
}
}
}
🔒 线程同步机制
原子操作 (Atomic Operations)
原子操作是不可分割的操作,要么完全执行,要么不执行,不会被其他线程中断。
什么时候需要原子操作?
// ❌ 非线程安全的计数器
class UnsafeCounter
{
private int count = 0;
public void Increment()
{
count++; // 这不是原子操作!实际包含:读取、加1、写回
}
public int GetCount() => count;
}
// 多线程环境下可能出现问题
static void TestUnsafeCounter()
{
var counter = new UnsafeCounter();
var threads = new Thread[10];
for (int i = 0; i < 10; i++)
{
threads[i] = new Thread(() =>
{
for (int j = 0; j < 1000; j++)
{
counter.Increment();
}
});
threads[i].Start();
}
foreach (var thread in threads)
thread.Join();
Console.WriteLine($"期望值: 10000, 实际值: {counter.GetCount()}");
// 实际值可能小于10000,因为存在竞争条件
}
Interlocked 类
Interlocked
类提供了原子操作,保证操作的完整性:
// ✅ 线程安全的计数器
class SafeCounter
{
private int count = 0;
public void Increment()
{
Interlocked.Increment(ref count); // 原子递增
}
public void Add(int value)
{
Interlocked.Add(ref count, value); // 原子加法
}
public int GetCount()
{
return Interlocked.Read(ref count); // 原子读取
}
public int Exchange(int newValue)
{
return Interlocked.Exchange(ref count, newValue); // 原子交换
}
public int CompareExchange(int newValue, int comparand)
{
// 如果 count 等于 comparand,则设置为 newValue
return Interlocked.CompareExchange(ref count, newValue, comparand);
}
}
// 使用示例
static void TestSafeCounter()
{
var counter = new SafeCounter();
var threads = new Thread[10];
for (int i = 0; i < 10; i++)
{
threads[i] = new Thread(() =>
{
for (int j = 0; j < 1000; j++)
{
counter.Increment();
}
});
threads[i].Start();
}
foreach (var thread in threads)
thread.Join();
Console.WriteLine($"安全计数器结果: {counter.GetCount()}"); // 总是10000
}
Semaphore 信号量
信号量控制同时访问某个资源的线程数量,就像停车场的车位限制。
class ResourcePool
{
private readonly Semaphore semaphore;
private readonly List<string> resources;
public ResourcePool(int maxConcurrency)
{
// 创建信号量,最多允许 maxConcurrency 个线程同时访问
semaphore = new Semaphore(maxConcurrency, maxConcurrency);
// 初始化资源池
resources = new List<string>();
for (int i = 0; i < maxConcurrency; i++)
{
resources.Add($"资源{i + 1}");
}
}
public void UseResource(int threadId)
{
Console.WriteLine($"线程 {threadId} 等待获取资源...");
// 等待信号量(获取资源)
semaphore.WaitOne();
try
{
Console.WriteLine($"线程 {threadId} 获取到资源,开始使用");
// 模拟使用资源
Thread.Sleep(2000);
Console.WriteLine($"线程 {threadId} 使用完资源");
}
finally
{
// 释放信号量(归还资源)
semaphore.Release();
Console.WriteLine($"线程 {threadId} 释放资源");
}
}
public void Dispose()
{
semaphore?.Dispose();
}
}
// 使用示例
static void TestSemaphore()
{
// 创建最多允许3个线程同时访问的资源池
var resourcePool = new ResourcePool(3);
// 创建5个线程尝试访问资源
var threads = new Thread[5];
for (int i = 0; i < 5; i++)
{
int threadId = i + 1;
threads[i] = new Thread(() => resourcePool.UseResource(threadId));
threads[i].Start();
}
// 等待所有线程完成
foreach (var thread in threads)
thread.Join();
resourcePool.Dispose();
}
⚡ 并行 LINQ (PLINQ)
AsParallel() 基础用法
PLINQ 让 LINQ 查询能够并行执行,充分利用多核CPU:
using System.Linq;
static void BasicPlinqExample()
{
// 创建大量数据
var numbers = Enumerable.Range(1, 10_000_000).ToArray();
var stopwatch = System.Diagnostics.Stopwatch.StartNew();
// 传统的顺序执行
var sequentialResult = numbers
.Where(x => IsPrime(x))
.Count();
stopwatch.Stop();
Console.WriteLine($"顺序执行时间: {stopwatch.ElapsedMilliseconds}ms, 结果: {sequentialResult}");
stopwatch.Restart();
// 并行执行
var parallelResult = numbers
.AsParallel() // 启用并行执行
.Where(x => IsPrime(x))
.Count();
stopwatch.Stop();
Console.WriteLine($"并行执行时间: {stopwatch.ElapsedMilliseconds}ms, 结果: {parallelResult}");
}
// 判断是否为质数(CPU密集型操作)
static bool IsPrime(int number)
{
if (number < 2) return false;
if (number == 2) return true;
if (number % 2 == 0) return false;
for (int i = 3; i <= Math.Sqrt(number); i += 2)
{
if (number % i == 0) return false;
}
return true;
}
AsParallel() 执行流程详解
static void PlinqExecutionFlow()
{
var data = Enumerable.Range(1, 20).ToArray();
Console.WriteLine("=== PLINQ 执行流程演示 ===");
var result = data
.AsParallel()
.WithDegreeOfParallelism(4) // 设置并行度为4
.Select(x =>
{
// 显示当前执行的线程
int threadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine($"处理数据 {x} - 线程ID: {threadId}");
// 模拟CPU密集型操作
Thread.Sleep(100);
return x * x;
})
.Where(x => x % 2 == 0)
.OrderBy(x => x) // 注意:OrderBy会强制收集所有结果
.ToArray();
Console.WriteLine($"最终结果: [{string.Join(", ", result)}]");
}
PLINQ 配置选项
static void PlinqConfiguration()
{
var data = Enumerable.Range(1, 1000).ToArray();
var result = data
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount) // 设置并行度
.WithCancellation(CancellationToken.None) // 支持取消
.WithExecutionMode(ParallelExecutionMode.ForceParallelism) // 强制并行
.Select(x => x * 2)
.Where(x => x > 100)
.AsOrdered() // 保持顺序(会影响性能)
.Take(10)
.ToArray();
Console.WriteLine($"配置后的PLINQ结果: [{string.Join(", ", result)}]");
}
何时使用 PLINQ
static void WhenToUsePlinq()
{
Console.WriteLine("=== PLINQ 适用场景分析 ===");
// ✅ 适合PLINQ:CPU密集型,大数据量
var largeData = Enumerable.Range(1, 1_000_000);
var cpuIntensiveResult = largeData
.AsParallel()
.Where(x => IsPrime(x))
.Count();
Console.WriteLine($"CPU密集型任务适合PLINQ: {cpuIntensiveResult}");
// ❌ 不适合PLINQ:简单操作,小数据量
var smallData = Enumerable.Range(1, 100);
var simpleResult = smallData
.AsParallel() // 并行开销可能大于收益
.Select(x => x + 1)
.ToArray();
Console.WriteLine($"简单操作不适合PLINQ: {simpleResult.Length}");
// ⚠️ 需要注意:有副作用的操作
var counter = 0;
var problematicResult = Enumerable.Range(1, 1000)
.AsParallel()
.Select(x =>
{
Interlocked.Increment(ref counter); // 需要使用线程安全操作
return x;
})
.ToArray();
Console.WriteLine($"有副作用的操作需要注意线程安全: counter = {counter}");
}
💡 实战示例
综合示例:文件处理器
class ParallelFileProcessor
{
private readonly Semaphore _semaphore;
private int _processedCount = 0;
public ParallelFileProcessor(int maxConcurrency = 4)
{
_semaphore = new Semaphore(maxConcurrency, maxConcurrency);
}
public async Task ProcessFilesAsync(string[] filePaths)
{
Console.WriteLine($"开始处理 {filePaths.Length} 个文件...");
// 使用PLINQ并行处理文件
var tasks = filePaths
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.Select(ProcessSingleFileAsync)
.ToArray();
// 等待所有任务完成
await Task.WhenAll(tasks);
Console.WriteLine($"所有文件处理完成,共处理 {_processedCount} 个文件");
}
private async Task ProcessSingleFileAsync(string filePath)
{
// 获取信号量
_semaphore.WaitOne();
try
{
int threadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine($"线程 {threadId} 开始处理文件: {Path.GetFileName(filePath)}");
// 模拟文件处理(CPU密集型操作)
await Task.Run(() =>
{
// 模拟复杂的文件处理
for (int i = 0; i < 100; i++)
{
Math.Sqrt(i * 1000);
// 适当让出CPU时间
if (i % 10 == 0)
Thread.Sleep(1);
}
});
// 原子地增加计数器
Interlocked.Increment(ref _processedCount);
Console.WriteLine($"线程 {threadId} 完成处理文件: {Path.GetFileName(filePath)}");
}
finally
{
// 释放信号量
_semaphore.Release();
}
}
public void Dispose()
{
_semaphore?.Dispose();
}
}
// 使用示例
static async Task FileProcessorExample()
{
// 模拟文件路径
var filePaths = Enumerable.Range(1, 20)
.Select(i => $"file_{i:D3}.txt")
.ToArray();
var processor = new ParallelFileProcessor(maxConcurrency: 3);
var stopwatch = System.Diagnostics.Stopwatch.StartNew();
await processor.ProcessFilesAsync(filePaths);
stopwatch.Stop();
Console.WriteLine($"总耗时: {stopwatch.ElapsedMilliseconds}ms");
processor.Dispose();
}