
How to access a gRPC server with round-robin load balancing to run long-running processes in AKS?


My cluster setup:

  1. Main gRPC server pod: 1 instance
  2. Algorithm gRPC server pod: 10 instances

Plan:

I will use the BloomRPC client to send a request to the main gRPC server, and the main server will then send about 100 requests to the algorithm servers.

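Each request to the algorithm runs for at least 6 minutes, and there are 100 requests. Since only 10 algorithm gRPC server instances are available, I tried to use the basic load balancing explained here.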
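For a sense of scale: if the 100 calls were spread perfectly over the 10 algorithm pods, one call per pod at a time, the run would take 10 waves of at least 6 minutes each, so at least 60 minutes. The small C# helper below computes that lower bound as ceil(jobs / workers) × duration. `Capacity.MinimumWallClock` is a hypothetical name, not part of the original code, and it assumes every call takes the same time and each pod handles one call at a time.

```csharp
using System;

public static class Capacity
{
    // Lower bound on total run time when `jobs` equally long calls are spread
    // perfectly over `workers` servers that each run one call at a time.
    public static TimeSpan MinimumWallClock(int jobs, int workers, TimeSpan perJob)
    {
        if (jobs < 0) throw new ArgumentOutOfRangeException(nameof(jobs));
        if (workers <= 0) throw new ArgumentOutOfRangeException(nameof(workers));

        // Number of "waves" = ceil(jobs / workers), using integer arithmetic
        int waves = (jobs + workers - 1) / workers;
        return TimeSpan.FromTicks(perJob.Ticks * waves);
    }
}
```

For example, `Capacity.MinimumWallClock(100, 10, TimeSpan.FromMinutes(6))` returns 60 minutes; a run that takes much longer may indicate the calls are not being spread evenly across the pods.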

Problem:

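Using the above method I was able to send each request to a different pod, but it did not send 10 requests at once (one per algorithm service instance) as I expected. So in the main server, instead of just sending each request and waiting for its result, I run each of them in a new task and await them all: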

    public override async Task<ResultStatus> Compute(ComputeAlgoRequest request, ServerCallContext context)
    {
        var resultStatus = new ResultStatus { IsSuccess = true, ErrorMsg = "Success" };

        try
        {
            var channelOptions = new List<ChannelOption>();

            // e.g. LB_POLICY_NAME=round_robin
            var lbPolicyName = Environment.GetEnvironmentVariable("LB_POLICY_NAME");
            if (!string.IsNullOrEmpty(lbPolicyName))
            {
                channelOptions.Add(new ChannelOption("grpc.lb_policy_name", lbPolicyName));
            }

            var parametersModel = Load_InputParameters(request); // Some parameters from JSON
            var algoParameters = parametersModel.Parameters.ToDictionary(p => p.Name, p => p.Value);
            var chunks = await GetChunks(parametersModel); // Creating chunks
            var recordChunks = chunks.Select(x => x.Item1).ToList();
            _logger.LogInformation($"{DateTime.UtcNow}: AlgorithmWrapper Total number of chunks {recordChunks.Count}");
            var algoGrpcAddress = GetAlgorithmAdress(parametersModel.Algorithm);

            // Validate the address before it is used for the DNS lookup below
            if (string.IsNullOrEmpty(algoGrpcAddress))
            {
                resultStatus.IsSuccess = false;
                resultStatus.ErrorMsg = "Algorithm Grpc Wrapper not implemented";
                return resultStatus;
            }

            var ip = Dns.GetHostEntry(algoGrpcAddress.Split(':').First()).AddressList
                .First(addr => addr.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork);

            var channel = new Channel(algoGrpcAddress, ChannelCredentials.Insecure, channelOptions);
            var client = new AlgoServiceProvider.AlgoServiceProviderClient(channel);
            var algoRequest = CreateAlgoRequest(parametersModel);
            int i = 0;
            var computeTasks = new List<Task>();
            foreach (var recordChunk in recordChunks)
            {
                // Clone the protobuf request so each in-flight call keeps its own depth range
                // (mutating one shared request object lets later chunks overwrite earlier ones)
                var chunkRequest = algoRequest.Clone();
                chunkRequest.DataInfo.DepthStart = recordChunk.DepthStart;
                chunkRequest.DataInfo.DepthEnd = recordChunk.DepthEnd;
                Console.WriteLine($"Calling pod ip address {GetIpAddressPortInfo(algoGrpcAddress)}");
                computeTasks.Add(ComputeTask(i++, client, chunkRequest));
            }

            // Rethrows the first failed call, which is handled by the catch block below
            await Task.WhenAll(computeTasks).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            resultStatus.IsSuccess = false;
            resultStatus.ErrorMsg = ex.Message;
            _logger.Log(LogLevel.Error, ex.StackTrace);
        }
        _logger.LogInformation($"{DateTime.UtcNow}: AlgorithmWrapper Compute Completed and result status is {resultStatus.IsSuccess}");
        return resultStatus;
    }

    private async Task ComputeTask(int chunkId, AlgoServiceProvider.AlgoServiceProviderClient client, AlgoRequest algoRequest)
    {
        // Asynchronous unary call: no thread-pool thread is blocked while the algorithm runs,
        // and a failed call faults this task instead of being swallowed by a continuation
        await client.ComputeAsync(algoRequest);
        Console.WriteLine($"{DateTime.UtcNow}: Compute completed for chunk {chunkId}");
    }
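One way to get exactly "10 requests at once" is to start every chunk as its own asynchronous task but cap the number of calls in flight with a `SemaphoreSlim`. The sketch below is a generic, self-contained version of that idea. `ThrottledFanOut.RunAsync` is a hypothetical helper; in the code above, `callAsync` would be something like `chunk => client.ComputeAsync(chunk).ResponseAsync`, with `maxConcurrency` set to the number of algorithm pods (an assumption: one call per pod). The cap only limits how many calls run concurrently; which pod serves each call is still decided by the channel's load-balancing policy.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledFanOut
{
    // Runs callAsync for every input, with at most maxConcurrency calls in flight.
    public static async Task<TResult[]> RunAsync<TInput, TResult>(
        IEnumerable<TInput> inputs,
        Func<TInput, Task<TResult>> callAsync,
        int maxConcurrency)
    {
        if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));

        using var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);

        async Task<TResult> RunOneAsync(TInput input)
        {
            await gate.WaitAsync().ConfigureAwait(false); // wait for a free slot
            try
            {
                return await callAsync(input).ConfigureAwait(false);
            }
            finally
            {
                gate.Release(); // free the slot even if the call throws
            }
        }

        // All tasks are created up front; the semaphore decides when each call starts.
        // Task.WhenAll keeps results in input order and rethrows the first failure.
        return await Task.WhenAll(inputs.Select(input => RunOneAsync(input))).ConfigureAwait(false);
    }
}
```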

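Now it does seem to send multiple requests at once, but the round-robin load balancing does not take effect: requests are sent to the same pod multiple times, and exceptions like the following appear: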

    I0629 04:40:12.081082 0 ..\..\..\src\core\ext\filters\client_channel\subchannel.cc:1012: Connect Failed: {"created":"@1624941612.081000000","description":"OS Error","file":"..\..\..\src\core\lib\iomgr\tcp_client_windows.cc","file_line":106,"os_error":"No connection could be made because the target machine actively refused it.\r\n","syscall":"ConnectEx","wsa_error":10061}
    I0629 04:40:12.081082 0 ..\..\..\src\core\ext\filters\client_channel\subchannel.cc:955: Subchannel 000001349FD498C0: Retry immediately
    I0629 04:40:12.081082 0 ..\..\..\src\core\ext\filters\client_channel\subchannel.cc:980: Failed to connect to channel, retrying
    I0629 04:40:13.082496 0 ..\..\..\src\core\ext\filters\client_channel\subchannel.cc:1012: Connect Failed: {"created":"@1624941613.082000000","wsa_error":10061}
    I0629 04:40:13.082496 0 ..\..\..\src\core\ext\filters\client_channel\subchannel.cc:957: Subchannel 000001349FD498C0: Retry in 735 milliseconds
    I0629 04:40:13.898556 1325528893600 ..\..\..\src\core\ext\filters\client_channel\subchannel.cc:980: Failed to connect to channel, retrying
    I0629 04:40:15.090645 0 ..\..\..\src\core\ext\filters\client_channel\subchannel.cc:1012: Connect Failed: {"created":"@1624941615.091000000","wsa_error":10061}
    I0629 04:40:15.090645 0 ..\..\..\src\core\ext\filters\client_channel\subchannel.cc:957: Subchannel 000001349FD498C0: Retry in 1478 milliseconds
    I0629 04:40:16.595757 1325521381600 ..\..\..\src\core\ext\filters\client_channel\subchannel.cc:980: Failed to connect to channel, retrying
    I0629 04:40:17.690832 0 ..\..\..\src\core\ext\filters\client_channel\subchannel.cc:1012: Connect Failed: {"created":"@1624941617.691000000","wsa_error":10061}
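For reference, the idea behind a round-robin policy is that the client knows several backend addresses and rotates calls over them. The C# class below is only a conceptual illustration: it is not gRPC's `round_robin` implementation, and `RoundRobinPicker` and the addresses in its test are hypothetical. It also shows a limitation that often matters in Kubernetes: if name resolution returns a single address, such as a ClusterIP Service's virtual IP, every pick is that one address, so all calls share one HTTP/2 connection, and kube-proxy, which balances connections rather than individual calls, would typically route them all to the same pod. A headless Service (`clusterIP: None`), whose DNS name resolves to the individual pod IPs, is the usual way to give the client several addresses to rotate over.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Conceptual illustration only: rotate over a fixed list of resolved
// backend addresses, one address per call.
public sealed class RoundRobinPicker
{
    private readonly IReadOnlyList<string> _addresses;
    private int _next = -1;

    public RoundRobinPicker(IReadOnlyList<string> addresses)
    {
        if (addresses == null || addresses.Count == 0)
            throw new ArgumentException("At least one address is required.", nameof(addresses));
        _addresses = addresses;
    }

    public string Pick()
    {
        // Interlocked keeps the rotation consistent when many calls start concurrently
        int n = Interlocked.Increment(ref _next);
        // Mask off the sign bit so the index stays non-negative after int overflow
        return _addresses[(n & int.MaxValue) % _addresses.Count];
    }
}
```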

Does anyone know:

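  1. What is the correct way to use load balancing to split the load properly when long-running processes are involved?
  2. Is the basic load balancing I used sufficient for my requirements, or do I need another option?
  3. Is it possible to send multiple requests simultaneously to a gRPC server running in AKS?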

TIA
