微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用System.IO.Pipelines包创建响应的TCP侦听器?

如何解决如何使用System.IO.Pipelines包创建响应的TCP侦听器?

我想使用Kestrel和system.io.pipelines包创建一个TCP侦听器。我收到的消息将始终为HL7 messages。消息示例可能是

MSH | ^〜&| MegaReg | XYZHospC | SuperOE | XYZImgCtr | 20060529090131-0500 || ADT ^ A01 ^ ADT_A01 | 01052901 | P | 2.5 EVN || 200605290901 |||| 200605290900 PID |||| 56782445 ^^^ UAReg ^ PI || KLEINSAMPLE ^ BARRY ^ Q ^ JR || 19620910 | M || 2028-9 ^^ HL70005 ^ RA99113 ^^ XYZ | 260 GOODWIN CREST DRIVE ^^ BIRMINGHAM ^ AL ^ 35209 ^^ M〜NICKELL'S PICKLES ^ 10000 W 100TH AVE ^ BIRMINGHAM ^ AL ^ 35200 ^^ O ||||||| 0105I30001 ^^^ 99DEF ^ AN PV1 || I | W ^ 389 ^ 1 ^ UABH ^^^^^ 3 |||| 12345 ^ MORGAN ^ REX ^ J ^^^ MD ^ 0010 ^ UAMC ^ L || 67890 ^ GRAINGER ^ LUCY ^ X ^^^ MD ^ 0010 ^ UAMC ^ L | MED ||||| A0 ||| 13579 ^ PottER ^ SHERMAN ^ T ^^^ MD ^ 0010 ^ UAMC ^ L |||||||||||||||||| ||||||||||||| 200605290900 OBX | 1 | NM | ^主体高度|| 1.80 | m ^米^ ISO + ||||| F OBX | 2 | NM | ^主体 重量|| 79 | kg ^千克^ ISO + |||| FAL1 | 1 || ^ ASPIRIN DG1 | 1 || 786.50 ^ CHEST PAIN,未审查^ I9 ||| A

要注意的唯一重要一点是,每条传入的HL7消息都以竖线制表符开头,因此您知道消息的开始位置。每个HL7消息都包含多个段,所以我认为我将不得不遍历每个段。处理完请求后,我想发送回HL7消息作为响应。首先,我想到了这个

internal class HL7Listener : ConnectionHandler
{
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        IDuplexPipe pipe = connection.Transport;

        await FillPipe(pipe.Output);
        await ReadPipe(pipe.Input);
    }

    private async Task FillPipe(PipeWriter pipeWriter)
    {
        const int minimumBufferSize = 512;

        while (true)
        {
            Memory<byte> memory = pipeWriter.GetMemory(minimumBufferSize);
            
            try
            {
                int bytesRead = 32; // not sure what to do here
                
                if (bytesRead == 0)
                {
                    break;
                }
                
                pipeWriter.Advance(bytesRead);
            }
            catch (Exception ex)
            {
                // ... something Failed ...

                break;
            }

            FlushResult result = await pipeWriter.FlushAsync();

            if (result.IsCompleted)
            {
                break;
            }
        }

        pipeWriter.Complete();
    }

    private async Task ReadPipe(PipeReader pipeReader)
    {
        while (true)
        {
            ReadResult result = await pipeReader.ReadAsync();

            ReadOnlySequence<byte> buffer = result.Buffer;
            SequencePosition? position;

            do
            {
                position = buffer.PositionOf((byte)'\v');

                if (position != null)
                {
                    ReadOnlySequence<byte> line = buffer.Slice(0,position.Value);

                    // ... Process the line ...

                    buffer = buffer.Slice(buffer.GetPosition(1,position.Value));
                }
            }
            while (position != null);

            pipeReader.Advanceto(buffer.Start,buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }

        pipeReader.Complete();
    }
}

不幸的是,我在一些事情上苦苦挣扎:

  • int bytesRead = 32;部分,我如何知道已读取多少字节?或者如何使用writer实例进行读取?
  • 当前,调试器未按// ... Process the line ...处的代码。基本上,我必须提取整个HL7消息,以便可以使用HL7解析器转换消息字符串。
  • 我该在哪里回应?致电await ReadPipe(pipe.Input);之后?通过使用await connection.Transport.Output.WriteAsync(/* the HL7 message to send back */);

解决方法

您是否看过大卫·福勒(David Fowler)的TcpEcho示例?我想说这是相当规范的,因为他是发布devblogs System.IO.Pipelines公告的人。

他的示例处理原始套接字。我已经将其调整为适用于ConnectionHandler API和HL7消息(但是,我对HL7知之甚少):

internal class HL7Listener : ConnectionHandler
{
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        while (true)
        {
            var result = await connection.Transport.Input.ReadAsync();
            var buffer = result.Buffer;

            while (TryReadMessage(ref buffer,out ReadOnlySequence<byte> hl7Message))
            {
                // Process the line.
                var response = ProcessMessage(hl7Message);
                await connection.Transport.Output.WriteAsync(response);
            }

            if (result.IsCompleted)
            {
                break;
            }

            connection.Transport.Input.AdvanceTo(buffer.Start,buffer.End);
        }
    }

    public static bool TryReadMessage(ref ReadOnlySequence<byte> buffer,out ReadOnlySequence<byte> hl7Message)
    {
        var endOfMessage = buffer.PositionOf((byte)0x1C);

        if (endOfMessage == null || !TryMatchNextByte(ref buffer,endOfMessage.Value,0x0D,out var lastBytePosition))
        {
            hl7Message = default;
            return false;
        }

        var messageBounds = buffer.GetPosition(1,lastBytePosition.Value); // Slice() is exclusive on the upper bound
        hl7Message = buffer.Slice(0,messageBounds);
        buffer = buffer.Slice(messageBounds); // remove message from buffer
        return true;
    }

    /// <summary>
    /// Does the next byte after currentPosition match the provided value?
    /// </summary>
    private static bool TryMatchNextByte(ref ReadOnlySequence<byte> buffer,SequencePosition currentPosition,byte value,out SequencePosition? nextPosition)
    {
        nextPosition = buffer.Slice(currentPosition).PositionOf(value);
        if(nextPosition == null || !nextPosition.Value.Equals(buffer.GetPosition(1,currentPosition)))
        {
            nextPosition = null;
            return false;
        }
        return true;
    }

    private ReadOnlyMemory<byte> ProcessMessage(ReadOnlySequence<byte> hl7Message)
    {
        var incomingMessage = Encoding.UTF8.GetString(hl7Message.ToArray());
        // do something with the message and generate your response. I'm using UTF8 here
        // but not sure if that's valid for HL7.
        return Encoding.UTF8.GetBytes("Response message: OK!");
    }
}

更新:添加了有关HL7消息结构的最新信息。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。