www.gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+executor/CompositeDataProcessorNode.m

    %CompositeDataProcessorNode
% A helper class to represent the graph of DataProcessor instances within a
% CompositeDataProcessor instance.
%

%   Copyright 2015-2016 The MathWorks, Inc.

classdef CompositeDataProcessorNode < handle
    properties (SetAccess = immutable)
        % An array of CompositeDataProcessorNode instances that represent
        % the direct inputs to this node.
        InputNodes;
        
        % An instance of DataProcessor or empty. If this is non-empty, the
        % node represents this DataProcessor instance.
        Processor;
        
        % A scalar positive integer or empty. If this is non-empty, the
        % node represents the CompositeDataProcessor or the corresponding
        % index.
        InputIndex;
        
        
        % A scalar logical that describes whether the processor should
        % return partition indices.
        RequireOutputPartitionIndices;
        
        % A scalar logical that describes whether the processor should be
        % passed the partition indices output of the previous input tasks.
        RequireInputPartitionIndices;
    end
    
    properties (SetAccess = private)
        % A cache of the output of this node.
        Output = [];
        
        % A cache of the partition indices that correspond with the output
        % of this node.
        PartitionIndices = [];
        
        % A cache of the output when it is empty. This is NaN until valid.
        EmptyChunk = NaN;
        
        % A scalar logical that is true if and only if this node will
        % generate no more output.
        IsFinished = false;
        
        % A scalar logical that is true if and only if more output from
        % this node is required before the CompositeDataProcessor can emit
        % any further final output.
        IsMoreOutputRequired = true;
    end
    
    methods (Access = ?matlab.bigdata.internal.executor.CompositeDataProcessorBuilder)
        function obj = CompositeDataProcessorNode(inputNodes, processor, ...
                requireOutputPartitionIndices, requireInputPartitionIndices)
            import matlab.bigdata.internal.executor.CompositeDataProcessorNode;
            if isempty(inputNodes)
                inputNodes = CompositeDataProcessorNode.empty(0,1);
            end
            obj.InputNodes = inputNodes;
            
            if isnumeric(processor)
                obj.InputIndex = processor;
            else
                obj.Processor = processor;
            end
            
            obj.RequireOutputPartitionIndices = requireOutputPartitionIndices;
            obj.RequireInputPartitionIndices = requireInputPartitionIndices;
        end
    end
    
    methods
        % Perform one iteration of the action of this node.
        function isMoreGlobalInputRequired = process(obj, isLastOfGlobalInput, globalInputs, isMoreGlobalInputRequired)
            if obj.IsFinished
                return;
            elseif ~isempty(obj.InputIndex)
                obj.IsFinished = isLastOfGlobalInput(obj.InputIndex);
                obj.Output = globalInputs{obj.InputIndex};
                if obj.IsMoreOutputRequired
                    isMoreGlobalInputRequired(obj.InputIndex) = true;
                end
                return;
            end
            
            isLastOfInput = [obj.InputNodes.IsFinished];
            
            if obj.RequireInputPartitionIndices
                inputs = {isLastOfInput, obj.InputNodes.Output, obj.InputNodes.PartitionIndices};
            else
                inputs = {isLastOfInput, obj.InputNodes.Output};
            end
            if obj.RequireOutputPartitionIndices
                [obj.Output, obj.PartitionIndices] = obj.Processor.process(inputs{:});
            else
                obj.Output = obj.Processor.process(inputs{:});
            end
            
            obj.IsFinished = obj.Processor.IsFinished;
            obj.IsMoreOutputRequired = false;
        end
        
        % Mark the output state of this object as consumed. This will also
        % update the IsMoreOutputRequired flags as requested.
        function markOutputAsConsumed(obj, isMoreOutputRequired)
            if ~isempty(obj.Output)
                % The EmptyChunk property is set to non-empty NaN until it
                % has been calculated.
                if isempty(obj.EmptyChunk)
                    obj.Output = obj.EmptyChunk;
                else
                    sz = size(obj.Output);
                    obj.Output = obj.Output([], :);
                    if numel(sz) > 2
                        obj.Output = reshape(obj.Output, [0, sz(2:end)]);
                    end
                    obj.EmptyChunk = obj.Output;
                end
                    
                obj.PartitionIndices = [];
            end
            
            obj.IsMoreOutputRequired = obj.IsMoreOutputRequired || isMoreOutputRequired;
            if ~isempty(obj.Processor) && obj.IsMoreOutputRequired
                inputNodes = obj.InputNodes;
                for ii = 1:numel(inputNodes)
                    inputNodes(ii).IsMoreOutputRequired = inputNodes(ii).IsMoreOutputRequired || obj.Processor.IsMoreInputRequired(ii);
                end
            end
        end
    end
end