www.gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+executor/combineSimpleTasks.m

    function outTaskGraph = combineSimpleTasks(taskGraph)
%COMBINESIMPLETASKS Transforms a task graph by combining all non-caching simple tasks forward.
%
% Syntax:
%  taskGraph = combineSimpleTasks(taskGraph);
%
% This will combine all tasks that fit the following criteria into direct
% downstream tasks:
%  1. The task must have simple (non-communicating) output.
%  2. The task must not cache its output.
%  3. The output of the task must not be required for gather.
%  

%   Copyright 2016 The MathWorks, Inc.

import matlab.bigdata.internal.executor.CompositeDataProcessorBuilder;
import matlab.bigdata.internal.executor.ExecutionTask;
import matlab.bigdata.internal.executor.OutputCommunicationType;
import matlab.bigdata.internal.executor.SimpleTaskGraph;

% A map of task ID an instance of CompositeDataProcessorBuilder that
% generates the same output but that can be combined forward into
% downstream tasks. A task ID can map to empty, in which case it is not
% possible to combine
passForwardNodeMap = containers.Map('KeyType', 'char', 'ValueType', 'any');

% A map of all tasks already processed for the purpose of creating new
% ExecutionTask objects.
taskMap = containers.Map('KeyType', 'char', 'ValueType', 'any');

tasks = taskGraph.Tasks;
outputTasks = taskGraph.OutputTasks;
newTasks = ExecutionTask.empty;
for ii = 1:numel(tasks)
    task = tasks(ii);
    
    inputNodes = cell(size(task.InputIds));
    isPassForwardTrivial = true;
    for jj = 1:numel(task.InputIds)
        inputNodes{jj} = passForwardNodeMap(task.InputIds{jj});
        isPassForwardTrivial = isPassForwardTrivial && isempty(inputNodes{jj}.DataProcessorFactory);
    end
    inputNodes = vertcat(CompositeDataProcessorBuilder.empty, inputNodes{:});
    
    isOutputSimple = ~ismember(task, outputTasks) ...
        && ~task.IsPassBoundary ...
        && task.OutputCommunicationType == OutputCommunicationType.Simple ...
        && strcmp(task.CacheLevel, 'None');
    
    if isOutputSimple
        % This task can be combined forward into downstream tasks.
        passForwardNodeMap(task.Id) = CompositeDataProcessorBuilder(inputNodes, task.DataProcessorFactory);
    else
        % This task has non-simple output, this task must be visible to the
        % execution environment.
        passForwardNodeMap(task.Id) = CompositeDataProcessorBuilder([], task.Id);
        if ~isPassForwardTrivial
            requiresOutputPartitionIndices = task.OutputCommunicationType == OutputCommunicationType.AnyToAny;
            newProcessor = CompositeDataProcessorBuilder(inputNodes, task.DataProcessorFactory, requiresOutputPartitionIndices);
            
            newInputIds = newProcessor.AllInputIds;
            newInputTasks = cell(size(newInputIds));
            for jj = 1:numel(newInputIds)
                newInputTasks{jj} = taskMap(newInputIds{jj});
            end
            newInputTasks = vertcat(newInputTasks{:});
            
            newTask = task.copyWithReplacedInputs(newInputTasks, newProcessor);
            outputTasks(task == outputTasks) = newTask;
            
            task = newTask;
        end
        newTasks(end + 1, 1) = task; %#ok<AGROW>
    end
    
    taskMap(task.Id) = task;
end

outTaskGraph = SimpleTaskGraph(newTasks, outputTasks);