
MassTransit scheduler with Quartz.NET: "Object reference not set to an instance of an object" when reading a message

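I am getting the following error when setting up the Quartz.NET scheduler with MassTransit.

The message is scheduled on RabbitMQ successfully, but when Quartz.NET tries to read it, this fault is raised: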

MT-Fault-Message:   Object reference not set to an instance of an object.
MT-Fault-Timestamp: 2021-06-02T18:46:56.1335404Z
MT-Fault-StackTrace:    at MassTransit.QuartzIntegration.ScheduleMessageConsumer.TranslateJsonBody(String body, String destination)
   at MassTransit.QuartzIntegration.ScheduleMessageConsumer.CreateJobDetail(ConsumeContext context, Uri destination, JobKey jobKey, Nullable`1 tokenId)
   at MassTransit.QuartzIntegration.ScheduleMessageConsumer.Consume(ConsumeContext`1 context)
   at MassTransit.Pipeline.ConsumerFactories.DelegateConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
   at MassTransit.Pipeline.ConsumerFactories.DelegateConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
   at GreenPipes.Partitioning.Partition.Send[T](T context, IPipe`1 next)
   at GreenPipes.Filters.TeeFilter`1.<>c__DisplayClass5_0.<<Send>g__SendAsync|1>d.MoveNext()
--- End of stack trace from previous location ---
   at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
   at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
   at GreenPipes.Filters.DynamicFilter`1.<>c__DisplayClass10_0.<<Send>g__SendAsync|0>d.MoveNext()
--- End of stack trace from previous location ---
   at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
MT-Fault-ConsumerType:  MassTransit.QuartzIntegration.ScheduleMessageConsumer
MT-Fault-MessageType:   MassTransit.Scheduling.ScheduleMessage

ConfigureServices is set up like this:

.ConfigureServices((host, services) =>
{
    services.Configure<OtherOptions>(host.Configuration);
    services.Configure<QuartzOptions>(host.Configuration.GetSection("Quartz"));

    services.AddSingleton<QuartzConfiguration>();

    services.AddMassTransit(x =>
    {
        x.UsingRabbitMq((context, cfg) =>
        {
            var options = context.GetService<QuartzConfiguration>();

            // Custom extension method (shown further down) that hosts the Quartz scheduler on the bus.
            cfg.AddScheduling(s =>
            {
                s.SchedulerFactory = new StdSchedulerFactory(options.Configuration);
                s.QueueName = options.Queue;
            });

            var vhost = host.Configuration.GetValue<string>("RabbitMQ:VirtualHost");

            cfg.Host(string.Empty, vhost, h =>
            {
                h.Username(host.Configuration.GetValue<string>("RabbitMQ:User"));
                h.Password(host.Configuration.GetValue<string>("RabbitMQ:Password"));
                h.UseCluster(c =>
                {
                    c.Node(host.Configuration.GetValue<string>("RabbitMQ:Node1"));
                    c.Node(host.Configuration.GetValue<string>("RabbitMQ:Node2"));
                });
            });
        });
    });

    services.AddMassTransitHostedService();
});

This is how the Quartz configuration is set up:

public NameValueCollection Configuration
{
    get
    {
        var configuration = new NameValueCollection(13)
        {
            {"quartz.scheduler.instanceName", _options.Value.InstanceName},
            {"quartz.scheduler.instanceId", "AUTO"},
            {"quartz.plugin.timeZoneConverter.type", "Quartz.Plugin.TimeZoneConverter.TimeZoneConverterPlugin, Quartz.Plugins.TimeZoneConverter"},
            {"quartz.serializer.type", "json"},
            {"quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz"},
            {"quartz.threadPool.threadCount", (_options.Value.ThreadCount ?? 10).ToString("F0")},
            {"quartz.jobStore.misfireThreshold", "60000"},
            {"quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz"},
            {"quartz.jobStore.driverDelegateType", "Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz"},
            {"quartz.jobStore.tablePrefix", _options.Value.TablePrefix},
            {"quartz.jobStore.dataSource", "default"},
            {"quartz.dataSource.default.provider", _options.Value.Provider},
            {"quartz.dataSource.default.connectionString", _options.Value.ConnectionString},
            {"quartz.jobStore.useProperties", "true"}
        };

        // Log every effective setting so the resolved configuration can be checked at startup.
        foreach (var key in configuration.AllKeys)
        {
            _logger.LogInformation("{Key} = {Value}", key, configuration[key]);
        }

        return configuration;
    }
}

And this is the extension method that sets up MassTransit scheduling:

public static void AddScheduling(this IBusFactoryConfigurator configurator, Action<InMemorySchedulerOptions> configure)
{
    if (configurator == null)
        throw new ArgumentNullException(nameof(configurator));

    var options = new InMemorySchedulerOptions();
    configure?.Invoke(options);

    if (options.SchedulerFactory == null)
        throw new ArgumentNullException(nameof(options.SchedulerFactory));

    var observer = new SchedulerBusObserver(options);

    configurator.ReceiveEndpoint(options.QueueName, e =>
    {
        var partitioner = configurator.CreatePartitioner(Environment.ProcessorCount);

        e.Consumer(() => new ScheduleMessageConsumer(observer.Scheduler), x =>
            x.Message<ScheduleMessage>(m => m.UsePartitioner(partitioner, p => p.Message.CorrelationId)));

        e.Consumer(() => new CancelScheduledMessageConsumer(observer.Scheduler), x =>
            x.Message<CancelScheduledMessage>(m => m.UsePartitioner(partitioner, p => p.Message.TokenId)));

        configurator.UseMessageScheduler(e.InputAddress);

        configurator.ConnectBusObserver(observer);
    });
}

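I tried debugging but could not find the cause of the error. The database connection is created successfully, and scheduling the message on the consumer also completes successfully. Is there a way to debug the point where the scheduler actually reads the message, or to find out exactly what is throwing the object reference error?

EDIT: Debugging into the MassTransit.Quartz code, I noticed that the problem occurs while the message is being deserialized in the TranslateJsonBody method: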

var envelope = JObject.Parse(body);

envelope["destinationAddress"] = destination;

var message = envelope["message"];

var payload = message["payload"];

var payloadType = message["payloadType"];

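As you can see, the method looks up its keys in camelCase, while our broker sends the envelope in PascalCase. Nothing handles that difference, so when the method tries to read envelope["message"] it gets null, because we are sending the property as "Message"; the next lookup on that null value then throws. Is there any configuration to force camelCase when deserializing?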
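
To make the casing problem concrete, here is a minimal sketch using Newtonsoft.Json on its own, outside MassTransit. The Envelope and Inner classes, the "queue:example" address and the helper method names are made up for illustration and are not MassTransit types; the sketch only assumes that the envelope is read through a JObject, as in the snippet above. It shows that the JObject string indexer matches property names case-sensitively, that GetValue with StringComparison.OrdinalIgnoreCase tolerates either casing, and that a sender using CamelCasePropertyNamesContractResolver produces the lowercase keys the reader expects. Whether MassTransit exposes a setting for this is not something the sketch answers.

```csharp
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;

public static class EnvelopeCasingDemo
{
    // Hypothetical, simplified stand-ins for the real message envelope.
    public sealed class Envelope
    {
        public string DestinationAddress { get; set; }
        public Inner Message { get; set; }
    }

    public sealed class Inner
    {
        public string PayloadType { get; set; }
    }

    // Serializes a sample envelope with either PascalCase (default) or camelCase property names.
    public static string Serialize(bool camelCase)
    {
        var envelope = new Envelope
        {
            DestinationAddress = "queue:example",
            Message = new Inner { PayloadType = "Example" }
        };

        var settings = new JsonSerializerSettings();
        if (camelCase)
            settings.ContractResolver = new CamelCasePropertyNamesContractResolver();

        return JsonConvert.SerializeObject(envelope, settings);
    }

    // Same kind of lookup as envelope["message"]: the indexer is case-sensitive
    // and yields null when no property with that exact name exists.
    public static bool HasLowercaseMessageKey(string body)
    {
        return JObject.Parse(body)["message"] != null;
    }

    // Case-insensitive alternative for code you control on the reading side.
    public static bool HasMessageKeyIgnoringCase(string body)
    {
        return JObject.Parse(body).GetValue("message", StringComparison.OrdinalIgnoreCase) != null;
    }

    public static void Main()
    {
        var pascal = Serialize(camelCase: false);
        var camel = Serialize(camelCase: true);

        Console.WriteLine(HasLowercaseMessageKey(pascal));    // False
        Console.WriteLine(HasMessageKeyIgnoringCase(pascal)); // True
        Console.WriteLine(HasLowercaseMessageKey(camel));     // True
    }
}
```

In the failing case above, the first lookup is the one that returns null, and the following message["payload"] access on that null is what surfaces as the object reference error. Since TranslateJsonBody lives inside the library, the sender-side option (emitting camelCase) is the one that can be applied without changing library code.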
