www.gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+executor/CompositeDataProcessorBuilder.m

%CompositeDataProcessorBuilder
% A helper class that builds a graph of DataProcessor instances and wraps
% them in a CompositeDataProcessor.
%
% To construct a CompositeDataProcessor, the caller first builds up a graph
% of CompositeDataProcessorBuilder instances, then calls feval on the final
% (most downstream) builder. feval returns a CompositeDataProcessor that
% represents that builder together with everything upstream of it.
%

%   Copyright 2015-2016 The MathWorks, Inc.

classdef CompositeDataProcessorBuilder < handle
    properties (SetAccess = immutable)
        % A unique ID for this builder, obtained from IdFactory at
        % construction. It is used as the key when de-duplicating builders
        % that are reachable along more than one path through the graph.
        Id;
        
        % An array of CompositeDataProcessorBuilder instances that
        % represent the direct inputs to the DataProcessorFactory held by
        % this instance. An empty input is normalized to an empty
        % CompositeDataProcessorBuilder array by the constructor.
        InputBuilders;
        
        % A factory that will construct a data processor or empty. If
        % non-empty, this will be used to construct one of the data
        % processor instances inside the CompositeDataProcessor built by
        % this class.
        DataProcessorFactory;
        
        % Either a scalar positive integer, a string ID or empty. If non-empty,
        % this will be used to construct a CompositeDataProcessorNode that
        % outputs one of the global inputs of the CompositeDataProcessor.
        % If numeric, this ID represents the global input index. If a
        % string, the corresponding global input index is the index of this
        % string in the AllInputIds property.
        InputId;
        
        % A scalar logical that describes whether the processor should
        % return partition indices.
        RequireOutputPartitionIndices;
        
        % A scalar logical that describes whether the processor should be
        % passed the partition indices output of the previous input tasks.
        RequireInputPartitionIndices;
        
        % Optional number of output partitions. When non-empty, it is passed
        % to DataProcessorFactory as a second argument after the partition.
        NumOutputPartitions = [];
    end
    
    properties (Dependent)
        % An ordered array of all InputId values that will represent the
        % list of dependency inputs to the constructed CompositeDataProcessor.
        AllInputIds;
    end
    
    properties (Access = private, Constant)
        % The means by which this class receives unique IDs.
        IdFactory = matlab.bigdata.internal.util.UniqueIdFactory('CompositeDataProcessorBuilder');
    end
    
    methods
        % Construct a CompositeDataProcessorBuilder that represents the
        % graph of all the inputBuilders in combination with
        % dataProcessorFactory. The elements of inputBuilders correspond
        % with the inputs of dataProcessorFactory.
        %
        % Inputs:
        %   inputBuilders                 - Upstream builders, or empty.
        %   dataProcessorFactory          - Either a factory, or a numeric/char
        %                                   value. A numeric or char value is
        %                                   stored as InputId and marks this
        %                                   builder as a global input.
        %   requireOutputPartitionIndices - Optional, defaults to false.
        %   requireInputPartitionIndices  - Optional, defaults to false.
        %   numOutputPartitions           - Optional, defaults to [].
        function obj = CompositeDataProcessorBuilder(inputBuilders, dataProcessorFactory, ...
                requireOutputPartitionIndices, requireInputPartitionIndices, numOutputPartitions)
            import matlab.bigdata.internal.executor.CompositeDataProcessorBuilder;
            if isempty(inputBuilders)
                inputBuilders = CompositeDataProcessorBuilder.empty();
            end
            obj.InputBuilders = inputBuilders;
            
            % A numeric or char "factory" is really a global input ID.
            if isnumeric(dataProcessorFactory) || ischar(dataProcessorFactory)
                obj.InputId = dataProcessorFactory;
            else
                obj.DataProcessorFactory = dataProcessorFactory;
            end
            if nargin < 3
                requireOutputPartitionIndices = false;
            end
            obj.RequireOutputPartitionIndices = requireOutputPartitionIndices;
            if nargin < 4
                requireInputPartitionIndices = false;
            end
            obj.RequireInputPartitionIndices = requireInputPartitionIndices;
            if nargin < 5
                numOutputPartitions = [];
            end
            obj.NumOutputPartitions = numOutputPartitions;
            
            obj.Id = obj.IdFactory.nextId();
        end
        
        % Build the graph of DataProcessors and wrap them in an enclosing
        % CompositeDataProcessor.
        %
        % obj may be an array of builders. Builders shared between several
        % elements of obj are built only once. All elements also share a
        % single, consistent numbering of the global inputs.
        %
        % partition is forwarded unchanged to every DataProcessorFactory.
        function processor = feval(obj, partition)
            import matlab.bigdata.internal.executor.CompositeDataProcessor;
            
            % Shared memo across all roots so that common upstream builders
            % are built exactly once.
            builtNodeMap = containers.Map('KeyType', 'char', 'ValueType', 'any');
            builtNodes = cell(size(obj));
            % Thread one list of global input IDs through every root. This
            % lets string IDs receive the same index regardless of which root
            % first reaches them. It also means numGlobalInputs accounts for
            % all roots, not just the last, and the variable is defined even
            % when obj is empty.
            globalInputIndices = [];
            for ii = 1:numel(obj)
                [builtNodes{ii}, globalInputIndices] = obj(ii).recursivelyBuildNodes( ...
                    partition, builtNodeMap, globalInputIndices);
            end
            
            if isnumeric(globalInputIndices)
                % Numeric IDs are indices themselves; the largest one wins.
                numGlobalInputs = max([0, globalInputIndices]);
            else
                numGlobalInputs = numel(globalInputIndices);
            end
            
            processor = CompositeDataProcessor(vertcat(builtNodes{:}), numGlobalInputs);
        end
        
        % Compute the ordered list of global input IDs reachable from this
        % builder, using the same traversal order as recursivelyBuildNodes.
        function globalInputIndices = get.AllInputIds(obj)
            builtNodeMap = containers.Map('KeyType', 'char', 'ValueType', 'any');
            globalInputIndices = obj.recursivelyGetAllInputIndices(builtNodeMap, []);
        end
    end
    
    methods (Access = private)
        % Recursively build the CompositeDataProcessorNode for this builder
        % and for every upstream builder not already present in builtNodeMap.
        %
        % Inputs:
        %   partition    - Forwarded unchanged to each DataProcessorFactory.
        %   builtNodeMap - containers.Map from builder Id to the node built
        %                  for it. Being a handle object, it is updated in
        %                  place and acts as the memo for the whole traversal.
        %   allInputIds  - Running list of global input IDs seen so far.
        %
        % Outputs:
        %   builtNodes   - Column of nodes newly built by this call, with
        %                  inputs placed before the nodes that consume them.
        %                  This is [] if this builder had already been built.
        %   allInputIds  - The input list, extended with any global inputs
        %                  encountered in this sub-graph.
        function [builtNodes, allInputIds] = recursivelyBuildNodes(obj, partition, builtNodeMap, allInputIds)
            import matlab.bigdata.internal.executor.CompositeDataProcessorNode;
            if isKey(builtNodeMap, obj.Id)
                builtNodes = [];
                return;
            end
            
            builtNodes = cell(numel(obj.InputBuilders), 1);
            inputNodes = cell(numel(obj.InputBuilders), 1);
            for ii = 1:numel(obj.InputBuilders)
                [builtNodes{ii}, allInputIds] = recursivelyBuildNodes(obj.InputBuilders(ii), partition, builtNodeMap, allInputIds);
                % Look up the node even if it was built by an earlier path.
                inputNodes{ii} = builtNodeMap(obj.InputBuilders(ii).Id);
            end
            
            if isempty(obj.DataProcessorFactory)
                % A global input: the "processor" stored in the node is the
                % index of this input in the global input list.
                [allInputIds, processor] = iAddInputId(obj.InputId, allInputIds);
            elseif isempty(obj.NumOutputPartitions)
                processor = feval(obj.DataProcessorFactory, partition);
            else
                processor = feval(obj.DataProcessorFactory, partition, obj.NumOutputPartitions);
            end
            thisNode = CompositeDataProcessorNode([inputNodes{:}], processor, ...
                obj.RequireOutputPartitionIndices, obj.RequireInputPartitionIndices);
            
            builtNodeMap(obj.Id) = thisNode; %#ok<NASGU>
            builtNodes = vertcat(builtNodes{:}, thisNode);
        end
        
        % Recursively build a list of all unique InputId in a stable way.
        %
        % builtNodeMap is used only as a visited set (values are ignored) so
        % that shared upstream builders are visited once. The traversal order
        % matches recursivelyBuildNodes, so the resulting order agrees with
        % the indices assigned during feval.
        function allInputIds = recursivelyGetAllInputIndices(obj, builtNodeMap, allInputIds)
            if isKey(builtNodeMap, obj.Id)
                return;
            end
            builtNodeMap(obj.Id) = 1;
            
            for ii = 1:numel(obj.InputBuilders)
                allInputIds = recursivelyGetAllInputIndices(obj.InputBuilders(ii), builtNodeMap, allInputIds);
            end
            
            if isempty(obj.DataProcessorFactory)
                allInputIds = iAddInputId(obj.InputId, allInputIds);
            end
        end
    end
end

% Register inputId in the running list allInputIds and report its index.
%
% Char IDs are appended to the cell list if not already present, keeping
% first-seen order, and their index is their position in that list.
% Numeric IDs are themselves the index; the list is widened to 1:inputId
% whenever inputId exceeds its current length.
function [allInputIds, inputIndex] = iAddInputId(inputId, allInputIds)
if ~isnumeric(inputId)
    assert (ischar(inputId));
    wrappedId = {inputId};
    allInputIds = union(allInputIds, wrappedId, 'stable');
    [~, inputIndex] = ismember(wrappedId, allInputIds);
    return;
end

inputIndex = inputId;
if numel(allInputIds) < inputId
    allInputIds = 1:inputId;
end
end