www.gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+executor/CompositeDataProcessorBuilder.m
%CompositeDataProcessorBuilder % A helper class that builds a graph of DataProcessor instances and wraps % them in a CompositeDataProcessor. % % To construct a CompositeDataProcessor, the caller must build up a graph % of CompositeDataProcessorBuilder instances. Then, the caller must call % feval on the final or most downstream builder, which will return a % CompositeDataProcessor representing the graph of everything upstream. % % Copyright 2015-2016 The MathWorks, Inc. classdef CompositeDataProcessorBuilder < handle properties (SetAccess = immutable) % A unique ID for this builder. Id; % An array of CompositeDataProcessorBuilder instances that % represent the direct inputs to the DataProcessorFactory held by % this instance. InputBuilders; % A factory that will construct a data processor or empty. If % non-empty, this will be used to construct one of the data % processor instances inside the CompositeDataProcessor built by % this class. DataProcessorFactory; % Either a scalar positive integer, a string ID or empty. If non-empty, % this will be used to construct a CompositeDataProcessorNode that % outputs one of the global inputs of the CompositeDataProcessor. % If numeric, this ID represents the global input index. If a % string, the corresponding global input index is the index of this % string in the AllInputIds property. InputId; % A scalar logical that describes whether the processor should % return partition indices. RequireOutputPartitionIndices; % A scalar logical that describes whether the processor should be % passed the partition indices output of the previous input tasks. RequireInputPartitionIndices; NumOutputPartitions = []; end properties (Dependent) % An ordered array of all InputId values that will represent the % list of dependency inputs to the constructed CompositeDataProcessor. AllInputIds; end properties (Access = private, Constant) % The means by which this class receives unique IDs. IdFactory = matlab.bigdata.internal.util.UniqueIdFactory('CompositeDataProcessorBuilder'); end methods % Construct a CompositeDataProcessorBuilder that represents the % graph of all the inputBuilders in combination with % dataProcessorFactory. The elements of inputBuilder correspond % with the inputs of dataProcessorFactory. function obj = CompositeDataProcessorBuilder(inputBuilders, dataProcessorFactory, ... requireOutputPartitionIndices, requireInputPartitionIndices, numOutputPartitions) import matlab.bigdata.internal.executor.CompositeDataProcessorBuilder; if isempty(inputBuilders) inputBuilders = CompositeDataProcessorBuilder.empty(); end obj.InputBuilders = inputBuilders; if isnumeric(dataProcessorFactory) || ischar(dataProcessorFactory) obj.InputId = dataProcessorFactory; else obj.DataProcessorFactory = dataProcessorFactory; end if nargin < 3 requireOutputPartitionIndices = false; end obj.RequireOutputPartitionIndices = requireOutputPartitionIndices; if nargin < 4 requireInputPartitionIndices = false; end obj.RequireInputPartitionIndices = requireInputPartitionIndices; if nargin < 5 numOutputPartitions = []; end obj.NumOutputPartitions = numOutputPartitions; obj.Id = obj.IdFactory.nextId(); end % Build the graph of DataProcessors and wrap them in an enclosing % CompositeDataProcessor. function processor = feval(obj, partition) import matlab.bigdata.internal.executor.CompositeDataProcessor; builtNodeMap = containers.Map('KeyType', 'char', 'ValueType', 'any'); builtNodes = cell(size(obj)); for ii = 1:numel(obj) [builtNodes{ii}, globalInputIndices] = obj(ii).recursivelyBuildNodes(partition, builtNodeMap, []); end if isnumeric(globalInputIndices) numGlobalInputs = max([0, globalInputIndices]); else numGlobalInputs = numel(globalInputIndices); end processor = CompositeDataProcessor(vertcat(builtNodes{:}), numGlobalInputs); end function globalInputIndices = get.AllInputIds(obj) builtNodeMap = containers.Map('KeyType', 'char', 'ValueType', 'any'); globalInputIndices = obj.recursivelyGetAllInputIndices(builtNodeMap, []); end end methods (Access = private) % Recursively build each DataProcessor in the graph. function [builtNodes, allInputIds] = recursivelyBuildNodes(obj, partition, builtNodeMap, allInputIds) import matlab.bigdata.internal.executor.CompositeDataProcessorNode; if isKey(builtNodeMap, obj.Id) builtNodes = []; return; end builtNodes = cell(numel(obj.InputBuilders), 1); inputNodes = cell(numel(obj.InputBuilders), 1); for ii = 1:numel(obj.InputBuilders) [builtNodes{ii}, allInputIds] = recursivelyBuildNodes(obj.InputBuilders(ii), partition, builtNodeMap, allInputIds); inputNodes{ii} = builtNodeMap(obj.InputBuilders(ii).Id); end if isempty(obj.DataProcessorFactory) [allInputIds, processor] = iAddInputId(obj.InputId, allInputIds); elseif isempty(obj.NumOutputPartitions) processor = feval(obj.DataProcessorFactory, partition); else processor = feval(obj.DataProcessorFactory, partition, obj.NumOutputPartitions); end thisNode = CompositeDataProcessorNode([inputNodes{:}], processor, ... obj.RequireOutputPartitionIndices, obj.RequireInputPartitionIndices); builtNodeMap(obj.Id) = thisNode; %#ok<NASGU> builtNodes = vertcat(builtNodes{:}, thisNode); end % Recursively build a list of all unique InputId in a stable way. function allInputIds = recursivelyGetAllInputIndices(obj, builtNodeMap, allInputIds) if isKey(builtNodeMap, obj.Id) return; end builtNodeMap(obj.Id) = 1; for ii = 1:numel(obj.InputBuilders) allInputIds = recursivelyGetAllInputIndices(obj.InputBuilders(ii), builtNodeMap, allInputIds); end if isempty(obj.DataProcessorFactory) allInputIds = iAddInputId(obj.InputId, allInputIds); end end end end % Add an input ID to the list of all input IDs and return the index into % all input IDs. function [allInputIds, inputIndex] = iAddInputId(inputId, allInputIds) if isnumeric(inputId) if inputId > numel(allInputIds) allInputIds = 1:inputId; end inputIndex = inputId; else assert (ischar(inputId)); allInputIds = union(allInputIds, {inputId}, 'stable'); [~, inputIndex] = ismember({inputId}, allInputIds); end end