如何解决关于实现服务总线会话块的建议
当前,我们有一个具有后台服务的dotnet核心应用,该服务从启用了会话的服务总线接收消息,其中sessionId
是userId
,并且消息包含用户信息的更新。现在,我们想实现一个功能,通过阻止特定的userId/sessionId
来临时暂停对特定用户的更新,但是在取消阻止时仍按顺序处理消息。解决这个问题的最佳方法是什么?
我试图浏览服务总线文档和示例。主要是message deferral,message session state和session state sample
我发现了有关SessionState
和消息延迟的一些信息,我想知道它们是否可用于实现此功能并仍然保证处理顺序(无论是否延迟消息,都是FIFO)。我正在考虑尝试将序列号存储在会话状态中,并继续通过该序列号接收延迟的消息,并递增该序列号以接收下一条消息,直到我用完消息为止。
当前,我们的代码如下所示:
this.queue.RegisterSessionHandler(
this.SessionHandler,new SessionHandlerOptions(this.ExceptionHandler)
{
AutoComplete = false,MessageWaitTimeout = TimeSpan.FromMinutes(1),});
其中this.SessionHandler
是处理消息然后通过调用session.CompleteAsync
和session.CloseAsync
来完成和关闭会话的函数。但是,我在构思如何将延迟逻辑添加到我们的代码时遇到了麻烦。因为当前,RegisterSessionHandler
已经处理了会话锁,并使用sessionId
对消息进行负载均衡(我认为),这很好。但是RegisterSessionHandler
也不允许您指定要处理的特定sessionId
。
说我有几条关于userId/sessionId: A
的消息。当我想取消对此用户的处理时,我不能简单地将延迟的消息插入回队列。由于发件人仍会不断向用户A发送消息到队列,这会弄乱订单。
我上面提到的会话状态示例有一个很好的示例,说明了如何使用会话状态和处理延迟的消息。但是,它仅使用一个sessionId
,而不使用RegisterSessionHandler
。我的问题是:如果要实现延迟消息处理逻辑(保留顺序),是否必须实现自己的RegisterSessionHandler
并处理sessionId
负载均衡?
谢谢!
解决方法
您应该在QueueClient中使用SessionClient
而不是RegisterSessionHandler
来更好地处理延迟方案并保留顺序。您可以在邮件正文中维护一些步骤/序列号。当您实际处理消息时,还要添加LastProcessedStep / Seqence。 Session state允许跟踪处理程序与会话相关的处理状态,从而使客户端可以在会话处理期间在处理节点之间灵活切换(包括故障转移)。 sample通过维护该延迟消息来处理(步骤)。它结合了Deferral和Session功能,以便使用会话状态工具来跟踪工作流的处理状态,在该状态下,各个步骤的输入都超出了预期的顺序。请注意,发送方代码还演示了通过以不可预测的顺序发送消息,但是凭借会话状态,接收方可以检测到顺序。
//
// Copyright © Microsoft Corporation,All Rights Reserved
//
// Licensed under the Apache License,Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS,WITHOUT WARRANTIES OR CONDITIONS
// OF ANY KIND,EITHER EXPRESS OR IMPLIED,INCLUDING WITHOUT LIMITATION
// ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE,FITNESS FOR A
// PARTICULAR PURPOSE,MERCHANTABILITY OR NON-INFRINGEMENT.
//
// See the Apache License,Version 2.0 for the specific language
// governing permissions and limitations under the License.
namespace SessionState
{
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
public class Program : MessagingSamples.Sample
{
public async Task Run(string connectionString)
{
Console.WriteLine("Press any key to exit the scenario");
var sendTask = this.SendMessagesAsync(Guid.NewGuid().ToString(),connectionString,SessionQueueName);
var sendTask2 = this.SendMessagesAsync(Guid.NewGuid().ToString(),SessionQueueName);
var receiveTask = this.ReceiveMessagesAsync(connectionString,SessionQueueName);
await Task.WhenAll(sendTask,sendTask2,receiveTask);
}
async Task SendMessagesAsync(string session,string connectionString,string queueName)
{
var sender = new MessageSender(connectionString,queueName);
Console.WriteLine("Sending messages to Queue...");
ProcessingState[] data = new[]
{
new ProcessingState {Step = 1,Title = "Buy"},new ProcessingState {Step = 2,Title = "Unpack"},new ProcessingState {Step = 3,Title = "Prepare"},new ProcessingState {Step = 4,Title = "Cook"},new ProcessingState {Step = 5,Title = "Eat"},};
var rnd = new Random();
var tasks = new List<Task>();
for (int i = 0; i < data.Length; i++)
{
var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data[i])))
{
SessionId = session,ContentType = "application/json",Label = "RecipeStep",MessageId = i.ToString(),TimeToLive = TimeSpan.FromMinutes(2)
};
tasks.Add(Task.Delay(rnd.Next(30)).ContinueWith(
async (t) =>
{
await sender.SendAsync(message);
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine("Message sent: Id = {0}",message.MessageId);
Console.ResetColor();
}
}));
}
await Task.WhenAll(tasks);
}
async Task ReceiveMessagesAsync(string connectionString,string queueName)
{
var client = new SessionClient(connectionString,queueName,ReceiveMode.PeekLock);
while (true)
{
var session = await client.AcceptMessageSessionAsync();
await Task.Run(
async () =>
{
ProcessingState processingState;
var stateData = await session.GetStateAsync();
if (stateData != null)
{
processingState = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(stateData));
}
else
{
processingState = new ProcessingState
{
LastProcessedRecipeStep = 0,DeferredSteps = new Dictionary<int,long>()
};
}
while (true)
{
try
{
//receive messages from Queue
var message = await session.ReceiveAsync(TimeSpan.FromSeconds(5));
if (message != null)
{
if (message.Label != null &&
message.ContentType != null &&
message.Label.Equals("RecipeStep",StringComparison.InvariantCultureIgnoreCase) &&
message.ContentType.Equals("application/json",StringComparison.InvariantCultureIgnoreCase))
{
var body = message.Body;
ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
if (recipeStep.Step == processingState.LastProcessedRecipeStep + 1)
{
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine(
"\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0},\n\t\t\t\t\t\tSequenceNumber = {1},\n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
"\n\t\t\t\t\t\tExpiresAtUtc = {5},\n\t\t\t\t\t\tContentType = \"{3}\",\n\t\t\t\t\t\tSize = {4},\n\t\t\t\t\t\tContent: [ step = {6},title = {7} ]",message.MessageId,message.SystemProperties.SequenceNumber,message.SystemProperties.EnqueuedTimeUtc,message.ContentType,message.Size,message.ExpiresAtUtc,recipeStep.Step,recipeStep.Title);
Console.ResetColor();
}
await session.CompleteAsync(message.SystemProperties.LockToken);
processingState.LastProcessedRecipeStep = recipeStep.Step;
await
session.SetStateAsync(
Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
}
else
{
// in your case,if customer update is blocked,you can defer
processingState.DeferredSteps.Add((int)recipeStep.Step,(long)message.SystemProperties.SequenceNumber);
await session.DeferAsync(message.SystemProperties.LockToken);
await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
}
}
else
{
await session.DeadLetterAsync(message.SystemProperties.LockToken);//,"ProcessingError","Don't know what to do with this message");
}
}
else
{
while (processingState.DeferredSteps.Count > 0)
{
long step;
if (processingState.DeferredSteps.TryGetValue(processingState.LastProcessedRecipeStep + 1,out step))
{
var deferredMessage = await session.ReceiveDeferredMessageAsync(step);
var body = deferredMessage.Body;
ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine(
"\t\t\t\tdeferredMessage received: \n\t\t\t\t\t\tMessageId = {0},deferredMessage.MessageId,deferredMessage.SystemProperties.SequenceNumber,deferredMessage.SystemProperties.EnqueuedTimeUtc,deferredMessage.ContentType,deferredMessage.Size,deferredMessage.ExpiresAtUtc,recipeStep.Title);
Console.ResetColor();
}
await session.CompleteAsync(deferredMessage.SystemProperties.LockToken);
processingState.LastProcessedRecipeStep = processingState.LastProcessedRecipeStep + 1;
processingState.DeferredSteps.Remove(processingState.LastProcessedRecipeStep);
await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
}
}
break;
}
}
catch (ServiceBusException e)
{
if (!e.IsTransient)
{
Console.WriteLine(e.Message);
throw;
}
}
}
await session.CloseAsync();
});
}
}
public static int Main(string[] args)
{
try
{
var app = new Program();
app.RunSample(args,app.Run);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
return 1;
}
return 0;
}
class ProcessingState
{
[JsonProperty]
public int LastProcessedRecipeStep { get; set; }
[JsonProperty]
public Dictionary<int,long> DeferredSteps { get; set; }
[JsonProperty]
public int Step { get; internal set; }
[JsonProperty]
public string Title { get; internal set; }
}
}
}
您也可以遵循Ordering Messages in Azure Service Bus,它对概念进行了很好的解释。但是那里提供的示例与上面的稍有不同。
警告:使用消息会话也意味着一个会话(对于您的情况是用户ID),该会话中的消息将始终由单个接收者接收和处理。因此,在设置Session和SessionId时要谨记。如果创建一个非常大的会话,这将迫使Azure Service Bus将大多数消息发送到一个订阅服务器,从而降低了多线程的好处。如果将会话设置得过于精细,那么它将失去其预期的好处,而您只是在增加不必要的开销。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。