TPL: How do I split and merge a dataflow?

I am trying to use TPL Dataflow to create a pipeline with the following shape:

                    -> LoadDataBlock1 -> ProcessDataBlock1 ->  
GetInputPathsBlock  -> LoadDataBlock2 -> ProcessDataBlock2 -> MergeDataBlock -> SaveDataBlock
                    -> LoadDataBlock3 -> ProcessDataBlock3 ->
                    ...                             
                    -> LoadDataBlockN -> ProcessDataBlockN ->

The idea is that GetInputPathsBlock is a block that finds the paths of the input data to load, and then sends each path on to a LoadDataBlock. The LoadDataBlocks are all identical (except that each receives a unique inputPath string from GetInputPathsBlock). The loaded data is then sent to a ProcessDataBlock, which does some simple processing. The data from every ProcessDataBlock is then sent to MergeDataBlock, which merges it and sends it on to SaveDataBlock, which saves it to a file.

Think of it as a dataflow that needs to run once a month. First the paths to each day's data are found. Each day's data is loaded and processed, and then all the days of the month are merged together and saved. Each month can run in parallel, the data for each day of a month can be loaded and processed in parallel (after that day's data has been loaded), and once everything for the month has been loaded and processed it can be merged and saved.

What I have tried

As far as I can tell, a TransformManyBlock<TInput,string> can be used to do the splitting (GetInputPathsBlock), and it can be linked to an ordinary TransformBlock<string,InputData> (LoadDataBlock), and from there to another TransformBlock<InputData,ProcessedData> (ProcessDataBlock), but I don't know how to merge things back into a single block from there.
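As a rough sketch of that split half (the file names and toy load/process bodies below are placeholders, not real code from the question), a TransformManyBlock can fan one month out into many paths, and a single parallel TransformBlock can play the role of the N identical LoadDataBlocks:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class SplitSketch
{
    static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

        // GetInputPathsBlock: one month in, one path per day out.
        var getInputPathsBlock = new TransformManyBlock<int, string>(month =>
            Enumerable.Range(1, DateTime.DaysInMonth(2020, month))
                      .Select(day => $"Files/2020{month:D2}{day:D2}.txt"));

        // LoadDataBlock: one parallel block stands in for LoadDataBlock1..N.
        var loadDataBlock = new TransformBlock<string, string>(
            path => "content of " + path, options);

        // ProcessDataBlock: placeholder processing.
        var processDataBlock = new TransformBlock<string, string>(
            contents => "processed " + contents, options);

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        getInputPathsBlock.LinkTo(loadDataBlock, linkOptions);
        loadDataBlock.LinkTo(processDataBlock, linkOptions);

        // Drain the end of the pipeline so the sketch runs to completion.
        var printBlock = new ActionBlock<string>(s => Console.WriteLine(s));
        processDataBlock.LinkTo(printBlock, linkOptions);

        getInputPathsBlock.Post(1);
        getInputPathsBlock.Complete();
        await printBlock.Completion;
    }
}
```

The open question is what to link processDataBlock to in order to merge the per-day items back together, which is what the answers below address.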

What I have looked at

I found this answer, which uses a TransformManyBlock to go from IEnumerable<item> to item, but I don't fully understand it, and I can't link a TransformBlock<InputData,ProcessedData> (ProcessDataBlock) to a TransformBlock<IEnumerable<ProcessedData>,ProcessedData>, so I don't know how to use it.

I have also seen answers like this one, which suggest using a JoinBlock, but the number of input files N varies, and the files are all loaded in the same way anyway.

There is also this answer, which seems to do what I want, but I don't fully understand it, and I don't know how to carry the dictionary-based setup over to my case.


How do I split and merge my dataflow?

  • Am I missing a block type?
  • Can I somehow use a TransformManyBlock twice?
  • Does TPL Dataflow make sense for the split/merge, or is there a simpler async/await approach?
Answer from zhangmhua87:

I would use nested blocks, to avoid splitting the monthly data and then having to merge it back together again. Here is an example of two nested TransformBlocks that process all the days of the year 2020:

var monthlyBlock = new TransformBlock<int,List<string>>(async (month) =>
{
    var dailyBlock = new TransformBlock<int,string>(async (day) =>
    {
        await Task.Delay(100); // Simulate async work
        return day.ToString();
    },new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4 });

    foreach (var day in Enumerable.Range(1,DateTime.DaysInMonth(2020,month)))
        await dailyBlock.SendAsync(day);
    dailyBlock.Complete();

    var dailyResults = await dailyBlock.ToListAsync();
    return dailyResults;
},new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });

foreach (var month in Enumerable.Range(1,12))
    await monthlyBlock.SendAsync(month);
monthlyBlock.Complete();

To collect the daily results from the inner block, I used the extension method ToListAsync, shown below:

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            list.Add(item);
        }
    }
    return list;
}
Another answer:

The answer to your questions is: no, you don't need another block type; yes, you can use TransformManyBlock twice; and yes, it does make sense. I've written some code to prove it, which is at the bottom, followed by some notes.

The code uses a split-then-merge pipeline, as you describe. As for the bit you were struggling with: merging the data for the individual files back together can be done by adding each processed item to a list. Then the list is only passed on to the next block if it has the expected final number of items. This can be done with a fairly simple TransformMany block that returns zero or one items. Since the list isn't thread-safe, this block can't be parallelized.

Once you've got a pipeline like this you can test the parallelization and ordering using the options passed to the blocks. The code below sets parallelization to unbounded on every block where it can, and lets the Dataflow code sort things out. On my machine it maxes out all the cores/logical processors and is CPU-bound, which is what we want. Ordering is enabled, but turning it off doesn't make much difference: again, we're CPU-bound.

Finally, I have to say this is a very cool technique, but you can actually solve this problem much more simply using PLINQ, where just a few lines of code will get you something this fast. The big drawback is that you can't then easily add fast-arriving messages to a pipeline: PLINQ is better suited to one big batch process. But PLINQ may well be the better solution for your use case.
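That PLINQ alternative might look something like the sketch below (a self-contained toy, not the answer's actual code: the path format and the LoadData/ProcessData bodies are placeholders mirroring the pseudo-methods in the code further down). SelectMany-style fan-out is replaced by generating per-day items inside each month, and string.Join does the merge:

```csharp
using System;
using System.Linq;

class PlinqSketch
{
    // Placeholders standing in for real file loading and processing.
    static string LoadData(string path) => "content of " + path;
    static string ProcessData(string contents) => "processed " + contents;

    static void Main()
    {
        var year = 2020;
        var results =
            Enumerable.Range(1, 12)
                .AsParallel()                              // months run in parallel
                .Select(month => (month,
                    merged: string.Join("\n",              // merge: all days of one month
                        Enumerable.Range(1, DateTime.DaysInMonth(year, month))
                            .AsParallel()                  // days run in parallel too
                            .Select(day => $"Files/{year}{month:D2}{day:D2}.txt")
                            .Select(path => ProcessData(LoadData(path))))))
                .ToList();

        foreach (var (month, merged) in results.OrderBy(r => r.month))
            Console.WriteLine($"Month {month:D2} merged {merged.Split('\n').Length} days");
    }
}
```

Saving each month's merged string to a file would replace the Console.WriteLine in a real run.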

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ParallelDataFlow
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().Run();
            Console.ReadLine();
        }

        private void Run()
        {
            Stopwatch s = new Stopwatch();
            s.Start();

            // Can experiment with parallelization of blocks by changing MaxDegreeOfParallelism
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };
            var getInputPathsBlock = new TransformManyBlock<(int,int),WorkItem>(date => GetWorkItemWithInputPath(date),options);
            var loadDataBlock = new TransformBlock<WorkItem,WorkItem>(workItem => LoadDataIntoWorkItem(workItem),options);
            var processDataBlock = new TransformBlock<WorkItem,WorkItem>(workItem => ProcessDataForWorkItem(workItem),options);
            var waitForProcessedDataBlock = new TransformManyBlock<WorkItem,List<WorkItem>>(workItem => WaitForWorkItems(workItem));  // Can't parallelize this block
            var mergeDataBlock = new TransformBlock<List<WorkItem>,List<WorkItem>>(list => MergeWorkItemData(list),options);
            var saveDataBlock = new ActionBlock<List<WorkItem>>(list => SaveWorkItemData(list),options);

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            getInputPathsBlock.LinkTo(loadDataBlock,linkOptions);
            loadDataBlock.LinkTo(processDataBlock,linkOptions);
            processDataBlock.LinkTo(waitForProcessedDataBlock,linkOptions);
            waitForProcessedDataBlock.LinkTo(mergeDataBlock,linkOptions);
            mergeDataBlock.LinkTo(saveDataBlock,linkOptions);

            // We post individual tuples of (year,month) to our pipeline,as many as we want
            getInputPathsBlock.Post((1903,2));  // Post one month and date
            var dates = from y in Enumerable.Range(2015,5) from m in Enumerable.Range(1,12) select (y,m);
            foreach (var date in dates) getInputPathsBlock.Post(date);  // Post a big sequence         

            getInputPathsBlock.Complete();
            saveDataBlock.Completion.Wait();
            s.Stop();
            Console.WriteLine($"Completed in {s.ElapsedMilliseconds}ms on {ThreadAndTime()}");
        }

        private IEnumerable<WorkItem> GetWorkItemWithInputPath((int year,int month) date)
        {
            List<WorkItem> processedWorkItems = new List<WorkItem>();  // Will store merged results
            return GetInputPaths(date.year,date.month).Select(
                path => new WorkItem
                {
                    Year = date.year,Month = date.month,FilePath = path,ProcessedWorkItems = processedWorkItems
                });
        }

        // Get filepaths of form e.g. Files/20191101.txt  These aren't real files,they just show how it could work.
        private IEnumerable<string> GetInputPaths(int year,int month) =>
            Enumerable.Range(0,GetNumberOfFiles(year,month)).Select(i => $@"Files/{year}{Pad(month)}{Pad(i + 1)}.txt");

        private int GetNumberOfFiles(int year,int month) => DateTime.DaysInMonth(year,month);

        private WorkItem LoadDataIntoWorkItem(WorkItem workItem) {
            workItem.RawData = LoadData(workItem.FilePath);
            return workItem;
        }

        // Simulate loading by just concatenating to path: in real code this could open a real file and return the contents
        private string LoadData(string path) => "This is content from file " + path;

        private WorkItem ProcessDataForWorkItem(WorkItem workItem)
        {
            workItem.ProcessedData = ProcessData(workItem.RawData);
            return workItem;
        }

        private string ProcessData(string contents)
        {
            Thread.SpinWait(11000000); // Use 11,000,000 for ~50ms on Windows .NET Framework.  1,100,000 on Windows .NET Core.
            return $"Results of processing file with contents '{contents}' on {ThreadAndTime()}";
        }

        // Adds a processed WorkItem to its ProcessedWorkItems list.  Then checks if the list has as many processed WorkItems as we 
        // expect to see overall.  If so the list is returned to the next block,if not we return an empty array,which passes nothing on.
        // This isn't threadsafe for the list,so has to be called with MaxDegreeOfParallelism = 1
        private IEnumerable<List<WorkItem>> WaitForWorkItems(WorkItem workItem)
        {
            List<WorkItem> itemList = workItem.ProcessedWorkItems;
            itemList.Add(workItem);
            return itemList.Count == GetNumberOfFiles(workItem.Year,workItem.Month) ? new[] { itemList } : new List<WorkItem>[0];
        }

        private List<WorkItem> MergeWorkItemData(List<WorkItem> processedWorkItems)
        {
            string finalContents = "";
            foreach (WorkItem workItem in processedWorkItems)
            {
                finalContents = MergeData(finalContents,workItem.ProcessedData);
            }
            // Should really create a new data structure and return that,but let's cheat a bit
            processedWorkItems[0].MergedData = finalContents;
            return processedWorkItems;
        }

        // Just concatenate the output strings,separated by newlines,to merge our data
        private string MergeData(string output1,string output2) => output1 != "" ? output1 + "\n" + output2 : output2;

        private void SaveWorkItemData(List<WorkItem> workItems)
        {
            WorkItem result = workItems[0];
            SaveData(result.MergedData,result.Year,result.Month);
            // Code to show it's worked...
            Console.WriteLine($"Saved data block for {DateToString((result.Year,result.Month))} on {ThreadAndTime()}." +
                              $"  File contents:\n{result.MergedData}\n");
        }
        private void SaveData(string finalContents,int year,int month)
        {
            // Actually save,although don't really need to in this test code
            new DirectoryInfo("Results").Create();
            File.WriteAllText(Path.Combine("Results",$"results{year}{Pad(month)}.txt"),finalContents);
        }

        // Helper methods
        private string DateToString((int year,int month) date) => date.year + Pad(date.month);
        private string Pad(int number) => number < 10 ? "0" + number : number.ToString();
        private string ThreadAndTime() => $"thread {Pad(Thread.CurrentThread.ManagedThreadId)} at {DateTime.Now.ToString("hh:mm:ss.fff")}";
    }

    public class WorkItem
    {
        public int Year { get; set; }
        public int Month { get; set; }
        public string FilePath { get; set; }
        public string RawData { get; set; }
        public string ProcessedData { get; set; }
        public List<WorkItem> ProcessedWorkItems { get; set; }
        public string MergedData { get; set; }
    }
}

This code passes a WorkItem object from each block to the next, enriching it at each stage. It then creates a final list with all the WorkItems for one month in it, before running the merge step on that list and saving the result.

The code is based on pseudo-methods for each stage, using the names you used. These don't do much, but hopefully they demonstrate the solution. For example, LoadData is handed a file path and just adds some text to it and passes the string on, but obviously it could load a real file and pass on the contents, if there actually were files on disk.

Similarly, to simulate doing work in ProcessData we do a Thread.SpinWait and then again just add some text to the string. This is where the delay comes from, so change the number there if you want it to run faster or slower. The code was written on .NET Framework, but it runs on Core 3.0, and on Ubuntu and OSX. The only difference is that a SpinWait cycle can be significantly longer or shorter, so you may want to play with the delay.

Note that we could have merged in the waitForProcessedDataBlock and have exactly the pipeline you were asking for. It would just have been a bit messier.

The code does create files on disk at the end, but it also dumps the results to the screen, so it doesn't really need to.

If you set the parallelization to 1, you'll find it slows down by about the amount you'd expect: my Windows machine is quad-core, and it comes out slightly worse than four times slower.
