c# – 我做错了什么,或者不可能并行提取zip文件?

前端之家收集整理的这篇文章主要介绍了c# – 我做错了什么,或者不可能并行提取zip文件?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我创建了这个测试一个并行提取
  1. public static async Task ExtractToDirectoryAsync(this FileInfo file,DirectoryInfo folder)
  2. {
  3.  
  4. ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
  5. {
  6. var path = Path.Combine(folder.FullName,entry.FullName);
  7.  
  8. Directory.CreateDirectory(Path.GetDirectoryName(path));
  9. entry.ExtractToFile(path);
  10.  
  11. },new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
  12.  
  13. using (var archive = ZipFile.OpenRead(file.FullName))
  14. {
  15. foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
  16. {
  17. block.Post(entry);
  18. }
  19. block.Complete();
  20. await block.Completion;
  21. }
  22.  
  23. }

并进行以下测试单元测试:

  1. [TestMethod]
  2. public async Task ExtractTestAsync()
  3. {
  4. if (Resources.LocalExtractFolder.Exists)
  5. Resources.LocalExtractFolder.Delete(true);
  6. // Resources.LocalExtractFolder.Create();
  7. await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
  8. }

使用MaxDegreeOfParallelism = 1,事情工作,但2它不.

  1. Test Name: ExtractTestAsync
  2. Test FullName: Composite.Azure.Tests.ZipFileTests.ExtractTestAsync
  3. Test Source: c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs : line 21
  4. Test Outcome: Failed
  5. Test Duration: 0:00:02.4138753
  6.  
  7. Result Message:
  8. Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception:
  9. System.IO.InvalidDataException: Unknown block type. Stream might be corrupted.
  10. Result StackTrace:
  11. at System.IO.Compression.Inflater.Decode()
  12. at System.IO.Compression.Inflater.Inflate(Byte[] bytes,Int32 offset,Int32 length)
  13. at System.IO.Compression.DeflateStream.Read(Byte[] array,Int32 count)
  14. at System.IO.Stream.InternalCopyTo(Stream destination,Int32 bufferSize)
  15. at System.IO.Stream.CopyTo(Stream destination)
  16. at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source,String destinationFileName,Boolean overwrite)
  17. at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source,String destinationFileName)
  18. at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 37
  19. at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action,KeyValuePair`2 messageWithId)
  20. at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId)
  21. at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore()
  22. --- End of stack trace from prevIoUs location where exception was thrown ---
  23. at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
  24. at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
  25. at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
  26. at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 48
  27. --- End of stack trace from prevIoUs location where exception was thrown ---
  28. at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
  29. at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
  30. at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
  31. at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs:line 25
  32. --- End of stack trace from prevIoUs location where exception was thrown ---
  33. at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
  34. at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
  35. at System.Runtime.CompilerServices.TaskAwaiter.GetResult()

更新2

这是我自己去做并行,它不工作:)记住在continueWith中处理异常.

  1. public static void ExtractToDirectorySemaphore(this FileInfo file,DirectoryInfo folder)
  2. {
  3.  
  4. int MaxDegreeOfParallelism = 2;
  5. using (var archive = ZipFile.OpenRead(file.FullName))
  6. {
  7.  
  8. var semaphore = new Semaphore(MaxDegreeOfParallelism,MaxDegreeOfParallelism);
  9.  
  10. foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
  11. {
  12. semaphore.WaitOne();
  13.  
  14. var task = Task.Run(() =>
  15. {
  16. var path = Path.Combine(folder.FullName,entry.FullName);
  17.  
  18. Directory.CreateDirectory(Path.GetDirectoryName(path));
  19. entry.ExtractToFile(path);
  20. });
  21. task.ContinueWith(handle =>
  22. {
  23. try
  24. {
  25. //do any cleanup/post processing
  26. }
  27. finally
  28. {
  29. // Release the semaphore so the next thing can be processed
  30. semaphore.Release();
  31. }
  32. });
  33. }
  34. while(MaxDegreeOfParallelism-->0)
  35. semaphore.WaitOne(); //Wait here until the last task completes.
  36.  
  37.  
  38. }
  39.  
  40. }

这里是异步版本:

  1. public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file,DirectoryInfo folder)
  2. {
  3. return Task.Factory.StartNew(() =>
  4. {
  5. int MaxDegreeOfParallelism = 50;
  6. using (var archive = ZipFile.OpenRead(file.FullName))
  7. {
  8.  
  9. var semaphore = new Semaphore(MaxDegreeOfParallelism,MaxDegreeOfParallelism);
  10.  
  11. foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
  12. {
  13. semaphore.WaitOne();
  14.  
  15. var task = Task.Run(() =>
  16. {
  17. var path = Path.Combine(folder.FullName,entry.FullName);
  18.  
  19. Directory.CreateDirectory(Path.GetDirectoryName(path));
  20. entry.ExtractToFile(path);
  21. });
  22. task.ContinueWith(handle =>
  23. {
  24. try
  25. {
  26. //do any cleanup/post processing
  27. }
  28. finally
  29. {
  30. // Release the semaphore so the next thing can be processed
  31. semaphore.Release();
  32. }
  33. },TaskContinuationOptions.AttachedToParent); // the outher task will wait for all.
  34. }
  35.  
  36. }
  37. });
  38. }

更新3

在handle.Exception中抛出以下异常.

  1. {"Block length does not match with its complement."}
  2. [0] = {"A local file header is corrupt."}

必须找出ZipFile是否是线程安全的.

解决方法

拆解器:它只是一个概念证明.

代码中的示例中使用ParallelZipFile.OpenRead替换ZipFile.OpenRead全部4个单位测试通过.

  1. public class ParallelZipFile
  2. {
  3. public static ParallelZipArchive OpenRead(string path)
  4. {
  5.  
  6. return new ParallelZipArchive(ZipFile.OpenRead(path),path);
  7. }
  8. }
  9. public class ParallelZipArchive : IDisposable
  10. {
  11. internal ZipArchive _archive;
  12. internal string _path;
  13. internal ConcurrentQueue<ZipArchive> FreeReaders = new ConcurrentQueue<ZipArchive>();
  14.  
  15. public ParallelZipArchive(ZipArchive zip,string path)
  16. {
  17. _path = path;
  18. _archive = zip;
  19. FreeReaders.Enqueue(zip);
  20. }
  21.  
  22. public ReadOnlyCollection<ParallelZipArchiveEntry> Entries
  23. {
  24. get
  25. {
  26. var list = new List<ParallelZipArchiveEntry>(_archive.Entries.Count);
  27. int i = 0;
  28. foreach (var entry in _archive.Entries)
  29. list.Add(new ParallelZipArchiveEntry(i++,entry,this));
  30.  
  31. return new ReadOnlyCollection<ParallelZipArchiveEntry>(list);
  32. }
  33. }
  34.  
  35.  
  36. public void Dispose()
  37. {
  38. foreach (var archive in FreeReaders)
  39. archive.Dispose();
  40. }
  41. }
  42. public class ParallelZipArchiveEntry
  43. {
  44. private ParallelZipArchive _parent;
  45. private int _entry;
  46. public string Name { get; set; }
  47. public string FullName { get; set; }
  48.  
  49. public ParallelZipArchiveEntry(int entryNr,ZipArchiveEntry entry,ParallelZipArchive parent)
  50. {
  51. _entry = entryNr;
  52. _parent = parent;
  53. Name = entry.Name;
  54. FullName = entry.FullName;
  55. }
  56.  
  57. public void ExtractToFile(string path)
  58. {
  59. ZipArchive value;
  60. Trace.TraceInformation(string.Format("Number of readers: {0}",_parent.FreeReaders.Count));
  61.  
  62. if (!_parent.FreeReaders.TryDequeue(out value))
  63. value = ZipFile.OpenRead(_parent._path);
  64.  
  65. value.Entries.Skip(_entry).First().ExtractToFile(path);
  66.  
  67.  
  68.  
  69. _parent.FreeReaders.Enqueue(value);
  70. }
  71. }

单元测试

  1. [TestClass]
  2. public class ZipFileTests
  3. {
  4. [ClassInitialize()]
  5. public static void PreInitialize(TestContext context)
  6. {
  7. if (Resources.LocalExtractFolderTruth.Exists)
  8. Resources.LocalExtractFolderTruth.Delete(true);
  9.  
  10. ZipFile.ExtractToDirectory(Resources.WebsiteZip.FullName,Resources.LocalExtractFolderTruth.FullName);
  11. }
  12.  
  13. [TestInitialize()]
  14. public void InitializeTests()
  15. {
  16. if (Resources.LocalExtractFolder.Exists)
  17. Resources.LocalExtractFolder.Delete(true);
  18.  
  19. }
  20.  
  21. [TestMethod]
  22. public void ExtractTest()
  23. {
  24.  
  25. Resources.WebsiteZip.ExtractToDirectory(Resources.LocalExtractFolder);
  26.  
  27. Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
  28. Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
  29.  
  30. }
  31. [TestMethod]
  32. public async Task ExtractAsyncTest()
  33. {
  34.  
  35. await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
  36.  
  37. Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
  38. Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
  39. }
  40. [TestMethod]
  41. public void ExtractSemaphoreTest()
  42. {
  43.  
  44. Resources.WebsiteZip.ExtractToDirectorySemaphore(Resources.LocalExtractFolder);
  45. Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
  46. Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
  47. }
  48. [TestMethod]
  49. public async Task ExtractSemaphoreAsyncTest()
  50. {
  51.  
  52. await Resources.WebsiteZip.ExtractToDirectorySemaphoreAsync(Resources.LocalExtractFolder);
  53. Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
  54. Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
  55. }
  56.  
  57. }

猜你在找的C#相关文章