Decade
Decade
Published on 2025-07-22 / 22 Visits
0
0

新多线程

C# 多线程编程学习笔记

📚 目录

  1. 多线程基础概念

  2. Thread 基础操作

  3. 线程同步机制

  4. 并行 LINQ (PLINQ)

  5. 实战示例


🚀 多线程基础概念

什么是多线程?

多线程是指在一个程序中同时运行多个执行流程,让程序能够并发处理多个任务。就像餐厅里有多个服务员同时为不同桌的客人服务一样。

为什么需要多线程?

  • 提高程序响应性:UI线程不会被耗时操作阻塞

  • 充分利用多核CPU:让多个CPU核心同时工作

  • 提高程序吞吐量:并行处理多个任务


C.pdf

🧵 Thread 基础操作

创建和启动线程

using System;
using System.Threading;

class Program
{
    static void Main()
    {
        // 创建线程
        Thread thread = new Thread(DoWork);
        
        // 启动线程
        thread.Start();
        
        Console.WriteLine("主线程继续执行其他工作...");
        
        // 主线程也做一些工作
        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine($"主线程: {i}");
            Thread.Sleep(1000);
        }
    }
    
    static void DoWork()
    {
        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine($"工作线程: {i}");
            Thread.Sleep(800);
        }
    }
}

Join() 方法详解

基础 Join() 用法

Join() 方法让当前线程等待另一个线程完成,就像排队等前面的人办完事再轮到自己。

static void Main()
{
    Thread workerThread = new Thread(DoWork);
    workerThread.Start();
    
    Console.WriteLine("等待工作线程完成...");
    
    // 等待工作线程完成
    workerThread.Join();
    
    Console.WriteLine("工作线程已完成,主线程继续执行");
}

带超时参数的 Join()

Join(int millisecondsTimeout) 允许设置最长等待时间,避免无限等待:

static void Main()
{
    Thread longRunningThread = new Thread(LongRunningTask);
    longRunningThread.Start();
    
    Console.WriteLine("等待线程完成,最多等待3秒...");
    
    // 最多等待3秒
    bool completed = longRunningThread.Join(3000);
    
    if (completed)
    {
        Console.WriteLine("线程在3秒内完成了");
    }
    else
    {
        Console.WriteLine("线程超过3秒还没完成,不再等待");
        // 可以选择中断线程
        longRunningThread.Interrupt();
    }
}

static void LongRunningTask()
{
    try
    {
        // 模拟长时间运行的任务
        Thread.Sleep(5000); // 睡眠5秒
        Console.WriteLine("长时间任务完成");
    }
    catch (ThreadInterruptedException)
    {
        Console.WriteLine("线程被中断");
    }
}

Interrupt() 方法

Interrupt() 用于中断正在等待的线程(如 Sleep、Wait、Join 等状态):

static void Main()
{
    Thread workerThread = new Thread(InterruptibleWork);
    workerThread.Start();
    
    // 让工作线程运行2秒
    Thread.Sleep(2000);
    
    Console.WriteLine("中断工作线程...");
    workerThread.Interrupt();
    
    workerThread.Join();
    Console.WriteLine("程序结束");
}

static void InterruptibleWork()
{
    try
    {
        Console.WriteLine("开始长时间工作...");
        
        // 这个长时间的睡眠会被中断
        Thread.Sleep(10000);
        
        Console.WriteLine("工作完成"); // 这行不会执行
    }
    catch (ThreadInterruptedException)
    {
        Console.WriteLine("工作被中断了");
    }
}

Sleep() 的重要性

对于CPU密集型操作,适当使用 Sleep() 可以:

  • 让其他线程有机会执行

  • 降低CPU使用率

  • 避免饿死其他线程

static void CpuIntensiveWork()
{
    for (int i = 0; i < 1000000; i++)
    {
        // CPU密集型计算
        double result = Math.Sqrt(i * i + 1);
        
        // 每处理1000次就休息1毫秒,让其他线程有机会运行
        if (i % 1000 == 0)
        {
            Thread.Sleep(1);
        }
    }
}

🔒 线程同步机制

原子操作 (Atomic Operations)

原子操作是不可分割的操作,要么完全执行,要么不执行,不会被其他线程中断。

什么时候需要原子操作?

// ❌ 非线程安全的计数器
class UnsafeCounter
{
    private int count = 0;
    
    public void Increment()
    {
        count++; // 这不是原子操作!实际包含:读取、加1、写回
    }
    
    public int GetCount() => count;
}

// 多线程环境下可能出现问题
static void TestUnsafeCounter()
{
    var counter = new UnsafeCounter();
    var threads = new Thread[10];
    
    for (int i = 0; i < 10; i++)
    {
        threads[i] = new Thread(() =>
        {
            for (int j = 0; j < 1000; j++)
            {
                counter.Increment();
            }
        });
        threads[i].Start();
    }
    
    foreach (var thread in threads)
        thread.Join();
    
    Console.WriteLine($"期望值: 10000, 实际值: {counter.GetCount()}");
    // 实际值可能小于10000,因为存在竞争条件
}

Interlocked 类

Interlocked 类提供了原子操作,保证操作的完整性:

// ✅ 线程安全的计数器
class SafeCounter
{
    private int count = 0;
    
    public void Increment()
    {
        Interlocked.Increment(ref count); // 原子递增
    }
    
    public void Add(int value)
    {
        Interlocked.Add(ref count, value); // 原子加法
    }
    
    public int GetCount()
    {
        return Interlocked.Read(ref count); // 原子读取
    }
    
    public int Exchange(int newValue)
    {
        return Interlocked.Exchange(ref count, newValue); // 原子交换
    }
    
    public int CompareExchange(int newValue, int comparand)
    {
        // 如果 count 等于 comparand,则设置为 newValue
        return Interlocked.CompareExchange(ref count, newValue, comparand);
    }
}

// 使用示例
static void TestSafeCounter()
{
    var counter = new SafeCounter();
    var threads = new Thread[10];
    
    for (int i = 0; i < 10; i++)
    {
        threads[i] = new Thread(() =>
        {
            for (int j = 0; j < 1000; j++)
            {
                counter.Increment();
            }
        });
        threads[i].Start();
    }
    
    foreach (var thread in threads)
        thread.Join();
    
    Console.WriteLine($"安全计数器结果: {counter.GetCount()}"); // 总是10000
}

Semaphore 信号量

信号量控制同时访问某个资源的线程数量,就像停车场的车位限制。

class ResourcePool
{
    private readonly Semaphore semaphore;
    private readonly List<string> resources;
    
    public ResourcePool(int maxConcurrency)
    {
        // 创建信号量,最多允许 maxConcurrency 个线程同时访问
        semaphore = new Semaphore(maxConcurrency, maxConcurrency);
        
        // 初始化资源池
        resources = new List<string>();
        for (int i = 0; i < maxConcurrency; i++)
        {
            resources.Add($"资源{i + 1}");
        }
    }
    
    public void UseResource(int threadId)
    {
        Console.WriteLine($"线程 {threadId} 等待获取资源...");
        
        // 等待信号量(获取资源)
        semaphore.WaitOne();
        
        try
        {
            Console.WriteLine($"线程 {threadId} 获取到资源,开始使用");
            
            // 模拟使用资源
            Thread.Sleep(2000);
            
            Console.WriteLine($"线程 {threadId} 使用完资源");
        }
        finally
        {
            // 释放信号量(归还资源)
            semaphore.Release();
            Console.WriteLine($"线程 {threadId} 释放资源");
        }
    }
    
    public void Dispose()
    {
        semaphore?.Dispose();
    }
}

// 使用示例
static void TestSemaphore()
{
    // 创建最多允许3个线程同时访问的资源池
    var resourcePool = new ResourcePool(3);
    
    // 创建5个线程尝试访问资源
    var threads = new Thread[5];
    for (int i = 0; i < 5; i++)
    {
        int threadId = i + 1;
        threads[i] = new Thread(() => resourcePool.UseResource(threadId));
        threads[i].Start();
    }
    
    // 等待所有线程完成
    foreach (var thread in threads)
        thread.Join();
    
    resourcePool.Dispose();
}

⚡ 并行 LINQ (PLINQ)

AsParallel() 基础用法

PLINQ 让 LINQ 查询能够并行执行,充分利用多核CPU:

using System.Linq;

static void BasicPlinqExample()
{
    // 创建大量数据
    var numbers = Enumerable.Range(1, 10_000_000).ToArray();
    
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();
    
    // 传统的顺序执行
    var sequentialResult = numbers
        .Where(x => IsPrime(x))
        .Count();
    
    stopwatch.Stop();
    Console.WriteLine($"顺序执行时间: {stopwatch.ElapsedMilliseconds}ms, 结果: {sequentialResult}");
    
    stopwatch.Restart();
    
    // 并行执行
    var parallelResult = numbers
        .AsParallel()  // 启用并行执行
        .Where(x => IsPrime(x))
        .Count();
    
    stopwatch.Stop();
    Console.WriteLine($"并行执行时间: {stopwatch.ElapsedMilliseconds}ms, 结果: {parallelResult}");
}

// 判断是否为质数(CPU密集型操作)
static bool IsPrime(int number)
{
    if (number < 2) return false;
    if (number == 2) return true;
    if (number % 2 == 0) return false;
    
    for (int i = 3; i <= Math.Sqrt(number); i += 2)
    {
        if (number % i == 0) return false;
    }
    return true;
}

AsParallel() 执行流程详解

static void PlinqExecutionFlow()
{
    var data = Enumerable.Range(1, 20).ToArray();
    
    Console.WriteLine("=== PLINQ 执行流程演示 ===");
    
    var result = data
        .AsParallel()
        .WithDegreeOfParallelism(4) // 设置并行度为4
        .Select(x =>
        {
            // 显示当前执行的线程
            int threadId = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine($"处理数据 {x} - 线程ID: {threadId}");
            
            // 模拟CPU密集型操作
            Thread.Sleep(100);
            return x * x;
        })
        .Where(x => x % 2 == 0)
        .OrderBy(x => x) // 注意:OrderBy会强制收集所有结果
        .ToArray();
    
    Console.WriteLine($"最终结果: [{string.Join(", ", result)}]");
}

PLINQ 配置选项

static void PlinqConfiguration()
{
    var data = Enumerable.Range(1, 1000).ToArray();
    
    var result = data
        .AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount) // 设置并行度
        .WithCancellation(CancellationToken.None) // 支持取消
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism) // 强制并行
        .Select(x => x * 2)
        .Where(x => x > 100)
        .AsOrdered() // 保持顺序(会影响性能)
        .Take(10)
        .ToArray();
    
    Console.WriteLine($"配置后的PLINQ结果: [{string.Join(", ", result)}]");
}

何时使用 PLINQ

static void WhenToUsePlinq()
{
    Console.WriteLine("=== PLINQ 适用场景分析 ===");
    
    // ✅ 适合PLINQ:CPU密集型,大数据量
    var largeData = Enumerable.Range(1, 1_000_000);
    var cpuIntensiveResult = largeData
        .AsParallel()
        .Where(x => IsPrime(x))
        .Count();
    Console.WriteLine($"CPU密集型任务适合PLINQ: {cpuIntensiveResult}");
    
    // ❌ 不适合PLINQ:简单操作,小数据量
    var smallData = Enumerable.Range(1, 100);
    var simpleResult = smallData
        .AsParallel() // 并行开销可能大于收益
        .Select(x => x + 1)
        .ToArray();
    Console.WriteLine($"简单操作不适合PLINQ: {simpleResult.Length}");
    
    // ⚠️ 需要注意:有副作用的操作
    var counter = 0;
    var problematicResult = Enumerable.Range(1, 1000)
        .AsParallel()
        .Select(x =>
        {
            Interlocked.Increment(ref counter); // 需要使用线程安全操作
            return x;
        })
        .ToArray();
    Console.WriteLine($"有副作用的操作需要注意线程安全: counter = {counter}");
}

💡 实战示例

综合示例:文件处理器

class ParallelFileProcessor
{
    private readonly Semaphore _semaphore;
    private int _processedCount = 0;
    
    public ParallelFileProcessor(int maxConcurrency = 4)
    {
        _semaphore = new Semaphore(maxConcurrency, maxConcurrency);
    }
    
    public async Task ProcessFilesAsync(string[] filePaths)
    {
        Console.WriteLine($"开始处理 {filePaths.Length} 个文件...");
        
        // 使用PLINQ并行处理文件
        var tasks = filePaths
            .AsParallel()
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            .Select(ProcessSingleFileAsync)
            .ToArray();
        
        // 等待所有任务完成
        await Task.WhenAll(tasks);
        
        Console.WriteLine($"所有文件处理完成,共处理 {_processedCount} 个文件");
    }
    
    private async Task ProcessSingleFileAsync(string filePath)
    {
        // 获取信号量
        _semaphore.WaitOne();
        
        try
        {
            int threadId = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine($"线程 {threadId} 开始处理文件: {Path.GetFileName(filePath)}");
            
            // 模拟文件处理(CPU密集型操作)
            await Task.Run(() =>
            {
                // 模拟复杂的文件处理
                for (int i = 0; i < 100; i++)
                {
                    Math.Sqrt(i * 1000);
                    
                    // 适当让出CPU时间
                    if (i % 10 == 0)
                        Thread.Sleep(1);
                }
            });
            
            // 原子地增加计数器
            Interlocked.Increment(ref _processedCount);
            
            Console.WriteLine($"线程 {threadId} 完成处理文件: {Path.GetFileName(filePath)}");
        }
        finally
        {
            // 释放信号量
            _semaphore.Release();
        }
    }
    
    public void Dispose()
    {
        _semaphore?.Dispose();
    }
}

// 使用示例
static async Task FileProcessorExample()
{
    // 模拟文件路径
    var filePaths = Enumerable.Range(1, 20)
        .Select(i => $"file_{i:D3}.txt")
        .ToArray();
    
    var processor = new ParallelFileProcessor(maxConcurrency: 3);
    
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();
    await processor.ProcessFilesAsync(filePaths);
    stopwatch.Stop();
    
    Console.WriteLine($"总耗时: {stopwatch.ElapsedMilliseconds}ms");
    
    processor.Dispose();
}


Comment