我将使用嵌套块来避免拆分每月数据,然后不得不再次合并它们。这是两个嵌套的TransformBlock
的示例,它们处理2020年的所有天:
// Outer block: one message per month; the inner block fans out to the days of
// that month, processes them in parallel, and gathers the results back into a list.
var monthlyBlock = new TransformBlock<int, List<string>>(async month =>
{
    var dayOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
    var dailyBlock = new TransformBlock<int, string>(async day =>
    {
        await Task.Delay(100); // Simulate async work
        return day.ToString();
    }, dayOptions);

    int daysInMonth = DateTime.DaysInMonth(2020, month);
    for (int day = 1; day <= daysInMonth; day++)
    {
        await dailyBlock.SendAsync(day);
    }
    dailyBlock.Complete();
    return await dailyBlock.ToListAsync();
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

for (int month = 1; month <= 12; month++)
{
    await monthlyBlock.SendAsync(month);
}
monthlyBlock.Complete();
为了收集内部区块的日常结果,我使用了扩展方法ToListAsync
,如下所示:
/// <summary>
/// Asynchronously receives every message a source block produces, until the block
/// completes, and returns them as a list in the order received.
/// </summary>
/// <typeparam name="T">The type of the messages produced by the block.</typeparam>
/// <param name="block">The source block to drain.</param>
/// <param name="cancellationToken">Optional token that cancels waiting for further output.</param>
/// <returns>A task whose result contains all items the block produced.</returns>
/// <exception cref="ArgumentNullException"><paramref name="block"/> is null.</exception>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was canceled.</exception>
public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,CancellationToken cancellationToken = default)
{
    if (block == null) throw new ArgumentNullException(nameof(block));
    var list = new List<T>();
    // OutputAvailableAsync returns false once the block has completed (successfully,
    // faulted, or canceled) and its output buffer is empty, ending the outer loop.
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        // Drain everything currently buffered before awaiting availability again.
        while (block.TryReceive(out var item))
        {
            list.Add(item);
        }
    }
    // Bug fix: if the block faulted, OutputAvailableAsync simply returns false and the
    // original code silently returned a truncated list. Awaiting Completion propagates
    // the block's exception (or cancellation) to the caller instead.
    await block.Completion.ConfigureAwait(false);
    return list;
}
---
您的问题的答案是:不,您不需要其他块类型,是的,您可以使用TransformManyBlock两次,是的,这确实有意义。我在底部编写了一些代码来证明这一点,其后是一些注释。
如您所描述的,代码使用拆分然后合并管道。至于您正在苦苦挣扎的一点:将单个文件的数据合并回一起可以通过将处理后的项目添加到列表中来完成。然后,如果列表具有预期的最终项目数,则仅将列表传递到下一个块。这可以通过一个相当简单的TransformMany块返回零或一项来完成。由于列表不是线程安全的,因此无法并行处理此块。
一旦有了这样的管道,就可以通过传递给各块的选项来试验并行度和保序行为。下面的代码把每个可以并行化的块的并行度都设置为无限(Unbounded),让Dataflow自行调度。在我的机器上,它跑满了所有内核/逻辑处理器,并且是CPU受限的,这正是我们想要的。保序(EnsureOrdered)默认是启用的,但将其关闭并没有多大区别:同样,我们是CPU受限的。
最后,我不得不说这是一项非常酷的技术,但是实际上您可以使用PLINQ来解决此问题,它只需几行代码即可快速获得所需的信息。最大的缺点是,如果这样做,您将无法轻松地将快速到达的消息添加到管道中:PLINQ更适合于一个大批处理过程。但是,PLINQ可能是针对您的用例的更好解决方案。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;
namespace ParallelDataFlow
{
class Program
{
// Entry point: build and run the demo pipeline, then wait so the console stays open.
static void Main(string[] args)
{
new Program().Run();
Console.ReadLine();
}
// Wires up the split-then-merge pipeline, posts (year,month) tuples into it,
// blocks until the final block completes, and prints the elapsed time.
private void Run()
{
Stopwatch s = new Stopwatch();
s.Start();
// Can experiment with parallelization of blocks by changing MaxDegreeOfParallelism
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };
// Split: one (year,month) tuple fans out into one WorkItem per input file.
var getInputPathsBlock = new TransformManyBlock<(int,int),WorkItem>(date => GetWorkItemWithInputPath(date),options);
var loadDataBlock = new TransformBlock<WorkItem,WorkItem>(workItem => LoadDataIntoWorkItem(workItem),options);
var processDataBlock = new TransformBlock<WorkItem,WorkItem>(workItem => ProcessDataForWorkItem(workItem),options);
// Merge gate: emits the month's shared list only once all of its WorkItems have arrived.
// No options passed, so it runs with the default MaxDegreeOfParallelism of 1.
var waitForProcessedDataBlock = new TransformManyBlock<WorkItem,List<WorkItem>>(workItem => WaitForWorkItems(workItem)); // Can't parallelize this block
var mergeDataBlock = new TransformBlock<List<WorkItem>,List<WorkItem>>(list => MergeWorkItemData(list),options);
var saveDataBlock = new ActionBlock<List<WorkItem>>(list => SaveWorkItemData(list),options);
// PropagateCompletion lets Complete() on the first block flow through to saveDataBlock.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
getInputPathsBlock.LinkTo(loadDataBlock,linkOptions);
loadDataBlock.LinkTo(processDataBlock,linkOptions);
processDataBlock.LinkTo(waitForProcessedDataBlock,linkOptions);
waitForProcessedDataBlock.LinkTo(mergeDataBlock,linkOptions);
mergeDataBlock.LinkTo(saveDataBlock,linkOptions);
// We post individual tuples of (year,month) to our pipeline,as many as we want
getInputPathsBlock.Post((1903,2)); // Post one month and date
var dates = from y in Enumerable.Range(2015,5) from m in Enumerable.Range(1,12) select (y,m);
foreach (var date in dates) getInputPathsBlock.Post(date); // Post a big sequence
getInputPathsBlock.Complete();
// Blocking wait is fine here: Run is synchronous and nothing else needs this thread.
saveDataBlock.Completion.Wait();
s.Stop();
Console.WriteLine($"Completed in {s.ElapsedMilliseconds}ms on {ThreadAndTime()}");
}
// Creates one WorkItem per input file of the month. All items of a month share a
// single ProcessedWorkItems list, which WaitForWorkItems later uses to regroup them.
private IEnumerable<WorkItem> GetWorkItemWithInputPath((int year,int month) date)
{
List<WorkItem> processedWorkItems = new List<WorkItem>(); // Will store merged results
return GetInputPaths(date.year,date.month).Select(
path => new WorkItem
{
Year = date.year,Month = date.month,FilePath = path,ProcessedWorkItems = processedWorkItems
});
}
// Get filepaths of form e.g. Files/20191101.txt These aren't real files,they just show how it could work.
private IEnumerable<string> GetInputPaths(int year,int month) =>
Enumerable.Range(0,GetNumberOfFiles(year,month)).Select(i => $@"Files/{year}{Pad(month)}{Pad(i + 1)}.txt");
// One file per day of the month, so the expected item count equals the day count.
private int GetNumberOfFiles(int year,int month) => DateTime.DaysInMonth(year,month);
// Pipeline stage 2: attach the (simulated) file contents to the work item.
private WorkItem LoadDataIntoWorkItem(WorkItem workItem) {
workItem.RawData = LoadData(workItem.FilePath);
return workItem;
}
// Simulate loading by just concatenating to path: in real code this could open a real file and return the contents
private string LoadData(string path) => "This is content from file " + path;
// Pipeline stage 3: run the CPU-bound processing step on the raw data.
private WorkItem ProcessDataForWorkItem(WorkItem workItem)
{
workItem.ProcessedData = ProcessData(workItem.RawData);
return workItem;
}
// Simulated CPU-bound work; the SpinWait is the main source of elapsed time.
private string ProcessData(string contents)
{
Thread.SpinWait(11000000); // Use 11,000,000 for ~50ms on Windows .NET Framework. 1,100,000 on Windows .NET Core.
return $"Results of processing file with contents '{contents}' on {ThreadAndTime()}";
}
// Adds a processed WorkItem to its ProcessedWorkItems list. Then checks if the list has as many processed WorkItems as we
// expect to see overall. If so the list is returned to the next block,if not we return an empty array,which passes nothing on.
// This isn't threadsafe for the list,so has to be called with MaxDegreeOfParallelization = 1
private IEnumerable<List<WorkItem>> WaitForWorkItems(WorkItem workItem)
{
List<WorkItem> itemList = workItem.ProcessedWorkItems;
itemList.Add(workItem);
return itemList.Count == GetNumberOfFiles(workItem.Year,workItem.Month) ? new[] { itemList } : new List<WorkItem>[0];
}
// Concatenates the processed output of every file in the month into one string,
// stored on the first item of the list.
private List<WorkItem> MergeWorkItemData(List<WorkItem> processedWorkItems)
{
string finalContents = "";
foreach (WorkItem workItem in processedWorkItems)
{
finalContents = MergeData(finalContents,workItem.ProcessedData);
}
// Should really create a new data structure and return that,but let's cheat a bit
processedWorkItems[0].MergedData = finalContents;
return processedWorkItems;
}
// Just concatenate the output strings,separated by newlines,to merge our data
private string MergeData(string output1,string output2) => output1 != "" ? output1 + "\n" + output2 : output2;
// Final stage: persist the merged month and echo it to the console.
private void SaveWorkItemData(List<WorkItem> workItems)
{
WorkItem result = workItems[0];
SaveData(result.MergedData,result.Year,result.Month);
// Code to show it's worked...
Console.WriteLine($"Saved data block for {DateToString((result.Year,result.Month))} on {ThreadAndTime()}." +
$" File contents:\n{result.MergedData}\n");
}
// Writes the merged month data to Results/resultsYYYYMM.txt.
private void SaveData(string finalContents,int year,int month)
{
// Actually save,although don't really need to in this test code
new DirectoryInfo("Results").Create();
File.WriteAllText(Path.Combine("Results",$"results{year}{Pad(month)}.txt"),finalContents);
}
// Helper methods
// Formats a (year,month) tuple as e.g. "201911".
private string DateToString((int year,int month) date) => date.year + Pad(date.month);
// Zero-pads a number to at least two digits.
private string Pad(int number) => number < 10 ? "0" + number : number.ToString();
// Shows which thread ran a stage and when, to visualize parallelism in the output.
private string ThreadAndTime() => $"thread {Pad(Thread.CurrentThread.ManagedThreadId)} at {DateTime.Now.ToString("hh:mm:ss.fff")}";
}
/// <summary>
/// Mutable carrier for one input file's data, enriched as it moves through the
/// pipeline stages (path -> raw data -> processed data -> merged data).
/// </summary>
public class WorkItem
{
    /// <summary>Year of the (year, month) batch this item belongs to.</summary>
    public int Year { get; set; }

    /// <summary>Month of the (year, month) batch this item belongs to.</summary>
    public int Month { get; set; }

    /// <summary>Path of the input file this item represents.</summary>
    public string FilePath { get; set; }

    /// <summary>File contents as loaded, before processing.</summary>
    public string RawData { get; set; }

    /// <summary>Result of processing <see cref="RawData"/>.</summary>
    public string ProcessedData { get; set; }

    /// <summary>List shared by all items of the same month; used to regroup them.</summary>
    public List<WorkItem> ProcessedWorkItems { get; set; }

    /// <summary>Merged output for the whole month (set on the first item only).</summary>
    public string MergedData { get; set; }
}
}
此代码将WorkItem对象从每个块传递到下一个块,并在每个阶段进行充实。然后,在其中运行一个月的最终工作列表并保存结果之前,将创建一个包含所有工作项的最终列表。
此代码基于每个阶段使用您使用的名称的伪方法。这些并没有做太多,但希望能证明解决方案。例如,将LoadData传递给文件路径,并向其中添加一些文本并传递字符串,但是显然,如果磁盘上确实有文件,它可以加载真实文件并传递内容字符串。
类似地,为了模拟在ProcessData中的工作,我们执行Thread.SpinWait,然后再次向字符串中添加一些文本。这是延迟的来源,因此,如果希望运行速度更快或更慢,请更改数字。该代码是在.NET Framework上编写的,但是可以在Core 3.0以及Ubuntu和OSX上运行。唯一的区别是SpinWait周期可以长得多或短得多,因此您可能需要延迟一下。
请注意,其实可以把合并逻辑直接并入waitForProcessedDataBlock中,这样就和您要求的管道完全一致,只是代码会更混乱一些。
该代码确实会在磁盘上最后创建文件,但还会将结果转储到屏幕上,因此它实际上并不需要。
如果将并行度设置为1,您会发现速度大约降低了与核心数相当的倍数。我的Windows机器是四核的,实际减速略低于四倍。
本文链接:https://www.f2er.com/3158541.html