c# – Rx.Net消息解析器

前端之家收集整理的这篇文章主要介绍了c# – Rx.Net消息解析器,前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在尝试解析表示消息的传入字节流.
我需要拆分流并为每个部分创建一个消息结构.

消息始终以0x81(BOM)开头,以0x82(EOM)结束.

  1. start: 0x81
  2. header: 3 bytes
  3. data: arbitrary length
  4. stop: 0x82

数据部分使用转义字节0x1B(ESC)进行转义:只要数据部分中的某个字节等于控制字节{ESC,BOM,EOM}之一,就会在该字节前面加上ESC作为前缀.

标题部分未转义,可能包含控制字节.

我想使用Rx.Net,以函数式、响应式的风格来实现:接收IObservable<byte>并将其转换为IObservable<Message>.

最地道(idiomatic)的做法是什么?

一些例子:

  1. [81 01 02 03 82] single message
  2. [81 82 81 82 82] single message,header = [82 81 82]
  3. [81 01 02 1B 82] single message,header = [01 02 1B].
  4. [81 01 02 03 1B 82 82] single message,header = [01 02 03],(unescaped) data = [82]
  5. [81 01 02 03 1B 1B 82 82] single message + dangling [82] which should be ignored; header = [01 02 03],(unescaped) data = [1B]

这是一个状态机绘图:

解决方法

如果你想要“更函数式”的写法,那么下面的代码可能会有所帮助;不过@Evk的答案同样能通过这些测试.

首先我建议:对于这样复杂的问题,最好随问题一起提供一个测试套件,这样别人才能给出可验证的答案.

像这样的东西会非常有帮助.

  // Test harness: replays five framed messages (plus one stray EOM) on virtual time
  // and checks that the parser emits each Message on the tick of its closing EOM.
  var scheduler = new TestScheduler();
  var source = scheduler.CreateColdObservable<byte>(
      // m1: [81 01 02 03 82] - header only, empty data
      ReactiveTest.OnNext<byte>(01, 0x81), //BOM m1
      ReactiveTest.OnNext<byte>(02, 0x01),
      ReactiveTest.OnNext<byte>(03, 0x02),
      ReactiveTest.OnNext<byte>(04, 0x03),
      ReactiveTest.OnNext<byte>(05, 0x82), //EOM m1
      // m2: [81 82 81 82 82] - header made of control bytes (the header is never escaped)
      ReactiveTest.OnNext<byte>(06, 0x81), //BOM m2
      ReactiveTest.OnNext<byte>(07, 0x82),
      ReactiveTest.OnNext<byte>(08, 0x81),
      ReactiveTest.OnNext<byte>(09, 0x82),
      ReactiveTest.OnNext<byte>(10, 0x82), //EOM m2
      // m3: [81 01 02 1B 82] - ESC inside the header is plain header content
      ReactiveTest.OnNext<byte>(11, 0x81), //BOM m3
      ReactiveTest.OnNext<byte>(12, 0x01),
      ReactiveTest.OnNext<byte>(13, 0x02),
      ReactiveTest.OnNext<byte>(14, 0x1B),
      ReactiveTest.OnNext<byte>(15, 0x82), //EOM m3
      // m4: [81 01 02 03 1B 82 82] - escaped EOM as data
      ReactiveTest.OnNext<byte>(16, 0x81), //BOM m4
      ReactiveTest.OnNext<byte>(17, 0x01),
      ReactiveTest.OnNext<byte>(18, 0x02),
      ReactiveTest.OnNext<byte>(19, 0x03),
      ReactiveTest.OnNext<byte>(20, 0x1B), //Control character
      ReactiveTest.OnNext<byte>(21, 0x82), //Data
      ReactiveTest.OnNext<byte>(22, 0x82), //EOM m4
      // m5: [81 01 02 03 1B 1B 82] - escaped ESC as data
      ReactiveTest.OnNext<byte>(23, 0x81), //BOM m5
      ReactiveTest.OnNext<byte>(24, 0x01),
      ReactiveTest.OnNext<byte>(25, 0x02),
      ReactiveTest.OnNext<byte>(26, 0x03),
      ReactiveTest.OnNext<byte>(27, 0x1B), //Control character
      ReactiveTest.OnNext<byte>(28, 0x1B), //Data
      ReactiveTest.OnNext<byte>(29, 0x82), //EOM m5
      ReactiveTest.OnNext<byte>(30, 0x82)); //Ignored (expected 0x81)

  var observer = scheduler.CreateObserver<Message>();

  //CurrentAnswer(source)
  MyAnswer(source)
      .Subscribe(observer);

  scheduler.Start();

  ReactiveAssert.AreElementsEqual(
      new[] {
          ReactiveTest.OnNext(05, new Message(){Header=new byte[]{0x01,0x02,0x03}, Data=new byte[0]{}}),
          ReactiveTest.OnNext(10, new Message(){Header=new byte[]{0x82,0x81,0x82}, Data=new byte[0]{}}),
          ReactiveTest.OnNext(15, new Message(){Header=new byte[]{0x01,0x02,0x1B}, Data=new byte[0]{}}),
          ReactiveTest.OnNext(22, new Message(){Header=new byte[]{0x01,0x02,0x03}, Data=new byte[]{ 0x82}}),
          ReactiveTest.OnNext(29, new Message(){Header=new byte[]{0x01,0x02,0x03}, Data=new byte[]{ 0x1B}}),
      },
      observer.Messages);

我还写了一个允许我验证代码的Message版本

  /// <summary>
  /// A parsed protocol message: a fixed 3-byte header followed by unescaped data bytes.
  /// Equality (and hashing) is by content of <see cref="Header"/> and <see cref="Data"/>,
  /// which is what lets test assertions compare expected and actual messages.
  /// </summary>
  public class Message
  {
      public static readonly byte BOM = 0x81;
      public static readonly byte EOM = 0x82;
      public static readonly byte Control = 0x1B;

      // Number of leading bytes that form the (never escaped) header.
      private const int HeaderLength = 3;

      public byte[] Header { get; set; }
      public byte[] Data { get; set; }

      /// <summary>
      /// Builds a message from the bytes found between BOM and EOM (both excluded).
      /// The first three bytes are copied verbatim as the header; in the remainder every
      /// unescaped <see cref="Control"/> byte is dropped and the byte after it is kept as data.
      /// </summary>
      /// <param name="bytes">Header plus still-escaped data; at least three bytes.</param>
      /// <returns>The message with its data unescaped.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="bytes"/> is null.</exception>
      /// <exception cref="ArgumentException"><paramref name="bytes"/> is shorter than the header.</exception>
      public static Message Create(byte[] bytes)
      {
          if (bytes == null)
          {
              throw new ArgumentNullException(nameof(bytes));
          }
          if (bytes.Length < HeaderLength)
          {
              // The original chained LINQPad's .Dump() here, which does not exist outside LINQPad.
              throw new ArgumentException("bytes<3");
          }

          var header = new byte[HeaderLength];
          Array.Copy(bytes, header, HeaderLength);

          var body = new List<byte>(bytes.Length - HeaderLength);
          var escapeNext = false;
          for (int i = HeaderLength; i < bytes.Length; i++)
          {
              var b = bytes[i];

              if (b == Control && !escapeNext)
              {
                  // Escape marker: swallow it and take the following byte literally.
                  escapeNext = true;
              }
              else
              {
                  body.Add(b);
                  escapeNext = false;
              }
          }

          return new Message { Header = header, Data = body.ToArray() };
      }

      /// <summary>Renders header and data as comma-separated upper-case hex values.</summary>
      public override string ToString()
      {
          return string.Format("Message(Header=[{0}],Data=[{1}])", ByteArrayString(Header), ByteArrayString(Data));
      }

      // Null arrays render as empty instead of throwing from inside ToString.
      private static string ByteArrayString(byte[] bytes)
      {
          return bytes == null
              ? string.Empty
              : string.Join(",", bytes.Select(b => b.ToString("X")));
      }

      /// <summary>Content equality; false for null and for objects that are not messages.</summary>
      public override bool Equals(object obj)
      {
          // Pass the cast result so a non-Message argument ends up as null and yields false
          // (the original tested obj instead and dereferenced the null cast result).
          return Equals(obj as Message);
      }

      /// <summary>Content equality on header and data; false when <paramref name="other"/> is null.</summary>
      protected bool Equals(Message other)
      {
          if (ReferenceEquals(other, null))
          {
              return false;
          }
          if (ReferenceEquals(this, other))
          {
              return true;
          }
          return IsSequenceEqual(Header, other.Header)
              && IsSequenceEqual(Data, other.Data);
      }

      // Two nulls are equal; a null never equals a non-null sequence.
      private static bool IsSequenceEqual<T>(IEnumerable<T> expected, IEnumerable<T> other)
      {
          if (expected == null && other == null)
          {
              return true;
          }
          if (expected == null || other == null)
          {
              return false;
          }
          return expected.SequenceEqual(other);
      }

      /// <summary>
      /// Hash derived from the array contents so that messages that are Equals also share a
      /// hash code. Header and Data are mutable, so the value changes if they are modified.
      /// </summary>
      public override int GetHashCode()
      {
          unchecked
          {
              return (ContentHash(Header) * 397) ^ ContentHash(Data);
          }
      }

      // Order-sensitive hash over the bytes; null hashes to 0.
      private static int ContentHash(byte[] bytes)
      {
          if (bytes == null)
          {
              return 0;
          }
          unchecked
          {
              int hash = 17;
              foreach (var b in bytes)
              {
                  hash = (hash * 31) + b;
              }
              return hash;
          }
      }
  }

现在所有的基础设施都已就绪,我可以专注于实际的问题了.

  /// <summary>
  /// Splits a byte stream into messages. Each pass skips to the next BOM, feeds bytes
  /// through a fresh <c>Accumulator</c> until it reports an unescaped EOM, collects the
  /// bytes in between (header plus still-escaped data) and hands them to
  /// <c>Message.Create</c>; <c>Repeat</c> then starts the next pass on the shared stream.
  /// </summary>
  /// <param name="source">Raw incoming bytes.</param>
  /// <returns>One <c>Message</c> per BOM...EOM frame found in <paramref name="source"/>.</returns>
  public static IObservable<Message> MyAnswer(IObservable<byte> source)
  {
      // Publish shares one subscription to source across the repeated passes below.
      return source.Publish(shared =>
      {
          // Defer makes every pass (every re-subscription by Repeat) build its own Accumulator.
          var frameBytes = Observable
              .Defer(() => shared
                  //Start consuming once we see a BOM
                  .SkipWhile(value => value != Message.BOM)
                  .Scan(new Accumulator(), (state, value) => state.Accumulate(value)))
              .TakeWhile(state => !state.IsEndOfMessage())
              .Where(state => !state.IsBeginingOfMessage())
              .Select(state => state.Value());

          // NOTE(review): Repeat re-subscribes after every completion, including when the
          // source itself completes - confirm how this behaves on a finite source.
          return frameBytes
              .ToArray()
              .Where(frame => frame.Length > 0)
              .Select(frame => Message.Create(frame))
              .Repeat();
      });
  }
  /// <summary>
  /// Mutable per-frame parser state: remembers the position of the latest byte within the
  /// frame (BOM is position 1) and whether that byte was preceded by an escape marker.
  /// </summary>
  public class Accumulator
  {
      // Positions 1-4 are the BOM and the three header bytes; data starts at position 5.
      private const int FirstDataPosition = 5;

      private int _position;
      private byte _latest;
      private bool _latestIsEscaped;
      private bool _escapeFollowing;

      /// <summary>Records the next byte of the frame and returns this same instance.</summary>
      public Accumulator Accumulate(byte b)
      {
          _position += 1;
          _latest = b;
          _latestIsEscaped = _escapeFollowing;
          // Only an unescaped control byte in the data section escapes the byte after it.
          _escapeFollowing = b == Message.Control && !_latestIsEscaped && !IsHeader();
          return this;
      }

      /// <summary>The most recently accumulated byte.</summary>
      public byte Value() => _latest;

      private bool IsHeader() => _position < FirstDataPosition;

      /// <summary>True while the latest byte is the frame's opening BOM.</summary>
      public bool IsBeginingOfMessage() => _position == 1 && _latest == Message.BOM;

      /// <summary>True when the latest byte is an unescaped EOM located past the header.</summary>
      public bool IsEndOfMessage()
      {
          if (IsHeader())
          {
              return false;
          }
          return _latest == Message.EOM && !_latestIsEscaped;
      }
  }

为了完整起见,这里给出@Evk的答案的核心代码,这样你就可以轻松地交换两种实现.

  /// <summary>
  /// Imperative parser: a small state machine, run inside <c>Observable.Create</c>, that
  /// collects three header bytes after a BOM (0x81), then unescaped body bytes, and emits
  /// a <c>Message</c> when an unescaped EOM (0x82) arrives outside the header.
  /// Errors and completion of <paramref name="source"/> are forwarded to the subscriber;
  /// a frame still in progress at that point is not emitted.
  /// </summary>
  /// <param name="source">Raw incoming bytes.</param>
  /// <returns>One <c>Message</c> per complete frame.</returns>
  public static IObservable<Message> CurrentAnswer(IObservable<byte> source)
  {
      return Observable.Create<Message>(o =>
      {
          // some crude parsing code for the sake of example
          bool nextIsEscaped = false;
          bool readingHeader = false;
          bool readingBody = false;
          List<byte> body = new List<byte>();
          List<byte> header = new List<byte>();
          return source.Subscribe(
              b =>
              {
                  if (b == 0x81 && !nextIsEscaped && !readingHeader)
                  {
                      // start
                      // Drop leftovers of a frame that never saw its EOM; otherwise the stale
                      // header bytes keep header.Count from ever reaching 3 again.
                      header.Clear();
                      body.Clear();
                      readingHeader = true;
                      readingBody = false;
                      nextIsEscaped = false;
                  }
                  else if (b == 0x82 && !nextIsEscaped && !readingHeader)
                  {
                      // end
                      readingHeader = false;
                      readingBody = false;
                      // A stray EOM outside any frame finds both buffers empty and is ignored.
                      if (header.Count > 0 || body.Count > 0)
                      {
                          o.OnNext(new Message()
                          {
                              Header = header.ToArray(),
                              Data = body.ToArray()
                          });
                          header.Clear();
                          body.Clear();
                      }
                      nextIsEscaped = false;
                  }
                  else if (b == 0x1B && !nextIsEscaped && !readingHeader)
                  {
                      // escape marker (0x1B): the next byte is taken literally
                      nextIsEscaped = true;
                  }
                  else
                  {
                      if (readingHeader)
                      {
                          // Header bytes are never escaped, so control values land here too.
                          header.Add(b);
                          if (header.Count == 3)
                          {
                              readingHeader = false;
                              readingBody = true;
                          }
                      }
                      else if (readingBody)
                      {
                          body.Add(b);
                      }
                      nextIsEscaped = false;
                  }
              },
              // Propagate termination; the original subscribed with onNext only.
              o.OnError,
              o.OnCompleted);
      });
  }

猜你在找的C#相关文章