如何解决使用 VS 2019 Worker Template,后台服务正在退出
使用 .Net core 3.1 和 Visual Studio 2019 Worker 模板。
我遵循了网上找到的示例代码。后台任务 MessageReadingService 从 Amazon SQS 读取消息。消息被写入 ChannelReader。后台任务 MessageProcessingService,从 ChannelReader 读取消息并进行处理。
这对第一条消息按预期工作。在 MessageProcessingService 中设置断点在第一条消息后停止,但再也不会。我假设任务正在退出。
我的代码遵循示例代码的模式(工作正常)。任何的意见都将会有帮助。也许是一种调试技术?谢谢!
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext,services) =>
{
services.Configure<AwsServicesConfiguration>(hostContext.Configuration.GetSection("AWS"));
services.AddAWSService<IAmazonSQS>();
services.AddTransient<IMessageProcessor,AwsMessageProcessor>();
services.AddSingleton<ISqsMessageChannel,SqsMessageChannel>();
services.AddSingleton<ISqsMessageDeleter,SqsMessageDeleter>();
services.AddSingleton<ISqsMessageQueue,SqsMessageQueue>();
services.AddHostedService<MessageReadingService>();
services.AddHostedService<MessageProcessingService>();
});
}
}
// Message Reading Service
public class MessageReadingService : BackgroundService
{
private readonly ILogger<MessageReadingService> _logger;
private readonly ISqsMessageChannel _sqsMessageChannel;
private readonly string _queueUrl;
private static AmazonSQSClient sqs;
public long ReceivesAttempted { get; private set; }
public long MessagesReceived { get; private set; }
public MessageReadingService(
ILogger<MessageReadingService> logger,ISqsMessageQueue sqsMessageQueue,IOptions<AwsServicesConfiguration> options,ISqsMessageChannel sqsMessageChannel)
{
_logger = logger;
MComSettingsManager mComSettingsManager = new MComSettingsManager();
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(Access_Key,Secret_Key);
AmazonSQSConfig amazonSQSConfig = new AmazonSQSConfig();
amazonSQSConfig.ServiceURL = ServiceURL;
sqs = new AmazonSQSClient(awsCredentials,amazonSQSConfig);
_sqsMessageChannel = sqsMessageChannel;
_queueUrl = options.Value.QueueUrl;
_logger.Loginformation($"Reading from {_queueUrl}");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.Loginformation("Started queue reading service.");
var receiveMessageRequest = new ReceiveMessageRequest
{
AttributeNames = new List<string>() { "Body" },MaxnumberOfMessages = 10,QueueUrl = _queueUrl,WaitTimeSeconds = 5
};
while (!stoppingToken.IsCancellationRequested)
{
ReceivesAttempted++;
var receiveMessageResponse =
await sqs.ReceiveMessageAsync(receiveMessageRequest,stoppingToken);
if (receiveMessageResponse.HttpStatusCode == HttpStatusCode.OK &&
receiveMessageResponse.Messages.Any())
{
MessagesReceived += receiveMessageResponse.Messages.Count;
_logger.Loginformation("Received {MessageCount} messages from the queue.",receiveMessageResponse.Messages.Count);
await _sqsMessageChannel.WriteMessagesAsync(receiveMessageResponse.Messages,stoppingToken);
}
else if (receiveMessageResponse.HttpStatusCode == HttpStatusCode.OK)
{
_logger.Loginformation("No messages received. Attempting receive again in 1 minute.",receiveMessageResponse.Messages.Count);
await Task.Delay(TimeSpan.FromSeconds(5),stoppingToken);
}
else if (receiveMessageResponse.HttpStatusCode != HttpStatusCode.OK)
{
_logger.LogError("Unsuccessful response from AWS SQS.");
}
}
_sqsMessageChannel.TryCompleteWriter();
}
public class MessageProcessingService : BackgroundService
{
private readonly ILogger<MessageProcessingService> _logger;
private readonly ISqsMessageChannel _sqsMessageChannel;
private readonly IServiceProvider _serviceProvider;
private readonly MysqLConnection conn;
public MessageProcessingService(
ILogger<MessageProcessingService> logger,ISqsMessageChannel sqsMessageChannel,IServiceProvider serviceProvider
)
{
_logger = logger;
_sqsMessageChannel = sqsMessageChannel;
_serviceProvider = serviceProvider;
conn = new MysqLConnection(GaaSManager.ConnectionString);
conn.open();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await foreach (var message in _sqsMessageChannel.Reader.ReadAllAsync(stoppingToken))
{
_logger.Loginformation("Read message to process from channel.");
using var scope = _serviceProvider.CreateScope();
var messageProcessor = scope.ServiceProvider.GetrequiredService<IMessageProcessor>();
await messageProcessor.ProcessOperationsFromMessageAsync(message,conn,stoppingToken);
}
}
}
}
public interface IMessageProcessor
{
Task ProcessOperationsFromMessageAsync(Message message,MysqLConnection conn,CancellationToken cancellationToken = default);
}
public class AwsMessageProcessor : IMessageProcessor
{
private readonly ISqsMessageDeleter _sqsMessageDeleter;
public AwsMessageProcessor(ISqsMessageDeleter sqsMessageDeleter)
{
_sqsMessageDeleter = sqsMessageDeleter;
}
public async Task ProcessOperationsFromMessageAsync(Message message,CancellationToken cancellationToken = default)
{
PayloadManager payloadMgr = PayloadManager.Instance;
Class1 object1 = JsonConvert.DeserializeObject<Class1>(message.Body);
if (object1.field1 == VALUE)
{
// store for later processing
Class2 object2 = JsonConvert.DeserializeObject<Class2>(object1.JsonWork);
ProcessObject(object2,conn);
}
else
{
SaveClass3(object1,conn);
}
await _sqsMessageDeleter.DeleteMessageAsync(message);
}
public void ProcessObject(Class2 object2,MysqLConnection conn)
{
Working MysqL Code
}
public void SaveClass3(Class3 object3,MysqLConnection conn)
{
Working MysqL Code
}
}
解决方法
看起来您需要在 while (!stoppingToken.IsCancellationRequested)
中实现 MessageProcessingService
模式,否则 _sqsMessageChannel.Reader.ReadAllAsync
将被调用一次并停止服务。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。