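How do I fix "Object reference not set to an instance of an object" when the MassTransit Quartz.NET scheduler reads a message?
I get the following error when setting up the Quartz.NET scheduler with MassTransit.
After a message is scheduled on RabbitMQ, Quartz.NET fails when it tries to read it, with this fault: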
MT-Fault-Message: Object reference not set to an instance of an object.
MT-Fault-Timestamp: 2021-06-02T18:46:56.1335404Z
MT-Fault-StackTrace: at MassTransit.QuartzIntegration.ScheduleMessageConsumer.TranslateJsonBody(String body, String destination)
at MassTransit.QuartzIntegration.ScheduleMessageConsumer.CreateJobDetail(ConsumeContext context, Uri destination, JobKey jobKey, Nullable`1 tokenId)
at MassTransit.QuartzIntegration.ScheduleMessageConsumer.Consume(ConsumeContext`1 context)
at MassTransit.Pipeline.ConsumerFactories.DelegateConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.ConsumerFactories.DelegateConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at GreenPipes.Partitioning.Partition.Send[T](T context, IPipe`1 next)
at GreenPipes.Filters.TeeFilter`1.<>c__DisplayClass5_0.<<Send>g__SendAsync|1>d.MoveNext()
--- End of stack trace from previous location ---
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.DynamicFilter`1.<>c__DisplayClass10_0.<<Send>g__SendAsync|0>d.MoveNext()
--- End of stack trace from previous location ---
at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
MT-Fault-ConsumerType: MassTransit.QuartzIntegration.ScheduleMessageConsumer
MT-Fault-MessageType: MassTransit.Scheduling.ScheduleMessage
ConfigureServices is set up like this:
.ConfigureServices((host, services) =>
{
    services.Configure<OtherOptions>(host.Configuration);
    services.Configure<QuartzOptions>(host.Configuration.GetSection("Quartz"));
    services.AddSingleton<QuartzConfiguration>();
    services.AddMassTransit(x =>
    {
        x.UsingRabbitMq((context, cfg) =>
        {
            var options = context.GetService<QuartzConfiguration>();
            cfg.AddScheduling(s =>
            {
                s.SchedulerFactory = new StdSchedulerFactory(options.Configuration);
                s.QueueName = options.Queue;
            });
            var vhost = host.Configuration.GetValue<string>("RabbitMQ:VirtualHost");
            cfg.Host(string.Empty, vhost, h =>
            {
                h.Username(host.Configuration.GetValue<string>("RabbitMQ:User"));
                h.Password(host.Configuration.GetValue<string>("RabbitMQ:Password"));
                h.UseCluster(c =>
                {
                    c.Node(host.Configuration.GetValue<string>("RabbitMQ:Node1"));
                    c.Node(host.Configuration.GetValue<string>("RabbitMQ:Node2"));
                });
            });
        });
    });
    services.AddMassTransitHostedService();
});
This is how the Quartz configuration is set up:
public NameValueCollection Configuration
{
    get
    {
        var configuration = new NameValueCollection(13)
        {
            {"quartz.scheduler.instanceName", _options.Value.InstanceName},
            {"quartz.scheduler.instanceId", "AUTO"},
            {"quartz.plugin.timeZoneConverter.type", "Quartz.Plugin.TimeZoneConverter.TimeZoneConverterPlugin, Quartz.Plugins.TimeZoneConverter"},
            {"quartz.serializer.type", "json"},
            {"quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz"},
            {"quartz.threadPool.threadCount", (_options.Value.ThreadCount ?? 10).ToString("F0")},
            {"quartz.jobStore.misfireThreshold", "60000"},
            {"quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz"},
            {"quartz.jobStore.driverDelegateType", "Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz"},
            {"quartz.jobStore.tablePrefix", _options.Value.TablePrefix},
            {"quartz.jobStore.dataSource", "default"},
            {"quartz.dataSource.default.provider", _options.Value.Provider},
            {"quartz.dataSource.default.connectionString", _options.Value.ConnectionString},
            {"quartz.jobStore.useProperties", "true"}
        };
        foreach (var key in configuration.AllKeys)
        {
            _logger.LogInformation("{Key} = {Value}", key, configuration[key]);
        }
        return configuration;
    }
}
The AddScheduling extension method called from ConfigureServices:
public static void AddScheduling(this IBusFactoryConfigurator configurator, Action<InMemorySchedulerOptions> configure)
{
    if (configurator == null)
        throw new ArgumentNullException(nameof(configurator));
    var options = new InMemorySchedulerOptions();
    configure?.Invoke(options);
    if (options.SchedulerFactory == null)
        throw new ArgumentNullException(nameof(options.SchedulerFactory));
    var observer = new SchedulerBusObserver(options);
    configurator.ReceiveEndpoint(options.QueueName, e =>
    {
        var partitioner = configurator.CreatePartitioner(Environment.ProcessorCount);
        e.Consumer(() => new ScheduleMessageConsumer(observer.Scheduler), x =>
            x.Message<ScheduleMessage>(m => m.UsePartitioner(partitioner, p => p.Message.CorrelationId)));
        e.Consumer(() => new CancelScheduledMessageConsumer(observer.Scheduler), x =>
            x.Message<CancelScheduledMessage>(m => m.UsePartitioner(partitioner, p => p.Message.TokenId)));
        configurator.UseMessageScheduler(e.InputAddress);
        configurator.ConnectBusObserver(observer);
    });
}
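I tried debugging but could not find the cause of the error. The database connection is created successfully, and scheduling the message from the consumer also completes successfully. Is there a way to debug the point where the scheduler reads the message, or to find out what exactly is throwing the object reference error?
Edit: debugging the MassTransit.Quartz code, I noticed that the problem occurs when the TranslateJsonBody method tries to deserialize the message: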
var envelope = JObject.Parse(body);
envelope["destinationAddress"] = destination;
var message = envelope["message"];
var payload = message["payload"];
var payloadType = message["payloadType"];
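As you can see, the method looks up camelCase keys, while our broker sends the message in PascalCase. Nothing handles the difference, so envelope["message"] returns null because we send the property as "Message", and the next line (message["payload"]) throws the null reference. Is there a configuration option that forces camelCase on deserialization?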
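The symptom described above can be reproduced outside MassTransit with a few lines of Json.NET. This is only a minimal sketch of the case-sensitivity issue, not the library's code and not a fix: the envelope body, the class name EnvelopeCasingDemo, and the payload values are all made up for illustration. It shows that the JObject indexer matches property names case-sensitively, while JObject.GetValue with StringComparison.OrdinalIgnoreCase finds the PascalCase property.

```csharp
using System;
using Newtonsoft.Json.Linq;

// Hypothetical demo class; not part of MassTransit or the original project.
public static class EnvelopeCasingDemo
{
    public static void Main()
    {
        // Assumed shape: an envelope as a PascalCase-serializing producer might emit it.
        const string body =
            "{\"Message\":{\"PayloadType\":[\"urn:message:Sample\"],\"Payload\":{\"Text\":\"hi\"}}}";

        var envelope = JObject.Parse(body);

        // The indexer is case-sensitive, so the camelCase key is not found.
        var exact = envelope["message"];

        // GetValue with an explicit comparison ignores the casing difference.
        var relaxed = envelope.GetValue("message", StringComparison.OrdinalIgnoreCase);

        Console.WriteLine(exact is null);    // True
        Console.WriteLine(relaxed != null);  // True
    }
}
```

Because the exact lookup yields null, any subsequent access such as message["payload"] fails with the same NullReferenceException seen in the fault, which is consistent with the stack trace pointing at TranslateJsonBody.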