我创建了这个测试一个并行提取:
- public static async Task ExtractToDirectoryAsync(this FileInfo file,DirectoryInfo folder)
- {
- ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
- {
- var path = Path.Combine(folder.FullName,entry.FullName);
- Directory.CreateDirectory(Path.GetDirectoryName(path));
- entry.ExtractToFile(path);
- },new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
- using (var archive = ZipFile.OpenRead(file.FullName))
- {
- foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
- {
- block.Post(entry);
- }
- block.Complete();
- await block.Completion;
- }
- }
并进行以下测试单元测试:
- [TestMethod]
- public async Task ExtractTestAsync()
- {
- if (Resources.LocalExtractFolder.Exists)
- Resources.LocalExtractFolder.Delete(true);
- // Resources.LocalExtractFolder.Create();
- await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
- }
使用MaxDegreeOfParallelism = 1,事情工作,但2它不.
- Test Name: ExtractTestAsync
- Test FullName: Composite.Azure.Tests.ZipFileTests.ExtractTestAsync
- Test Source: c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs : line 21
- Test Outcome: Failed
- Test Duration: 0:00:02.4138753
- Result Message:
- Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception:
- System.IO.InvalidDataException: Unknown block type. Stream might be corrupted.
- Result StackTrace:
- at System.IO.Compression.Inflater.Decode()
- at System.IO.Compression.Inflater.Inflate(Byte[] bytes,Int32 offset,Int32 length)
- at System.IO.Compression.DeflateStream.Read(Byte[] array,Int32 count)
- at System.IO.Stream.InternalCopyTo(Stream destination,Int32 bufferSize)
- at System.IO.Stream.CopyTo(Stream destination)
- at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source,String destinationFileName,Boolean overwrite)
- at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source,String destinationFileName)
- at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 37
- at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action,KeyValuePair`2 messageWithId)
- at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId)
- at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore()
- --- End of stack trace from prevIoUs location where exception was thrown ---
- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
- at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
- at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
- at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 48
- --- End of stack trace from prevIoUs location where exception was thrown ---
- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
- at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
- at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
- at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs:line 25
- --- End of stack trace from prevIoUs location where exception was thrown ---
- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
- at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
- at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
更新2
这是我自己去做并行,它不工作:)记住在continueWith中处理异常.
- public static void ExtractToDirectorySemaphore(this FileInfo file,DirectoryInfo folder)
- {
- int MaxDegreeOfParallelism = 2;
- using (var archive = ZipFile.OpenRead(file.FullName))
- {
- var semaphore = new Semaphore(MaxDegreeOfParallelism,MaxDegreeOfParallelism);
- foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
- {
- semaphore.WaitOne();
- var task = Task.Run(() =>
- {
- var path = Path.Combine(folder.FullName,entry.FullName);
- Directory.CreateDirectory(Path.GetDirectoryName(path));
- entry.ExtractToFile(path);
- });
- task.ContinueWith(handle =>
- {
- try
- {
- //do any cleanup/post processing
- }
- finally
- {
- // Release the semaphore so the next thing can be processed
- semaphore.Release();
- }
- });
- }
- while(MaxDegreeOfParallelism-->0)
- semaphore.WaitOne(); //Wait here until the last task completes.
- }
- }
这里是异步版本:
- public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file,DirectoryInfo folder)
- {
- return Task.Factory.StartNew(() =>
- {
- int MaxDegreeOfParallelism = 50;
- using (var archive = ZipFile.OpenRead(file.FullName))
- {
- var semaphore = new Semaphore(MaxDegreeOfParallelism,MaxDegreeOfParallelism);
- foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
- {
- semaphore.WaitOne();
- var task = Task.Run(() =>
- {
- var path = Path.Combine(folder.FullName,entry.FullName);
- Directory.CreateDirectory(Path.GetDirectoryName(path));
- entry.ExtractToFile(path);
- });
- task.ContinueWith(handle =>
- {
- try
- {
- //do any cleanup/post processing
- }
- finally
- {
- // Release the semaphore so the next thing can be processed
- semaphore.Release();
- }
- },TaskContinuationOptions.AttachedToParent); // the outher task will wait for all.
- }
- }
- });
- }
更新3
在handle.Exception中抛出以下异常.
- {"Block length does not match with its complement."}
- [0] = {"A local file header is corrupt."}
必须找出ZipFile是否是线程安全的.
解决方法
拆解器:它只是一个概念证明.
在代码中的示例中使用ParallelZipFile.OpenRead替换ZipFile.OpenRead全部4个单位测试通过.
- public class ParallelZipFile
- {
- public static ParallelZipArchive OpenRead(string path)
- {
- return new ParallelZipArchive(ZipFile.OpenRead(path),path);
- }
- }
- public class ParallelZipArchive : IDisposable
- {
- internal ZipArchive _archive;
- internal string _path;
- internal ConcurrentQueue<ZipArchive> FreeReaders = new ConcurrentQueue<ZipArchive>();
- public ParallelZipArchive(ZipArchive zip,string path)
- {
- _path = path;
- _archive = zip;
- FreeReaders.Enqueue(zip);
- }
- public ReadOnlyCollection<ParallelZipArchiveEntry> Entries
- {
- get
- {
- var list = new List<ParallelZipArchiveEntry>(_archive.Entries.Count);
- int i = 0;
- foreach (var entry in _archive.Entries)
- list.Add(new ParallelZipArchiveEntry(i++,entry,this));
- return new ReadOnlyCollection<ParallelZipArchiveEntry>(list);
- }
- }
- public void Dispose()
- {
- foreach (var archive in FreeReaders)
- archive.Dispose();
- }
- }
- public class ParallelZipArchiveEntry
- {
- private ParallelZipArchive _parent;
- private int _entry;
- public string Name { get; set; }
- public string FullName { get; set; }
- public ParallelZipArchiveEntry(int entryNr,ZipArchiveEntry entry,ParallelZipArchive parent)
- {
- _entry = entryNr;
- _parent = parent;
- Name = entry.Name;
- FullName = entry.FullName;
- }
- public void ExtractToFile(string path)
- {
- ZipArchive value;
- Trace.TraceInformation(string.Format("Number of readers: {0}",_parent.FreeReaders.Count));
- if (!_parent.FreeReaders.TryDequeue(out value))
- value = ZipFile.OpenRead(_parent._path);
- value.Entries.Skip(_entry).First().ExtractToFile(path);
- _parent.FreeReaders.Enqueue(value);
- }
- }
单元测试
- [TestClass]
- public class ZipFileTests
- {
- [ClassInitialize()]
- public static void PreInitialize(TestContext context)
- {
- if (Resources.LocalExtractFolderTruth.Exists)
- Resources.LocalExtractFolderTruth.Delete(true);
- ZipFile.ExtractToDirectory(Resources.WebsiteZip.FullName,Resources.LocalExtractFolderTruth.FullName);
- }
- [TestInitialize()]
- public void InitializeTests()
- {
- if (Resources.LocalExtractFolder.Exists)
- Resources.LocalExtractFolder.Delete(true);
- }
- [TestMethod]
- public void ExtractTest()
- {
- Resources.WebsiteZip.ExtractToDirectory(Resources.LocalExtractFolder);
- Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
- Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
- }
- [TestMethod]
- public async Task ExtractAsyncTest()
- {
- await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
- Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
- Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
- }
- [TestMethod]
- public void ExtractSemaphoreTest()
- {
- Resources.WebsiteZip.ExtractToDirectorySemaphore(Resources.LocalExtractFolder);
- Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
- Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
- }
- [TestMethod]
- public async Task ExtractSemaphoreAsyncTest()
- {
- await Resources.WebsiteZip.ExtractToDirectorySemaphoreAsync(Resources.LocalExtractFolder);
- Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
- Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
- }
- }