微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在erlang gen_server 中有效地使用receive 子句来解决超时错误?

如何解决如何在erlang gen_server 中有效地使用receive 子句来解决超时错误?

有时我的循环返回 ok 因为超时如何以正确的方式编写此代码。当超时时,它只返回 ok 但不是我假设的实际值。在句柄调用中,我在 loop() 函数调用一个函数 loop() 我正在接收带有接收子句的消息。现在我正在使用 loop2 函数将此数据发送到我的数据库,无论数据是否已成功保存,都会从数据库返回响应并将响应返回给 loop()。但是如果超时,我的循环函数返回正常但不是实际值。

% @Author: ZEESHAN AHMAD
% @Date:   2020-12-22 05:06:12
% @Last Modified by:   ZEESHAN AHMAD
% @Last Modified time: 2021-01-10 04:42:59


-module(getAccDataCons).

-behavIoUr(gen_server).

-include_lib("deps/amqp_client/include/amqp_client.hrl").

-export([start_link/0,stop/0]).
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,code_change/3,terminate/2]).
-export([get_account/0]).

start_link() ->
    gen_server:start_link({local,?MODULE},?MODULE,[],[]).

stop() ->
    gen_server:cast(?MODULE,stop).

get_account() ->
    gen_server:call(?MODULE,{get_account}).

init(_Args) ->
    {ok,Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}),{ok,Channel} = amqp_connection:open_channel(Connection),Channel}.

handle_call({get_account},_From,State) ->
    amqp_channel:call(State,#'exchange.declare'{exchange = <<"get">>,type = <<"topic">>}),amqp_channel:call(State,#'queue.declare'{queue = <<"get_account">>}),Binding =
        #'queue.bind'{exchange = <<"get">>,routing_key = <<"get.account">>,queue = <<"get_account">>},#'queue.bind_ok'{} = amqp_channel:call(State,Binding),io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),#'basic.consume'{queue = <<"get_account">>,no_ack = true}),Returned =loop(),io:format("~nReti=~p",[Returned]),{reply,Returned,State};
    

handle_call(Message,State) ->
    io:format("received other handle_call message: ~p~n",[Message]),ok,State}.

handle_cast(stop,State) ->
    {stop,normal,State};
handle_cast(Message,State) ->
    io:format("received other handle_cast call : ~p~n",{noreply,State}.

handle_info(Message,State) ->
    io:format("received handle_info message : ~p~n",State}.

code_change(_OldVer,State,_Extra) ->
    {ok,State}.

terminate(Reason,_State) ->
    io:format("server is terminating with reason :~p~n",[Reason]).


    loop()->
        receive
         #'basic.consume_ok'{} -> ok
        end,receive
            {#'basic.deliver'{},Msg} ->
                #amqp_msg{payload = Payload} = Msg,Value=loop2(Payload),Value
    after 2000->
    io:format("Server timeout")
    end.


  loop2(Payload)->
            Result = jiffy:decode(Payload),{[{<<"account_id">>,AccountId}]} = Result,Doc = {[{<<"account_id">>,AccountId}]},getAccDataDb:create_AccountId_view(),Returned=case getAccDataDb:getAccountNameDetails(Doc) of
                success ->
                    Respo = getAccDataDb:getAccountNameDetails1(Doc),Respo;
                details_not_matched ->
                    user_not_exist
            end,Returned.

解决方法

如果没有 looploop2 代码,很难给出答案,如果这两个函数之一检测到超时,则必须首先更改它们的行为以避免任何超时,或将其增加到有效的值。如果需要超时,则确保返回值是明确的,它发生,例如 {error,RequestRef,timeout} 而不是 ok

尽管如此,gen_server 不应等待太长时间才能得到答案,您可以修改代码:

您可以使用:

,而不是在客户端进程中使用 gen_server:call(ServerRef,Request)
RequestId = send_request(ServerRef,Request),Result = wait_response(RequestId,Timeout),

并删除 loop 和/或 loop2 中的超时。这样做您可以在客户端控制超时,您甚至可以将其设置为无穷大(这不是一个好主意!)。

或者你可以把你的功能分成两部分

gen_server:cast(ServerRef,{Request,RequestRef}),% this will not wait for any answer,RequestRef is a tag to identify later 
% if the request was fulfilled,you can use make_ref() to generate it

及以后,或在另一个客户端进程中(这至少需要将 RequestRef 传递给此进程)检查请求的结果:

Answer = gen_server:call(ServerRef,{get_answer,case Answer of
    no_reply -> ... % no answer yet
    {ok,Reply} -> ... % handle the answer
end,

最后,您必须修改循环代码以处理 RequestRef,将带有结果和 gen_server:cast 的消息(再次使用 RequestRef)发送回服务器,并存储此结果处于服务器状态。

我不认为第二个解决方案有价值,因为它与第一个解决方案或多或少相同,但都是手工制作的,它让您可以管理许多错误情况(例如客户端死亡),这些情况可能会导致一种内存泄漏。

,

这对编辑来说太长了,我把它放在一个新的答案中。

发生超时时您收到 ok 的原因在于 loop() 代码。在第二个接收块中,2000 毫秒后,您返回 紧跟在 io:format/1 语句之后。

io:format 返回 ok,它是您在 Returned 变量中得到的。您应该使用

更改此代码
loop()->
    ok = receive
        #'basic.consume_ok'{} -> ok
    end,receive
        {#'basic.deliver'{},#amqp_msg{payload = Payload}} -> {ok,loop2(Payload)}
    after 2000 ->
        io:format("Server timeout"),{error,timeout}
    end.

使用此代码,您的客户将收到 {ok,Value}{error,timeout},并能够做出相应的反应。

但是这个版本仍然存在问题: - 2 秒超时可能太短,您缺少有效答案 - 由于您在接收块中使用模式匹配并且不检查每个 amqp_channel:call 的结果,因此可能会发生许多不同的问题并显示为超时

首先让我们看看超时。有可能对 amqp_channel 的 4 次调用总共需要 2 秒以上才能成功完成。简单的解决方案是增加超时时间,将 after 2000 更改为 after 3000 或更多。 但是你会遇到两个问题:

  • 您的 gen_server 在这段时间内一直被阻塞,如果它不是专用于单个客户端,它将无法用于 在等待答复期间处理任何其他请求。
  • 如果您需要将超时增加到 5 秒以上,您将遇到另一个超时,由 gen_server 内部管理:必须在 5 秒内响应请求。

gen_server 提供了一些接口函数来解决这类问题:'send_request'、'wait_response' 和reply。这是一个基本的 gen_server 可以处理 3 种请求:

  • stop ... 停止服务器,有助于更新代码。
  • {blocking,Time,Value} 服务器将在 Time ms 结束期间休眠,然后返回 Value。这模拟了您的情况,您可以调整方式 得到答案需要很长时间。
  • {non_blocking,Value} 服务器会将作业委托给另一个进程并立即返回而无需回答(因此 它可用于另一个请求)。新进程将在 Time ms 结束期间休眠,然后使用 gen_server:reply 返回值。

服务器模块实现了几个用户界面:

  • 标准的 start(),stop()
  • blocking(Time,Value) 使用 gen_server:call 以请求 {blocking,Value} 调用服务器
  • blocking_catch(Time,Value) 与上一个相同,但捕获 gen_server:call 的结果以显示隐藏的超时
  • non_blocking(Time,Value,Wait) 使用 gen_server:send_request 以请求 {non_blocking,Value} 调用服务器并等待最大等待毫秒的答案

最后包含2个测试功能

  • test([Type,OptionalWait]) 它产生一个进程,该进程将发送一个带有相应参数的类型请求。应答被发送回调用进程。答案可以通过 shell 中的 flush() 来检索。
  • parallel_test ([Type,NbRequests,OptionalWait]) 它使用相应的参数调用 NbRequests 次测试。它收集了所有 答案并使用本地函数 collect(NbRequests,Timeout) 打印出来。

下面的代码

-module (server_test).

-behaviour(gen_server).

%% API
-export([start/0,stop/0,blocking/2,blocking_catch/2,non_blocking/3,test/1,parallel_test/1]).


%% gen_server callbacks
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2,code_change/3]).

-define(SERVER,?MODULE). 

%%%===================================================================
%%% API
%%%===================================================================
start() ->
    gen_server:start_link({local,?SERVER},?MODULE,[],[]).

stop() ->
    gen_server:cast(?SERVER,stop).

blocking(Time,Value) ->
    gen_server:call(?SERVER,{blocking,Value}).

blocking_catch(Time,Value) ->
    catch {ok,gen_server:call(?SERVER,Value})}.

non_blocking(Time,Wait) ->
    ReqId = gen_server:send_request(?SERVER,{non_blocking,Value}),gen_server:wait_response(ReqId,Wait).

test([Type,Value]) -> test([Type,5000]);
test([Type,Wait]) ->
    Start = erlang:monotonic_time(),From = self(),F = fun() -> 
        R = case Type of 
            non_blocking -> ?MODULE:Type(Time,Wait);
            _ -> ?MODULE:Type(Time,Value)
        end,From ! {request,Type,got_answer,R,after_microsec,erlang:monotonic_time() - Start} 
    end,spawn(F).

parallel_test([Type,NbRequests]) -> parallel_test([Type,5000]);
parallel_test([Type,Wait]) ->
    case Type of
        non_blocking -> [server_test:test([Type,X,Wait]) || X <- lists:seq(1,NbRequests)];
        _ -> [server_test:test([Type,X]) || X <- lists:seq(1,NbRequests)]
    end,collect_answers(NbRequests,Time + 1000).


%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
    {ok,#{}}.

handle_call({blocking,Value},_From,State) ->
    timer:sleep(Time),Reply = {ok,{reply,Reply,State};
handle_call({non_blocking,From,State) ->
    F = fun() ->
        do_answer(From,Value)
    end,spawn(F),{noreply,State};
handle_call(_Request,State) ->
    Reply = ok,State}.

handle_cast(stop,State) ->
    {stop,stopped,State};
handle_cast(_Msg,State) ->
    {noreply,State}.

handle_info(_Info,State}.

terminate(_Reason,_State) ->
    ok.

code_change(OldVsn,State,_Extra) ->
    io:format("changing code replacing version ~p~n",[OldVsn]),{ok,State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

do_answer(From,Value) ->
    timer:sleep(Time),gen_server:reply(From,Value).

collect_answers(0,_Timeout) ->
    got_all_answers;
collect_answers(NbRequests,Timeout) ->
    receive 
        A -> io:format("~p~n",[A]),collect_answers(NbRequests - 1,Timeout)
    after Timeout ->
        missing_answers
    end.

shell 中的会话:

44> c(server_test).                                    
{ok,server_test}
45> server_test:start().                               
{ok,<0.338.0>}
46> server_test:parallel_test([blocking,200,3]).
{request,blocking,1,1},207872}
{request,2,2},415743}
{request,3,3},623615}
got_all_answers
47> % 3 blocking requests in parallel,each lasting 200ms,they are executed in sequence but no timemout is reached
47> % All the clients get their answers
47> server_test:parallel_test([blocking,2000,3]).                                                                                                       
{request,2063358}
{request,4127740}
missing_answers
48> % 3 blocking requests in parallel,each lasting 2000ms,they are executed in sequence and the last answer exceeds the gen_server timeout.       
48> % The client for this request don't receive answer. The client should also manage its own timeout to handle this case
48> server_test:parallel_test([blocking_catch,3]).                                                                                             
{request,blocking_catch,4127740}
{request,{'EXIT',{timeout,{gen_server,call,[server_test,3}]}}},5135355}
got_all_answers
49> % same thing but catching the exception. After 5 seconds the gen_server call throws a timeout exception.
49> % The information can be forwarded to the client
49> server_test:parallel_test([non_blocking,3]).                                                       
{request,non_blocking,207872}
got_all_answers
50> % using non blocking mechanism,we can see that all the requests were managed in parallel 
50> server_test:parallel_test([non_blocking,5100,3]).                                        
{request,timeout,5136379}
{request,5136379}
got_all_answers
51> % if we increase the answer delay above 5000ms,all requests fail in default timeout
51> server_test:parallel_test([non_blocking,6000]).                              
{request,5231611}
{request,5231611}
got_all_answers
52> % but thanks to the send_request/wait_response/reply interfaces,the client can adjust the timeout to an accurate value
52> % for each request

请求无法完成的下一个原因是 amqp_channel:call 失败。根据你想要做什么,有几个 无所作为的可能性,让崩溃,捕获异常或管理所有情况。下一个提案使用全局捕获

handle_call({get_account,Timeout},State) ->
    F = fun() ->
        do_get_account(From,Timeout)
    end,% delegate the job to another process and free the server
    {noreply,State}; % I don't see any change of State in your code,this should be enough

...

do_get_account(From,Timeout) ->
    % this block of code asserts all positive return values from amqp_channel calls. it will catch any error
    % and return it as {error,...}. If everything goes well it return {ok,Answer}
    Reply = try
        ok = amqp_channel:call(State,#'exchange.declare'{exchange = <<"get">>,type = <<"topic">>}),ok = amqp_channel:call(State,#'queue.declare'{queue = <<"get_account">>}),Binding = #'queue.bind'{exchange = <<"get">>,routing_key = <<"get.account">>,queue = <<"get_account">>},#'queue.bind_ok'{} = amqp_channel:call(State,Binding),#'basic.consume'{queue = <<"get_account">>,no_ack = true}),wait_account_reply(Timeout)}
    catch
        Class:Exception -> {error,Class,Exception}
    end,Reply).

wait_account_reply(Timeout) ->
    receive
    % #'basic.consume_ok'{} -> ok % you do not handle this message,ignore it since it will be garbaged when the process die
        {#'basic.deliver'{},#amqp_msg{payload = Payload}} -> extract_account(Payload)
    after Timeout->
       server_timeout
    end.


extract_account(Payload)->
        {[{<<"account_id">>,AccountId}]} = jiffy:decode(Payload),Doc = {[{<<"account_id">>,AccountId}]},getAccDataDb:create_AccountId_view(),% What is the effect of this function,what is the return value?
        case getAccDataDb:getAccountNameDetails(Doc) of
            success ->
                getAccDataDb:getAccountNameDetails1(Doc);
            details_not_matched ->
                user_not_exist
        end.

客户端应该看起来像:

get_account() ->
    ReqId = gen_server:send_request(server_name,{get_account,2000}),2200).

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。