www.gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+lazyeval/LazyTaskGraph.m

    %LazyTaskGraph
% Class that represents a graph of ExecutionTask that is equivalent to the
% graph of closures that is required to be executed.
%
% General model for tasks:
%
% Each closure will have one or more execution tasks that represents it.
%
% Every task in this graph will emit a N x NumOutputs cell array.
% * Each column corresponds with exactly one output of the operation.
% * Each cell contains one chunk of data of that output.
% Tasks are expected to extract out the correct columns from upstream tasks.
% * The input to a task will be one N x NumOutputs cell array from each
%   upstream task.
% * The task is responsible for extracting out the data that corresponds to
%   the input futures that the corresponding operation requires.
%

%   Copyright 2015-2016 The MathWorks, Inc.

classdef (Sealed) LazyTaskGraph < matlab.bigdata.internal.executor.TaskGraph
    % Overrides of TaskGraph properties.
    properties (SetAccess = private)
        Tasks;
        OutputTasks;
    end
    
    properties (SetAccess = immutable)
        % A map of Closure ID to the corresponding ExecutionTask instance.
        ClosureToTaskMap;
        
        % A map of Task ID to the corresponding Closure instance.
        TaskToClosureMap;
    end
    
    methods
        % The main constructor.
        %
        % Syntax:
        %   obj = LazyTaskGraph(closures);
        %
        % Inputs:
        %  - closures is a list of Closure instances that represent the
        %  desired final results to be gathered.
        function obj = LazyTaskGraph(closures)
            import matlab.bigdata.internal.executor.ExecutionTask;
            obj.Tasks = ExecutionTask.empty();
            obj.ClosureToTaskMap = containers.Map('KeyType', 'char', 'ValueType', 'any');
            obj.TaskToClosureMap = containers.Map('KeyType', 'char', 'ValueType', 'any');
            
            for ii = 1:numel(closures)
                closure = closures(ii);
                task = obj.doAddClosure(closure);
                if isempty(obj.OutputTasks) || obj.OutputTasks(end) ~= task
                    obj.OutputTasks = [obj.OutputTasks; task];
                end
            end
        end
    end
    
    methods (Access = private)
        % Implementation of addClosure that returns the task corresponding
        % to the given closure.
        function task = doAddClosure(obj, closure)
            import matlab.bigdata.internal.executor.OutputCommunicationType;
            import matlab.bigdata.internal.lazyeval.InputFutureMap;
            if isKey(obj.ClosureToTaskMap, closure.Id)
                task = obj.ClosureToTaskMap(closure.Id);
                return;
            end
            
            % We make the inputs to the Execution task to be exactly the list of
            % direct closure dependencies on the current Closure. The list of
            % direct dependencies is equivalent to unique on all of
            % closure.InputFutures.Promise.Closure.
            dependencies = closure.getDirectDependencies();
            
            taskDependencies = cell(numel(dependencies) + 1, 1);
            for ii = 1:numel(dependencies)
                taskDependencies{ii} = obj.doAddClosure(dependencies(ii));
            end
            
            [inputFutureMap, additionalConstants] = InputFutureMap.createFromClosures(closure.InputFutures, dependencies);
            if ~isempty(additionalConstants)
                taskDependencies{end} = obj.createConstantTask(additionalConstants);
            end
            taskDependencies = vertcat(taskDependencies{:});

            isInputReplicated = inputFutureMap.mapScalars(arrayfun(@(x)x.OutputPartitionStrategy.IsDataReplicated, taskDependencies));
            
            allTasks = closure.Operation.createExecutionTasks(taskDependencies, inputFutureMap, isInputReplicated);
            
            task = allTasks(end);
            obj.Tasks = [obj.Tasks; allTasks(:)];
            obj.ClosureToTaskMap(closure.Id) = task;
            obj.TaskToClosureMap(task.Id) = closure;
            if task.OutputPartitionStrategy.IsBroadcast
                % We do this in order to complete any closures that so
                % happen to have a small output.
                obj.OutputTasks = [obj.OutputTasks; task];
            end
        end
        
        % Create an ExecutionTask that effectively broadcasts the provided
        % constants.
        function task = createConstantTask(obj, constants)
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.executor.BroadcastPartitionStrategy;
            import matlab.bigdata.internal.executor.ConstantProcessor;
            processorFactory = ConstantProcessor.createFactory(constants);
            task = ExecutionTask.createBroadcastTask([], processorFactory,...
                'ExecutionPartitionStrategy', BroadcastPartitionStrategy());
            obj.Tasks = [obj.Tasks; task];
        end
    end
end