c# – Rx.Net消息解析器

我正在尝试解析表示消息的传入字节流.
我需要拆分流并为每个部分创建一个消息结构.

消息始终以0x81(BOM)开头,以0x82(EOM)结束.

start:  0x81
header: 3 bytes
data:   arbitrary length
stop:   0x82

数据部分使用转义字节0x1B(ESC)进行转义:只要数据部分中的一个字节包含控制字节{ESC,BOM,EOM}之一,它就会以ESC为前缀.

标题部分未转义,可能包含控制字节.

我想使用Rx.Net以功能性的反应式编码,通过使用IObservable< byte>并将其转换为IObservable< Message>.

最常用的方法是什么?

一些例子:

[81 01 02 03 82] single message
[81 82 81 82 82] single message,header = [82 81 82]
[81 01 02 1B 82] single message,header = [01 02 1B].
[81 01 02 03 1B 82 82] single message,header = [01 02 03],(unescaped) data = [82]
[81 01 02 03 1B 1B 82 82] single message + dangling [82] which should be ignored.
                          header = [01 02 03],(unescaped) data = [1B]

这是一个状态机绘图:

解决方法

如果你正在寻找“功能更强大”的东西,那么这可能会有所帮助,但@Evk的答案也会通过这些测试.

首先我可以建议,为了提供一个可验证的答案,你可以提供一个测试套件来实现这样的复杂问题.

像这样的东西会非常有帮助.

var scheduler = new TestScheduler();
var source = scheduler.CreateColdobservable<byte>(
    ReactiveTest.OnNext<byte>(01,0x81),//BOM m1
    ReactiveTest.OnNext<byte>(02,0x01),ReactiveTest.OnNext<byte>(03,0x02),ReactiveTest.OnNext<byte>(04,0x03),ReactiveTest.OnNext<byte>(05,0x82),//EOM m1
    ReactiveTest.OnNext<byte>(06,//BOM m2
    ReactiveTest.OnNext<byte>(07,ReactiveTest.OnNext<byte>(08,ReactiveTest.OnNext<byte>(09,ReactiveTest.OnNext<byte>(10,//EOM m2
    ReactiveTest.OnNext<byte>(11,//BOM m3
    ReactiveTest.OnNext<byte>(12,ReactiveTest.OnNext<byte>(13,ReactiveTest.OnNext<byte>(14,0x1B),ReactiveTest.OnNext<byte>(15,//EOM m3
    ReactiveTest.OnNext<byte>(16,//BOM m4
    ReactiveTest.OnNext<byte>(17,ReactiveTest.OnNext<byte>(18,ReactiveTest.OnNext<byte>(19,ReactiveTest.OnNext<byte>(20,//Control character 
    ReactiveTest.OnNext<byte>(21,//Data
    ReactiveTest.OnNext<byte>(22,//EOM m4
    ReactiveTest.OnNext<byte>(23,//BOM m5
    ReactiveTest.OnNext<byte>(24,ReactiveTest.OnNext<byte>(25,ReactiveTest.OnNext<byte>(26,ReactiveTest.OnNext<byte>(27,//Control character 
    ReactiveTest.OnNext<byte>(28,//Data
    ReactiveTest.OnNext<byte>(29,//EOM m5
    ReactiveTest.OnNext<byte>(30,0x82));//Ignored (expected 0x81)

var observer = scheduler.CreateObserver<Message>();

//CurrentAnswer(source)
MyAnswer(source)
    .Subscribe(observer);

scheduler.Start();

ReactiveAssert.AreElementsEqual(
    new[] {
        ReactiveTest.OnNext(05,new Message(){Header=new byte[]{0x01,0x02,0x03},Data=new byte[0]{}}),ReactiveTest.OnNext(10,new Message(){Header=new byte[]{0x82,0x81,0x82},ReactiveTest.OnNext(15,0x1B},ReactiveTest.OnNext(22,Data=new byte[]{ 0x82}}),ReactiveTest.OnNext(29,Data=new byte[]{ 0x1B}}),},observer.Messages);

我还写了一个允许我验证代码的Message版本

public class Message
{
    public static readonly byte BOM = 0x81;
    public static readonly byte EOM = 0x82;
    public static readonly byte Control = 0x1B;

    public byte[] Header { get; set; }
    public byte[] Data { get; set; }

    public static Message Create(byte[] bytes)
    {   
        if(bytes==null)
            throw new ArgumentNullException(nameof(bytes));
        if(bytes.Length<3)
            throw new ArgumentException("bytes<3").Dump();


        var header = new byte[3];
        Array.copy(bytes,header,3);

        var body = new List<byte>();
        var escapeNext = false;
        for (int i = 3; i < bytes.Length; i++)
        {
            var b = bytes[i];

            if (b == Control && !escapeNext)
            {
                escapeNext = true;
            }
            else
            {
                body.Add(b);
                escapeNext = false;
            }
        }
        var msg = new Message { Header = header,Data = body.ToArray()};
        return msg;
    }

    public override string ToString()
    {
        return string.Format("Message(Header=[{0}],Data=[{1}])",ByteArrayString(Header),ByteArrayString(Data));
    }

    private static string ByteArrayString(byte[] bytes)
    {
        return string.Join(",",bytes.Select(b => b.ToString("X")));
    }

    public override bool Equals(object obj)
    {
        var other = obj as Message;
        if(obj==null)
            return false;
        return Equals(other);
    }

    protected bool Equals(Message other)
    {
        return IsSequenceEqual(Header,other.Header) 
            && IsSequenceEqual(Data,other.Data);
    }

    private bool IsSequenceEqual<T>(IEnumerable<T> expected,IEnumerable<T> other)
    {
        if(expected==null && other==null)
            return true;
        if(expected==null || other==null)
            return false;
        return expected.SequenceEqual(other);
    }

    public override int GetHashCode()
    {
        unchecked
        {
            return ((Header != null ? Header.GetHashCode() : 0) * 397) ^ (Data != null ? Data.GetHashCode() : 0);
        }
    }
}

现在我拥有了所有的管道,我可以专注于实际的问题.

public static IObservable<Message> MyAnswer(IObservable<byte> source)
{
    return source.Publish(s =>
        {

            return 
                Observable.Defer(()=>
                    //Start consuming once we see a BOM
                    s.Skipwhile(b => b != Message.BOM)
                     .Scan(new Accumulator(),(acc,cur)=>acc.Accumulate(cur))
                )
                .TakeWhile(acc=>!acc.IsEndOfMessage())
                .Where(acc=>!acc.IsBeginingOfMessage())
                .Select(acc=>acc.Value())
                .ToArray()
                .Where(buffer=>buffer.Any())
                .Select(buffer => Message.Create(buffer))
                .Repeat();
        }); 

}
public class Accumulator
{
    private int _index = 0;
    private byte _current =0;
    private bool _isCurrentEscaped = false;
    private bool _isNextEscaped = false;

    public Accumulator Accumulate(byte b)
    {
        _index++;
        _current = b;
        _isCurrentEscaped = _isNextEscaped;
        _isNextEscaped = (!IsHeader() && !_isCurrentEscaped && b==Message.Control);
        return this;
    }
    public byte Value()
    {
        return _current;
    }

    private bool IsHeader()
    {
        return _index < 5;
    }
    public bool IsBeginingOfMessage()
    {
        return _index == 1 && _current == Message.BOM;
    }
    public bool IsEndOfMessage()
    {
        return !IsHeader()
            && _current == Message.EOM 
            && !_isCurrentEscaped;
    }
}

为了完整起见,这里是@Evk的答案的胆量,所以你可以轻松地交换实现.

public static IObservable<Message> CurrentAnswer(IObservable<byte> source)
{
    return Observable.Create<Message>(o =>
    {
        // some crude parsing code for the sake of example
        bool nextIsEscaped = false;
        bool readingHeader = false;
        bool readingBody = false;
        List<byte> body = new List<byte>();
        List<byte> header = new List<byte>();
        return source.Subscribe(b =>
        {
            if (b == 0x81 && !nextIsEscaped && !readingHeader)
            {
                // start
                readingHeader = true;
                readingBody = false;
                nextIsEscaped = false;
            }
            else if (b == 0x82 && !nextIsEscaped && !readingHeader)
            {
                // end
                readingHeader = false;
                readingBody = false;
                if (header.Count > 0 || body.Count > 0)
                {
                    o.OnNext(new Message()
                    {
                        Header = header.ToArray(),Data = body.ToArray()
                    });
                    header.Clear();
                    body.Clear();
                }
                nextIsEscaped = false;
            }
            else if (b == 0x1B && !nextIsEscaped && !readingHeader)
            {
                nextIsEscaped = true;
            }
            else
            {
                if (readingHeader)
                {
                    header.Add(b);
                    if (header.Count == 3)
                    {
                        readingHeader = false;
                        readingBody = true;
                    }
                }
                else if (readingBody)
                    body.Add(b);
                nextIsEscaped = false;
            }

        });
    });

}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


原文地址:http://msdn.microsoft.com/en-us/magazine/cc163791.aspx 原文发布日期: 9/19/2005 原文已经被 Microsoft 删除了,收集过程中发现很多文章图都不全,那是因为原文的图都不全,所以特收集完整全文。 目录 前言 CLR启动程序
前言 随着近些年微服务的流行,有越来越多的开发者和团队所采纳和使用,它的确提供了很多的优势也解决了很多的问题,但是我们也知道也并不是银弹,提供优势的同时它也给我们的开发人员和团队也带来了很多的挑战。 为了迎接或者采用这些新技术,开发团队需要更加注重一些流程或工具的使用,这样才能更好的适应这些新技术所
最近因为比较忙,好久没有写博客了,这篇主要给大家分享一下PLINQ中的分区。上一篇介绍了并行编程,这边详细介绍一下并行编程中的分区和自定义分区。 先做个假设,假设我们有一个200Mb的文本文件需要读取,怎么样才能做到最优的速度呢?对,很显然就是拆分,把文本文件拆分成很多个小文件,充分利用我们计算机中
在多核CPU在今天和不久的将来,计算机将拥有更多的内核,Microsoft为了利用这个硬件特性,于是在Visual Studio 2010 和 .NET Framework 4的发布及以上版本中,添加了并行编程这个新特性,我想它以后势必会改变我们的开发方式。 在以前或者说现在,我们在并行开发的时候可
c语言输入成绩怎么判断等级
字符型数据在内存中的存储形式是什么
c语言怎么求字符串的长度并输出
c语言函数的三种调用方式是什么
c语言中保留两位小数怎么表示
double的输入格式符是什么
长整型输出格式是什么
C语言中文件包含的命令关键字是什么
c程序如何编写x的y次方
c语言开根号代码是什么
c语言怎么进行字符串比较
c语言怎么进行强制类型转换
c语言运算符的优先级顺序是什么
c++用什么软件编程
中序遍历是怎么遍历的
h文件和c文件的关系是什么