消息队列 RabbitMQ 在 Web API 中的简单使用

decade
7
2025-12-30

DTO实例

/// <summary>
/// Request payload carrying a text message and the time it was created.
/// </summary>
public class MessageRequest
{
    /// <summary>
    /// Creates a request with empty content and the current UTC time as its timestamp.
    /// </summary>
    public MessageRequest()
    {
        Content = string.Empty;
        Timestamp = DateTime.UtcNow;
    }

    /// <summary>Message text; never null unless a caller assigns null explicitly.</summary>
    public string Content { get; set; }

    /// <summary>Creation time; defaults to the UTC instant at which the object was constructed.</summary>
    public DateTime Timestamp { get; set; }
}

防腐接口

/// <summary>
/// Abstraction over the RabbitMQ client so that callers depend on this interface
/// rather than on the client library types directly.
/// </summary>
public interface IRabbitMQService
{
    /// <summary>Publishes a text message.</summary>
    /// <param name="message">The message text to publish.</param>
    /// <returns>A task that completes when the publish operation has finished.</returns>
    Task PublishMessageAsync(string message);

    /// <summary>Starts consuming messages.</summary>
    /// <returns>A task that completes once consumption has been started.</returns>
    Task StartConsumingAsync();

    /// <summary>Stops consuming messages.</summary>
    /// <returns>A task that completes once consumption has been stopped.</returns>
    Task StopConsumingAsync();
}

接口实现

/// <summary>
/// <see cref="IRabbitMQService"/> implementation that lazily opens a single RabbitMQ connection
/// and channel, declares a durable direct exchange plus a durable queue bound to it, and then
/// publishes to / consumes from that topology.
/// </summary>
/// <remarks>
/// All settings are read from the <c>RabbitMQ:*</c> configuration keys and fall back to local
/// defaults (<c>localhost:5672</c>, <c>guest</c>/<c>guest</c>, vhost <c>/</c>,
/// <c>demo-exchange</c>, <c>demo-queue</c>, <c>demo-routing-key</c>).
/// Connection/channel setup and teardown are serialized by an internal semaphore so that
/// concurrent callers cannot create duplicate connections.
/// </remarks>
public class RabbitMQService : IRabbitMQService, IDisposable, IAsyncDisposable
{
    private readonly ILogger<RabbitMQService> _logger;
    private readonly IConfiguration _configuration;

    // Guards creation and disposal of _connection / _channel.
    private readonly System.Threading.SemaphoreSlim _setupLock = new(1, 1);

    private IConnection? _connection;
    private IChannel? _channel;
    private string? _consumerTag;
    private bool _disposed;

    /// <summary>Creates the service; no network activity happens until the first operation.</summary>
    /// <param name="logger">Logger used for lifecycle and message diagnostics.</param>
    /// <param name="configuration">Configuration source for the <c>RabbitMQ:*</c> keys.</param>
    public RabbitMQService(ILogger<RabbitMQService> logger, IConfiguration configuration)
    {
        _logger = logger;
        _configuration = configuration;
    }

    // Configuration is re-read on every access so that reloadable configuration keeps working.
    private string ExchangeName => _configuration["RabbitMQ:ExchangeName"] ?? "demo-exchange";

    private string QueueName => _configuration["RabbitMQ:QueueName"] ?? "demo-queue";

    private string RoutingKey => _configuration["RabbitMQ:RoutingKey"] ?? "demo-routing-key";

    /// <summary>
    /// Returns an open channel with the exchange, queue and binding declared, creating or
    /// replacing the connection and channel when they are missing or no longer open.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The service has already been disposed.</exception>
    private async Task<IChannel> EnsureConnectionAsync()
    {
        await _setupLock.WaitAsync();
        try
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(RabbitMQService));
            }

            var connection = _connection;
            if (connection is null || !connection.IsOpen)
            {
                // A channel cannot outlive its connection; release both before replacing them
                // so the stale objects are not leaked.
                await ReleaseChannelAsync();
                await ReleaseConnectionAsync();

                var factory = new ConnectionFactory
                {
                    HostName = _configuration["RabbitMQ:HostName"] ?? "localhost",
                    Port = int.Parse(
                        _configuration["RabbitMQ:Port"] ?? "5672",
                        System.Globalization.CultureInfo.InvariantCulture),
                    UserName = _configuration["RabbitMQ:UserName"] ?? "guest",
                    Password = _configuration["RabbitMQ:Password"] ?? "guest",
                    VirtualHost = _configuration["RabbitMQ:VirtualHost"] ?? "/",
                    ClientProvidedName = "RabbitMQWebApi"
                };

                connection = await factory.CreateConnectionAsync();
                _connection = connection;
                _logger.LogInformation("RabbitMQ 连接已建立");
            }

            var channel = _channel;
            if (channel is null || !channel.IsOpen)
            {
                await ReleaseChannelAsync();

                channel = await connection.CreateChannelAsync();
                try
                {
                    await DeclareTopologyAsync(channel);
                }
                catch
                {
                    // Do not keep a channel whose topology was never declared; the next call
                    // must start over instead of silently skipping the declarations.
                    await CloseAndDisposeAsync(channel);
                    throw;
                }

                _channel = channel;
                _logger.LogInformation("RabbitMQ 通道已创建,交换器和队列已配置");
            }

            return channel;
        }
        finally
        {
            _setupLock.Release();
        }
    }

    /// <summary>Declares the durable direct exchange, the durable queue, and binds them.</summary>
    private async Task DeclareTopologyAsync(IChannel channel)
    {
        var exchangeName = ExchangeName;
        var queueName = QueueName;

        await channel.ExchangeDeclareAsync(
            exchange: exchangeName,
            type: ExchangeType.Direct,
            durable: true,
            autoDelete: false);

        await channel.QueueDeclareAsync(
            queue: queueName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        await channel.QueueBindAsync(
            queue: queueName,
            exchange: exchangeName,
            routingKey: RoutingKey);
    }

    /// <summary>
    /// Publishes <paramref name="message"/> as a persistent UTF-8 <c>text/plain</c> message to the
    /// configured exchange with the configured routing key.
    /// </summary>
    /// <param name="message">The message text to publish.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public async Task PublishMessageAsync(string message)
    {
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        var channel = await EnsureConnectionAsync();

        var props = new BasicProperties
        {
            ContentType = "text/plain",
            DeliveryMode = DeliveryModes.Persistent,
            Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())
        };

        await channel.BasicPublishAsync(
            exchange: ExchangeName,
            routingKey: RoutingKey,
            mandatory: false,
            basicProperties: props,
            body: Encoding.UTF8.GetBytes(message));

        _logger.LogInformation("消息已发布: {Message}", message);
    }

    /// <summary>
    /// Registers a consumer on the configured queue that logs each message and acknowledges it
    /// manually. Does nothing if a consumer registered by this service is already active.
    /// </summary>
    public async Task StartConsumingAsync()
    {
        var channel = await EnsureConnectionAsync();

        if (_consumerTag is not null)
        {
            // A second registration would orphan the first consumer tag and make it unstoppable.
            _logger.LogInformation("消费者已在运行,忽略重复的启动请求");
            return;
        }

        var consumer = new AsyncEventingBasicConsumer(channel);

        consumer.ReceivedAsync += async (_, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.Span);

            _logger.LogInformation("收到消息: {Message}", message);

            // Delivery tags are scoped to the channel that delivered the message, so ack on the
            // captured channel rather than on the _channel field, which may have been replaced.
            await channel.BasicAckAsync(ea.DeliveryTag, false);
        };

        _consumerTag = await channel.BasicConsumeAsync(
            queue: QueueName,
            autoAck: false,
            consumer: consumer);

        _logger.LogInformation("开始消费消息");
    }

    /// <summary>
    /// Cancels the consumer started by <see cref="StartConsumingAsync"/>, if any. Safe to call
    /// when no consumer is active or when the channel has already closed.
    /// </summary>
    public async Task StopConsumingAsync()
    {
        var channel = _channel;
        var consumerTag = _consumerTag;
        if (channel is null || consumerTag is null)
        {
            return;
        }

        if (channel.IsOpen)
        {
            await channel.BasicCancelAsync(consumerTag);
            _logger.LogInformation("停止消费消息");
        }

        // Cleared only after a successful cancel (or when the channel is already gone), so a
        // failed cancel can be retried and a repeated stop becomes a no-op.
        _consumerTag = null;
    }

    /// <summary>
    /// Asynchronously closes and disposes the channel and connection. Idempotent; failures while
    /// closing are logged rather than thrown.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // ConfigureAwait(false) throughout the dispose path keeps the blocking Dispose() wrapper
        // from deadlocking when it is called under a synchronization context.
        await _setupLock.WaitAsync().ConfigureAwait(false);
        try
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            await ReleaseChannelAsync().ConfigureAwait(false);
            await ReleaseConnectionAsync().ConfigureAwait(false);

            _logger.LogInformation("RabbitMQ 连接已关闭");
        }
        finally
        {
            _setupLock.Release();
        }

        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="DisposeAsync"/>. It blocks on the asynchronous
    /// teardown because this code only uses the client's asynchronous close API; prefer
    /// <see cref="DisposeAsync"/> where possible.
    /// </summary>
    public void Dispose()
    {
        DisposeAsync().AsTask().GetAwaiter().GetResult();
    }

    /// <summary>Detaches and disposes the current channel; its consumer tag dies with it.</summary>
    private async Task ReleaseChannelAsync()
    {
        var channel = _channel;
        _channel = null;
        _consumerTag = null;

        if (channel is not null)
        {
            await CloseAndDisposeAsync(channel).ConfigureAwait(false);
        }
    }

    /// <summary>Closes (if still open) and disposes a channel, logging instead of throwing.</summary>
    private async Task CloseAndDisposeAsync(IChannel channel)
    {
        try
        {
            try
            {
                if (channel.IsOpen)
                {
                    await channel.CloseAsync().ConfigureAwait(false);
                }
            }
            finally
            {
                await channel.DisposeAsync().ConfigureAwait(false);
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "释放 RabbitMQ 通道时出错");
        }
    }

    /// <summary>Detaches, closes (if still open) and disposes the current connection.</summary>
    private async Task ReleaseConnectionAsync()
    {
        var connection = _connection;
        _connection = null;

        if (connection is null)
        {
            return;
        }

        try
        {
            try
            {
                if (connection.IsOpen)
                {
                    await connection.CloseAsync().ConfigureAwait(false);
                }
            }
            finally
            {
                await connection.DisposeAsync().ConfigureAwait(false);
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "释放 RabbitMQ 连接时出错");
        }
    }
}