DTO实例
/// <summary>
/// Request payload that carries a text message together with a timestamp.
/// </summary>
public class MessageRequest
{
    /// <summary>
    /// Text of the message. Defaults to an empty string.
    /// </summary>
    public string Content { get; set; } = "";

    /// <summary>
    /// Timestamp of the request. Defaults to the UTC time at which the instance was constructed.
    /// </summary>
    public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
防腐接口
/// <summary>
/// Abstraction over the RabbitMQ messaging operations used by the application:
/// publishing a message and starting/stopping message consumption.
/// </summary>
public interface IRabbitMQService
{
    /// <summary>
    /// Publishes a text message.
    /// </summary>
    /// <param name="message">The message text to publish.</param>
    /// <returns>A task that completes when the publish operation has finished.</returns>
    Task PublishMessageAsync(string message);

    /// <summary>
    /// Starts consuming messages.
    /// </summary>
    /// <returns>A task that completes when consumption has been started.</returns>
    Task StartConsumingAsync();

    /// <summary>
    /// Stops consuming messages.
    /// </summary>
    /// <returns>A task that completes when consumption has been stopped.</returns>
    Task StopConsumingAsync();
}
接口实现
/// <summary>
/// RabbitMQ-backed implementation of <see cref="IRabbitMQService"/>.
/// </summary>
/// <remarks>
/// <para>
/// Broker settings are read from the <c>RabbitMQ:*</c> configuration keys; every key has a
/// built-in default (local broker, <c>demo-exchange</c>, <c>demo-queue</c>, <c>demo-routing-key</c>).
/// </para>
/// <para>
/// The connection and channel are created lazily on first use and re-created when they are
/// found closed. A durable direct exchange and a durable queue are declared and bound each time
/// a channel is created.
/// </para>
/// <para>
/// This class performs no internal synchronization.
/// NOTE(review): if one instance is shared by concurrent callers, confirm whether calls need to
/// be serialized externally.
/// </para>
/// </remarks>
public class RabbitMQService : IRabbitMQService, IDisposable, IAsyncDisposable
{
    private readonly ILogger<RabbitMQService> _logger;
    private readonly IConfiguration _configuration;
    private IConnection? _connection;
    private IChannel? _channel;
    private string? _consumerTag;
    private bool _disposed;

    /// <summary>
    /// Creates the service. No broker connection is opened until the first operation.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle and message diagnostics.</param>
    /// <param name="configuration">Configuration that supplies the <c>RabbitMQ:*</c> settings.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="logger"/> or <paramref name="configuration"/> is <c>null</c>.
    /// </exception>
    public RabbitMQService(ILogger<RabbitMQService> logger, IConfiguration configuration)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
    }

    // Single source of truth for the topology names, so declaration, publishing and
    // consuming can never disagree on a default.
    private string ExchangeName => _configuration["RabbitMQ:ExchangeName"] ?? "demo-exchange";

    private string QueueName => _configuration["RabbitMQ:QueueName"] ?? "demo-queue";

    private string RoutingKey => _configuration["RabbitMQ:RoutingKey"] ?? "demo-routing-key";

    /// <summary>
    /// Builds a connection factory from the <c>RabbitMQ:*</c> configuration keys.
    /// </summary>
    /// <exception cref="FormatException">The configured port is not a valid integer.</exception>
    private ConnectionFactory CreateConnectionFactory()
    {
        return new ConnectionFactory
        {
            HostName = _configuration["RabbitMQ:HostName"] ?? "localhost",
            // Configuration values are machine-readable, so parse them culture-independently.
            Port = int.Parse(
                _configuration["RabbitMQ:Port"] ?? "5672",
                System.Globalization.CultureInfo.InvariantCulture),
            UserName = _configuration["RabbitMQ:UserName"] ?? "guest",
            Password = _configuration["RabbitMQ:Password"] ?? "guest",
            VirtualHost = _configuration["RabbitMQ:VirtualHost"] ?? "/",
            ClientProvidedName = "RabbitMQWebApi"
        };
    }

    /// <summary>
    /// Makes sure an open connection and an open, fully configured channel exist, creating or
    /// re-creating them as needed, and returns the channel.
    /// </summary>
    /// <returns>The open channel with exchange, queue and binding declared.</returns>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    private async Task<IChannel> EnsureConnectionAsync()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(RabbitMQService));
        }

        var connection = _connection;
        if (connection == null || !connection.IsOpen)
        {
            // Release the stale channel and connection before replacing them; simply
            // overwriting the fields would leak the old objects.
            await ReleaseChannelAsync();
            await ReleaseConnectionAsync();

            connection = await CreateConnectionFactory().CreateConnectionAsync();
            _connection = connection;
            _logger.LogInformation("RabbitMQ 连接已建立");
        }

        var channel = _channel;
        if (channel == null || !channel.IsOpen)
        {
            await ReleaseChannelAsync();

            channel = await connection.CreateChannelAsync();
            try
            {
                var exchangeName = ExchangeName;
                var queueName = QueueName;

                // Declare the exchange.
                await channel.ExchangeDeclareAsync(
                    exchange: exchangeName,
                    type: ExchangeType.Direct,
                    durable: true,
                    autoDelete: false);

                // Declare the queue.
                await channel.QueueDeclareAsync(
                    queue: queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                // Bind the queue to the exchange.
                await channel.QueueBindAsync(
                    queue: queueName,
                    exchange: exchangeName,
                    routingKey: RoutingKey);
            }
            catch
            {
                // Do not keep a half-configured channel: a later call would see it as open
                // and skip the declarations entirely.
                await CloseChannelQuietlyAsync(channel);
                throw;
            }

            // Publish the channel to the field only once its topology is in place.
            _channel = channel;
            _logger.LogInformation("RabbitMQ 通道已创建,交换器和队列已配置");
        }

        return channel;
    }

    /// <summary>
    /// Publishes <paramref name="message"/> as a persistent, UTF-8 encoded <c>text/plain</c>
    /// message to the configured exchange using the configured routing key.
    /// </summary>
    /// <param name="message">The message text to publish.</param>
    /// <returns>A task that completes when the publish call has finished.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    public async Task PublishMessageAsync(string message)
    {
        // Validate before touching the broker so a bad argument never opens a connection.
        if (message == null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        var channel = await EnsureConnectionAsync();

        var body = Encoding.UTF8.GetBytes(message);
        var props = new BasicProperties
        {
            ContentType = "text/plain",
            DeliveryMode = DeliveryModes.Persistent,
            Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())
        };

        await channel.BasicPublishAsync(
            exchange: ExchangeName,
            routingKey: RoutingKey,
            mandatory: false,
            basicProperties: props,
            body: body);

        // Message template instead of an interpolated string: same rendered text, but the
        // payload is passed as a structured argument.
        _logger.LogInformation("消息已发布: {Message}", message);
    }

    /// <summary>
    /// Registers a consumer on the configured queue. Each delivery is decoded as UTF-8, logged
    /// and then acknowledged manually. Calling this while a consumer is already registered is a
    /// no-op.
    /// </summary>
    /// <returns>A task that completes when the consumer has been registered.</returns>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    public async Task StartConsumingAsync()
    {
        // Ensure first: re-creating a dead channel clears any stale consumer tag.
        var channel = await EnsureConnectionAsync();

        if (_consumerTag != null)
        {
            // A second registration would overwrite the tag and make the first consumer
            // impossible to cancel.
            _logger.LogInformation("消费者已在运行,忽略重复的启动请求");
            return;
        }

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += async (sender, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.Span);
            _logger.LogInformation("收到消息: {Message}", message);

            // Acknowledge on the channel this consumer was registered on. The _channel field
            // may be null or point at a newer channel by the time a delivery arrives, and a
            // delivery tag is only meaningful on its own channel.
            await channel.BasicAckAsync(ea.DeliveryTag, false);
        };

        _consumerTag = await channel.BasicConsumeAsync(
            queue: QueueName,
            autoAck: false,
            consumer: consumer);
        _logger.LogInformation("开始消费消息");
    }

    /// <summary>
    /// Cancels the consumer registered by <see cref="StartConsumingAsync"/>, if any.
    /// Does nothing when no consumer is registered.
    /// </summary>
    /// <returns>A task that completes when the consumer has been cancelled.</returns>
    public async Task StopConsumingAsync()
    {
        var channel = _channel;
        var consumerTag = _consumerTag;
        if (channel == null || consumerTag == null)
        {
            return;
        }

        // A closed channel has already dropped its consumers; cancelling on it would only throw.
        if (channel.IsOpen)
        {
            await channel.BasicCancelAsync(consumerTag);
        }

        // Forget the tag only after a successful cancel so a failed attempt can be retried.
        _consumerTag = null;
        _logger.LogInformation("停止消费消息");
    }

    /// <summary>
    /// Detaches the current channel from this instance and closes/disposes it, ignoring errors.
    /// Also clears the consumer tag, because a consumer registration does not outlive its channel.
    /// </summary>
    private async Task ReleaseChannelAsync()
    {
        var channel = _channel;
        _channel = null;
        _consumerTag = null;

        if (channel != null)
        {
            await CloseChannelQuietlyAsync(channel).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Closes and disposes <paramref name="channel"/>; failures are logged, never thrown.
    /// </summary>
    private async Task CloseChannelQuietlyAsync(IChannel channel)
    {
        try
        {
            if (channel.IsOpen)
            {
                await channel.CloseAsync().ConfigureAwait(false);
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "关闭 RabbitMQ 通道时发生异常");
        }

        // Dispose in its own try so it still runs when the close attempt failed.
        try
        {
            await channel.DisposeAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "释放 RabbitMQ 通道时发生异常");
        }
    }

    /// <summary>
    /// Detaches the current connection from this instance and closes/disposes it; failures are
    /// logged, never thrown.
    /// </summary>
    private async Task ReleaseConnectionAsync()
    {
        var connection = _connection;
        _connection = null;
        if (connection == null)
        {
            return;
        }

        try
        {
            if (connection.IsOpen)
            {
                await connection.CloseAsync().ConfigureAwait(false);
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "关闭 RabbitMQ 连接时发生异常");
        }

        // Dispose in its own try so it still runs when the close attempt failed.
        try
        {
            await connection.DisposeAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "释放 RabbitMQ 连接时发生异常");
        }
    }

    /// <summary>
    /// Asynchronously closes and disposes the channel and the connection. Safe to call more
    /// than once; cleanup failures are logged and not propagated.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Channel first, then the connection that owns it.
        await ReleaseChannelAsync().ConfigureAwait(false);
        await ReleaseConnectionAsync().ConfigureAwait(false);
        _logger.LogInformation("RabbitMQ 连接已关闭");

        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="DisposeAsync"/>; blocks until cleanup has finished.
    /// Prefer <see cref="DisposeAsync"/> where the caller can await.
    /// </summary>
    public void Dispose()
    {
        // The cleanup path awaits with ConfigureAwait(false), so blocking here does not depend
        // on the caller's synchronization context.
        DisposeAsync().AsTask().GetAwaiter().GetResult();
    }
}