www.gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+executor/CompositeDataProcessorNode.m
%CompositeDataProcessorNode % A helper class to represent the graph of DataProcessor instances within a % CompositeDataProcessor instance. % % Copyright 2015-2016 The MathWorks, Inc. classdef CompositeDataProcessorNode < handle properties (SetAccess = immutable) % An array of CompositeDataProcessorNode instances that represent % the direct inputs to this node. InputNodes; % An instance of DataProcessor or empty. If this is non-empty, the % node represents this DataProcessor instance. Processor; % A scalar positive integer or empty. If this is non-empty, the % node represents the CompositeDataProcessor or the corresponding % index. InputIndex; % A scalar logical that describes whether the processor should % return partition indices. RequireOutputPartitionIndices; % A scalar logical that describes whether the processor should be % passed the partition indices output of the previous input tasks. RequireInputPartitionIndices; end properties (SetAccess = private) % A cache of the output of this node. Output = []; % A cache of the partition indices that correspond with the output % of this node. PartitionIndices = []; % A cache of the output when it is empty. This is NaN until valid. EmptyChunk = NaN; % A scalar logical that is true if and only if this node will % generate no more output. IsFinished = false; % A scalar logical that is true if and only if more output from % this node is required before the CompositeDataProcessor can emit % any further final output. IsMoreOutputRequired = true; end methods (Access = ?matlab.bigdata.internal.executor.CompositeDataProcessorBuilder) function obj = CompositeDataProcessorNode(inputNodes, processor, ... requireOutputPartitionIndices, requireInputPartitionIndices) import matlab.bigdata.internal.executor.CompositeDataProcessorNode; if isempty(inputNodes) inputNodes = CompositeDataProcessorNode.empty(0,1); end obj.InputNodes = inputNodes; if isnumeric(processor) obj.InputIndex = processor; else obj.Processor = processor; end obj.RequireOutputPartitionIndices = requireOutputPartitionIndices; obj.RequireInputPartitionIndices = requireInputPartitionIndices; end end methods % Perform one iteration of the action of this node. function isMoreGlobalInputRequired = process(obj, isLastOfGlobalInput, globalInputs, isMoreGlobalInputRequired) if obj.IsFinished return; elseif ~isempty(obj.InputIndex) obj.IsFinished = isLastOfGlobalInput(obj.InputIndex); obj.Output = globalInputs{obj.InputIndex}; if obj.IsMoreOutputRequired isMoreGlobalInputRequired(obj.InputIndex) = true; end return; end isLastOfInput = [obj.InputNodes.IsFinished]; if obj.RequireInputPartitionIndices inputs = {isLastOfInput, obj.InputNodes.Output, obj.InputNodes.PartitionIndices}; else inputs = {isLastOfInput, obj.InputNodes.Output}; end if obj.RequireOutputPartitionIndices [obj.Output, obj.PartitionIndices] = obj.Processor.process(inputs{:}); else obj.Output = obj.Processor.process(inputs{:}); end obj.IsFinished = obj.Processor.IsFinished; obj.IsMoreOutputRequired = false; end % Mark the output state of this object as consumed. This will also % update the IsMoreOutputRequired flags as requested. function markOutputAsConsumed(obj, isMoreOutputRequired) if ~isempty(obj.Output) % The EmptyChunk property is set to non-empty NaN until it % has been calculated. if isempty(obj.EmptyChunk) obj.Output = obj.EmptyChunk; else sz = size(obj.Output); obj.Output = obj.Output([], :); if numel(sz) > 2 obj.Output = reshape(obj.Output, [0, sz(2:end)]); end obj.EmptyChunk = obj.Output; end obj.PartitionIndices = []; end obj.IsMoreOutputRequired = obj.IsMoreOutputRequired || isMoreOutputRequired; if ~isempty(obj.Processor) && obj.IsMoreOutputRequired inputNodes = obj.InputNodes; for ii = 1:numel(inputNodes) inputNodes(ii).IsMoreOutputRequired = inputNodes(ii).IsMoreOutputRequired || obj.Processor.IsMoreInputRequired(ii); end end end end end