我正在尝试解析表示消息的传入字节流.
我需要拆分这个流,并为每个部分创建一个消息结构.
消息始终以0x81(BOM)开头,以0x82(EOM)结束.
- start: 0x81
- header: 3 bytes
- data: arbitrary length
- stop: 0x82
数据部分使用转义字节0x1B(ESC)进行转义:只要数据部分中的某个字节等于控制字节{ESC,BOM,EOM}之一,就会在它前面加上一个ESC作为前缀.
标题部分未转义,可能包含控制字节.
我想使用Rx.Net以函数式、响应式的风格来实现:接收一个IObservable<byte>,并将其转换为IObservable<Message>.
最地道(惯用)的做法是什么?
一些例子:
- [81 01 02 03 82] single message
- [81 82 81 82 82] single message,header = [82 81 82]
- [81 01 02 1B 82] single message,header = [01 02 1B].
- [81 01 02 03 1B 82 82] single message,header = [01 02 03],(unescaped) data = [82]
- [81 01 02 03 1B 1B 82 82] single message,header = [01 02 03],(unescaped) data = [1B],plus a dangling [82] which should be ignored.
解决方法
如果你想要的是“更函数式”的写法,那么下面的方案可能会有所帮助;不过@Evk的答案同样能通过这些测试.
首先我想建议:对于这样复杂的问题,最好随问题一起提供一个测试套件,这样别人才能给出可验证的答案.
像这样的东西会非常有帮助.
// Test harness: feeds the five example frames from the question through the
// parser, one byte per virtual-time tick, and checks the emitted messages.
// NOTE(review): the byte values and expected messages below were reconstructed
// from the examples listed in the question; the original listing was truncated.
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<byte>(
    // m1: [81 01 02 03 82] - plain header, no data
    ReactiveTest.OnNext<byte>(01, 0x81), // BOM m1
    ReactiveTest.OnNext<byte>(02, 0x01),
    ReactiveTest.OnNext<byte>(03, 0x02),
    ReactiveTest.OnNext<byte>(04, 0x03),
    ReactiveTest.OnNext<byte>(05, 0x82), // EOM m1

    // m2: [81 82 81 82 82] - header made entirely of control bytes
    ReactiveTest.OnNext<byte>(06, 0x81), // BOM m2
    ReactiveTest.OnNext<byte>(07, 0x82),
    ReactiveTest.OnNext<byte>(08, 0x81),
    ReactiveTest.OnNext<byte>(09, 0x82),
    ReactiveTest.OnNext<byte>(10, 0x82), // EOM m2

    // m3: [81 01 02 1B 82] - ESC inside the header is not an escape
    ReactiveTest.OnNext<byte>(11, 0x81), // BOM m3
    ReactiveTest.OnNext<byte>(12, 0x01),
    ReactiveTest.OnNext<byte>(13, 0x02),
    ReactiveTest.OnNext<byte>(14, 0x1B),
    ReactiveTest.OnNext<byte>(15, 0x82), // EOM m3

    // m4: [81 01 02 03 1B 82 82] - escaped EOM as data
    ReactiveTest.OnNext<byte>(16, 0x81), // BOM m4
    ReactiveTest.OnNext<byte>(17, 0x01),
    ReactiveTest.OnNext<byte>(18, 0x02),
    ReactiveTest.OnNext<byte>(19, 0x03),
    ReactiveTest.OnNext<byte>(20, 0x1B), // Control character
    ReactiveTest.OnNext<byte>(21, 0x82), // Data
    ReactiveTest.OnNext<byte>(22, 0x82), // EOM m4

    // m5: [81 01 02 03 1B 1B 82] - escaped ESC as data
    ReactiveTest.OnNext<byte>(23, 0x81), // BOM m5
    ReactiveTest.OnNext<byte>(24, 0x01),
    ReactiveTest.OnNext<byte>(25, 0x02),
    ReactiveTest.OnNext<byte>(26, 0x03),
    ReactiveTest.OnNext<byte>(27, 0x1B), // Control character
    ReactiveTest.OnNext<byte>(28, 0x1B), // Data
    ReactiveTest.OnNext<byte>(29, 0x82), // EOM m5

    ReactiveTest.OnNext<byte>(30, 0x82)); // Ignored (expected 0x81)

var observer = scheduler.CreateObserver<Message>();

//CurrentAnswer(source)
MyAnswer(source)
    .Subscribe(observer);

scheduler.Start();

// Each message is expected on the tick of its terminating EOM byte.
ReactiveAssert.AreElementsEqual(
    new[] {
        ReactiveTest.OnNext(05, new Message() { Header = new byte[] { 0x01, 0x02, 0x03 }, Data = new byte[0] { } }),
        ReactiveTest.OnNext(10, new Message() { Header = new byte[] { 0x82, 0x81, 0x82 }, Data = new byte[0] { } }),
        ReactiveTest.OnNext(15, new Message() { Header = new byte[] { 0x01, 0x02, 0x1B }, Data = new byte[0] { } }),
        ReactiveTest.OnNext(22, new Message() { Header = new byte[] { 0x01, 0x02, 0x03 }, Data = new byte[] { 0x82 } }),
        ReactiveTest.OnNext(29, new Message() { Header = new byte[] { 0x01, 0x02, 0x03 }, Data = new byte[] { 0x1B } }),
    },
    observer.Messages);
我还写了一个允许我验证代码的Message版本
/// <summary>
/// A decoded frame: a fixed three-byte header followed by the unescaped data bytes.
/// Equality and hashing are based on the contents of <see cref="Header"/> and
/// <see cref="Data"/>, so two separately built messages with the same bytes compare equal.
/// </summary>
public class Message
{
    /// <summary>Marker byte that starts a message.</summary>
    public static readonly byte BOM = 0x81;

    /// <summary>Marker byte that ends a message.</summary>
    public static readonly byte EOM = 0x82;

    /// <summary>Escape byte that prefixes a literal control byte inside the data part.</summary>
    public static readonly byte Control = 0x1B;

    // Number of leading bytes that form the (never escaped) header.
    private const int HeaderLength = 3;

    /// <summary>The three raw header bytes.</summary>
    public byte[] Header { get; set; }

    /// <summary>The data bytes with escape prefixes already removed.</summary>
    public byte[] Data { get; set; }

    /// <summary>
    /// Builds a message from the bytes found between BOM and EOM (both excluded).
    /// The first three bytes are copied verbatim into the header; in the remainder
    /// every unescaped <see cref="Control"/> byte is dropped and the byte after it
    /// is taken literally.
    /// </summary>
    /// <param name="bytes">Header bytes followed by the still-escaped data bytes.</param>
    /// <returns>The decoded message.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="bytes"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="bytes"/> is shorter than the header.</exception>
    public static Message Create(byte[] bytes)
    {
        if (bytes == null)
        {
            throw new ArgumentNullException(nameof(bytes));
        }

        if (bytes.Length < HeaderLength)
        {
            throw new ArgumentException("bytes<3");
        }

        var header = new byte[HeaderLength];
        Array.Copy(bytes, header, HeaderLength);

        var body = new List<byte>();
        var escapeNext = false;
        for (int i = HeaderLength; i < bytes.Length; i++)
        {
            var b = bytes[i];
            if (b == Control && !escapeNext)
            {
                // An unescaped ESC is not data; it only marks the next byte as literal.
                escapeNext = true;
            }
            else
            {
                body.Add(b);
                escapeNext = false;
            }
        }

        return new Message { Header = header, Data = body.ToArray() };
    }

    /// <summary>Formats header and data as comma-separated upper-case hex values.</summary>
    public override string ToString()
    {
        return string.Format("Message(Header=[{0}],Data=[{1}])", ByteArrayString(Header), ByteArrayString(Data));
    }

    // Renders a byte array as "1,2,1B"; a null array renders as an empty string.
    private static string ByteArrayString(byte[] bytes)
    {
        return bytes == null
            ? string.Empty
            : string.Join(",", bytes.Select(b => b.ToString("X")));
    }

    /// <summary>
    /// Content-based equality. Returns false for null and for objects that are not a
    /// <see cref="Message"/>.
    /// </summary>
    public override bool Equals(object obj)
    {
        // Check the cast result, not obj: a non-Message argument yields a null 'other'.
        var other = obj as Message;
        if (other == null)
        {
            return false;
        }

        return Equals(other);
    }

    /// <summary>Compares header and data byte-by-byte.</summary>
    protected bool Equals(Message other)
    {
        if (other == null)
        {
            return false;
        }

        return IsSequenceEqual(Header, other.Header)
            && IsSequenceEqual(Data, other.Data);
    }

    // Two nulls are equal, null never equals non-null, otherwise element-wise comparison.
    private bool IsSequenceEqual<T>(IEnumerable<T> expected, IEnumerable<T> other)
    {
        if (expected == null && other == null)
        {
            return true;
        }

        if (expected == null || other == null)
        {
            return false;
        }

        return expected.SequenceEqual(other);
    }

    /// <summary>
    /// Hash derived from the array contents so that it agrees with <see cref="Equals(object)"/>.
    /// The properties are mutable, so do not change a message while it is a dictionary/set key.
    /// </summary>
    public override int GetHashCode()
    {
        unchecked
        {
            return (ContentHash(Header) * 397) ^ ContentHash(Data);
        }
    }

    // Order-sensitive hash over the bytes; null hashes to 0.
    private static int ContentHash(byte[] bytes)
    {
        if (bytes == null)
        {
            return 0;
        }

        unchecked
        {
            var hash = 17;
            foreach (var b in bytes)
            {
                hash = (hash * 31) + b;
            }

            return hash;
        }
    }
}
现在所有的基础代码(plumbing)都已就绪,我可以专注于实际的问题了.
/// <summary>
/// Splits a byte stream into <see cref="Message"/> values: wait for a BOM, collect
/// bytes until an unescaped EOM outside the header, decode them, then start over.
/// </summary>
/// <param name="source">The raw byte stream.</param>
/// <returns>
/// One message per complete frame. The result terminates when <paramref name="source"/>
/// terminates; a frame that is still incomplete at that point is dropped.
/// </returns>
public static IObservable<Message> MyAnswer(IObservable<byte> source)
{
    // Publish shares a single subscription to the source across every repetition below.
    return source.Publish(shared =>
    {
        // Defer so that every repetition gets its own fresh Accumulator instance.
        var singleMessage = Observable.Defer(() =>
                //Start consuming once we see a BOM
                shared.SkipWhile(b => b != Message.BOM)
                      .Scan(new Accumulator(), (acc, cur) => acc.Accumulate(cur)))
            .TakeWhile(acc => !acc.IsEndOfMessage())
            .Where(acc => !acc.IsBeginingOfMessage())
            .Select(acc => acc.Value())
            .ToArray()
            .Where(buffer => buffer.Any())
            .Select(buffer => Message.Create(buffer));

        // Without the TakeUntil, Repeat() would resubscribe forever once the source has
        // completed: a completed subject completes each new subscriber immediately, so
        // the result would never complete and would spin. LastOrDefaultAsync always
        // yields exactly one value when the source completes (or forwards its error),
        // which ends the repetition.
        return singleMessage
            .Repeat()
            .TakeUntil(shared.LastOrDefaultAsync());
    });
}
/// <summary>
/// Mutable scan state for one frame. It remembers the most recent byte, its 1-based
/// position within the frame (the BOM is position 1) and whether that byte was
/// preceded by an unescaped escape byte.
/// </summary>
public class Accumulator
{
    // Positions below this value are treated as header (see IsHeader); escape
    // handling and end-of-message detection only apply from this position on.
    private const int FirstDataPosition = 5;

    private int _position;
    private byte _latest;
    private bool _latestIsEscaped;
    private bool _followingIsEscaped;

    /// <summary>Feeds the next byte into the state and returns this same instance.</summary>
    public Accumulator Accumulate(byte b)
    {
        _position += 1;
        _latest = b;

        // The escape flag computed for the previous byte now applies to this one.
        _latestIsEscaped = _followingIsEscaped;

        // Only an unescaped escape byte outside the header escapes its successor.
        _followingIsEscaped = b == Message.Control && !_latestIsEscaped && !IsHeader();

        return this;
    }

    /// <summary>The byte most recently passed to <see cref="Accumulate"/>.</summary>
    public byte Value() => _latest;

    private bool IsHeader() => _position < FirstDataPosition;

    /// <summary>True while the state holds the very first byte and that byte is the BOM.</summary>
    public bool IsBeginingOfMessage()
    {
        if (_position != 1)
        {
            return false;
        }

        return _latest == Message.BOM;
    }

    /// <summary>True when the latest byte is an unescaped EOM located after the header.</summary>
    public bool IsEndOfMessage()
    {
        if (IsHeader())
        {
            return false;
        }

        if (_latest != Message.EOM)
        {
            return false;
        }

        return !_latestIsEscaped;
    }
}
为了完整起见,这里给出@Evk答案的核心代码,这样你可以方便地切换两种实现.
/// <summary>
/// Imperative frame parser wrapped in an observable: a small state machine consumes
/// the bytes of <paramref name="source"/> and emits a <see cref="Message"/> for each
/// frame terminated by an unescaped EOM.
/// </summary>
/// <param name="source">The raw byte stream.</param>
/// <returns>
/// The decoded messages. Completion and errors of <paramref name="source"/> are
/// forwarded to the subscriber.
/// </returns>
public static IObservable<Message> CurrentAnswer(IObservable<byte> source)
{
    return Observable.Create<Message>(o =>
    {
        // some crude parsing code for the sake of example
        bool nextIsEscaped = false;
        bool readingHeader = false;
        bool readingBody = false;
        List<byte> body = new List<byte>();
        List<byte> header = new List<byte>();

        return source.Subscribe(
            b =>
            {
                // Control bytes are only interpreted outside the header and when the
                // previous byte was not an escape.
                bool controlAllowed = !nextIsEscaped && !readingHeader;

                if (controlAllowed && b == Message.BOM)
                {
                    // start: drop anything left over from an unterminated frame so the
                    // three-byte header count starts from zero again.
                    header.Clear();
                    body.Clear();
                    readingHeader = true;
                    readingBody = false;
                    nextIsEscaped = false;
                }
                else if (controlAllowed && b == Message.EOM)
                {
                    // end
                    readingHeader = false;
                    readingBody = false;
                    if (header.Count > 0 || body.Count > 0)
                    {
                        o.OnNext(new Message()
                        {
                            Header = header.ToArray(),
                            Data = body.ToArray()
                        });
                        header.Clear();
                        body.Clear();
                    }
                    nextIsEscaped = false;
                }
                else if (controlAllowed && b == Message.Control)
                {
                    nextIsEscaped = true;
                }
                else
                {
                    if (readingHeader)
                    {
                        header.Add(b);
                        if (header.Count == 3)
                        {
                            readingHeader = false;
                            readingBody = true;
                        }
                    }
                    else if (readingBody)
                    {
                        body.Add(b);
                    }

                    // Whatever this byte was, it consumed any pending escape.
                    nextIsEscaped = false;
                }
            },
            // Forward termination; with an onNext-only subscription the result would
            // never complete and source errors would not reach the subscriber.
            o.OnError,
            o.OnCompleted);
    });
}