微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 VS 2019 Worker Template,后台服务正在退出

如何解决使用 VS 2019 Worker Template,后台服务正在退出

使用 .Net core 3.1 和 Visual Studio 2019 Worker 模板。

我遵循了网上找到的示例代码后台任务 MessageReadingService 从 Amazon SQS 读取消息。消息被写入 ChannelReader。后台任务 MessageProcessingService,从 ChannelReader 读取消息并进行处理。

这对第一条消息按预期工作。在 MessageProcessingService 中设置断点在第一条消息后停止,但再也不会。我假设任务正在退出

我的代码遵循示例代码的模式(工作正常)。任何的意见都将会有帮助。也许是一种调试技术?谢谢!

 public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext,services) =>
                {
                    services.Configure<AwsServicesConfiguration>(hostContext.Configuration.GetSection("AWS"));

                    services.AddAWSService<IAmazonSQS>();
                    services.AddTransient<IMessageProcessor,AwsMessageProcessor>();
                    services.AddSingleton<ISqsMessageChannel,SqsMessageChannel>();
                    services.AddSingleton<ISqsMessageDeleter,SqsMessageDeleter>();
                     services.AddSingleton<ISqsMessageQueue,SqsMessageQueue>();
                    services.AddHostedService<MessageReadingService>();
                    services.AddHostedService<MessageProcessingService>();

                });
    }
    
}

// Message Reading Service
 
  public class MessageReadingService : BackgroundService
    {
        private readonly ILogger<MessageReadingService> _logger;
        private readonly ISqsMessageChannel _sqsMessageChannel;
        private readonly string _queueUrl;
         private static AmazonSQSClient sqs;

        public long ReceivesAttempted { get; private set; }
        public long MessagesReceived { get; private set; }

        public MessageReadingService(
            ILogger<MessageReadingService> logger,ISqsMessageQueue sqsMessageQueue,IOptions<AwsServicesConfiguration> options,ISqsMessageChannel sqsMessageChannel)
        {
            _logger = logger;
            MComSettingsManager mComSettingsManager = new MComSettingsManager();
            BasicAWSCredentials awsCredentials = new BasicAWSCredentials(Access_Key,Secret_Key);
            AmazonSQSConfig amazonSQSConfig = new AmazonSQSConfig();
            amazonSQSConfig.ServiceURL = ServiceURL;
            sqs = new AmazonSQSClient(awsCredentials,amazonSQSConfig);
           
            _sqsMessageChannel = sqsMessageChannel;
            _queueUrl = options.Value.QueueUrl;

            _logger.Loginformation($"Reading from {_queueUrl}");

        }


    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.Loginformation("Started queue reading service.");

            var receiveMessageRequest = new ReceiveMessageRequest
            {
                AttributeNames = new List<string>() { "Body" },MaxnumberOfMessages = 10,QueueUrl = _queueUrl,WaitTimeSeconds = 5
            };

            while (!stoppingToken.IsCancellationRequested)
            {
                ReceivesAttempted++;

                var receiveMessageResponse =
                    await sqs.ReceiveMessageAsync(receiveMessageRequest,stoppingToken);

                if (receiveMessageResponse.HttpStatusCode == HttpStatusCode.OK &&
                    receiveMessageResponse.Messages.Any())
                {
                    MessagesReceived += receiveMessageResponse.Messages.Count;

                    _logger.Loginformation("Received {MessageCount} messages from the queue.",receiveMessageResponse.Messages.Count);

                    await _sqsMessageChannel.WriteMessagesAsync(receiveMessageResponse.Messages,stoppingToken);
                }
                else if (receiveMessageResponse.HttpStatusCode == HttpStatusCode.OK)
                {
                    _logger.Loginformation("No messages received. Attempting receive again in 1 minute.",receiveMessageResponse.Messages.Count);
                    await Task.Delay(TimeSpan.FromSeconds(5),stoppingToken);
                   
                }
                else if (receiveMessageResponse.HttpStatusCode != HttpStatusCode.OK)
                {
                    _logger.LogError("Unsuccessful response from AWS SQS.");
                }
            }

            _sqsMessageChannel.TryCompleteWriter();
        }

 
 
 
 public class MessageProcessingService : BackgroundService
    {
        private readonly ILogger<MessageProcessingService> _logger;
        private readonly ISqsMessageChannel _sqsMessageChannel;
        private readonly IServiceProvider _serviceProvider;
        private readonly MysqLConnection conn;

        public MessageProcessingService(
            ILogger<MessageProcessingService> logger,ISqsMessageChannel sqsMessageChannel,IServiceProvider serviceProvider
           )
        {
            _logger = logger;
            _sqsMessageChannel = sqsMessageChannel;
            _serviceProvider = serviceProvider;
            conn = new MysqLConnection(GaaSManager.ConnectionString);
            conn.open();
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                await foreach (var message in _sqsMessageChannel.Reader.ReadAllAsync(stoppingToken))
                {
                    _logger.Loginformation("Read message to process from channel.");

                    using var scope = _serviceProvider.CreateScope();

                    var messageProcessor = scope.ServiceProvider.GetrequiredService<IMessageProcessor>();

                    await messageProcessor.ProcessOperationsFromMessageAsync(message,conn,stoppingToken);
                }
            }
            
        }
    }
 
 
 
 
 
 public interface IMessageProcessor
    {
        Task ProcessOperationsFromMessageAsync(Message message,MysqLConnection conn,CancellationToken cancellationToken = default);
    }
 
 public class AwsMessageProcessor : IMessageProcessor
    {

        private readonly ISqsMessageDeleter _sqsMessageDeleter;


        public AwsMessageProcessor(ISqsMessageDeleter sqsMessageDeleter)
        {

            _sqsMessageDeleter = sqsMessageDeleter;

        }

        public async Task ProcessOperationsFromMessageAsync(Message message,CancellationToken cancellationToken = default)
        {
            PayloadManager payloadMgr = PayloadManager.Instance;


            Class1 object1 = JsonConvert.DeserializeObject<Class1>(message.Body);



            if (object1.field1 == VALUE)
            {
                // store for later processing
                Class2 object2 = JsonConvert.DeserializeObject<Class2>(object1.JsonWork);

                ProcessObject(object2,conn);

               
            }
            else
            {
                SaveClass3(object1,conn);
            }
            
            await _sqsMessageDeleter.DeleteMessageAsync(message);
        }




      
        public void ProcessObject(Class2 object2,MysqLConnection conn)
        {
            
            Working MysqL Code
                   
                
        }

    
        public void SaveClass3(Class3 object3,MysqLConnection conn)
        {
            
                Working MysqL Code

        }


    }

解决方法

看起来您需要在 while (!stoppingToken.IsCancellationRequested) 中实现 MessageProcessingService 模式,否则 _sqsMessageChannel.Reader.ReadAllAsync 将被调用一次并停止服务。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。