%CompositeDataProcessor
% An implementation of the DataProcessor interface that wraps around a
% graph of DataProcessor instances. All DataProcessor instances must be
% non-communicating except for the last (most downstream) DataProcessor.

%   Copyright 2015-2016 The MathWorks, Inc.

classdef CompositeDataProcessor < matlab.bigdata.internal.executor.DataProcessor
    % Properties overridden in the DataProcessor interface.
    properties (SetAccess = private)
        IsFinished = false;
        IsMoreInputRequired;
    end

    properties (SetAccess = immutable)
        % A collection of CompositeDataProcessorNode instances. Each node
        % wraps a single DataProcessor.
        Nodes;
    end

    % Methods overridden in the DataProcessor interface.
    methods
        function [data, partitionIndices] = process(obj, isLastOfGlobalInput, varargin)
            isMoreGlobalInputRequired = false(size(obj.IsMoreInputRequired));
            for ii = 1:numel(obj.Nodes)
                % This logic exists to ensure that inputs arrive at
                % similar data rates.
                %
                % For a given iteration, we attempt to process only the
                % nodes whose output is needed to generate the next chunk
                % of output of the entire CompositeDataProcessor. We can
                % only skip a node if none of its upstream dependencies
                % has pending output, either because they were not
                % evaluated or because they generated no output. This is
                % because output is not persisted: it is lost if it has
                % not been passed to the dependent processors.
                node = obj.Nodes(ii);
                if node.IsMoreOutputRequired ...
                        || any(~cellfun(@isempty, {node.InputNodes.Output}))
                    isMoreGlobalInputRequired = process(node, isLastOfGlobalInput, varargin, isMoreGlobalInputRequired);
                end
            end

            % The composite is finished once the most downstream node is.
            obj.IsFinished = [obj.Nodes(end).IsFinished];
            obj.IsMoreInputRequired = isMoreGlobalInputRequired;

            % The output of the composite is the output of the last node.
            data = obj.Nodes(end).Output;
            partitionIndices = obj.Nodes(end).PartitionIndices;

            % Mark outputs as consumed, walking from the most downstream
            % node back to the most upstream one.
            markOutputAsConsumed(obj.Nodes(end), true);
            for ii = numel(obj.Nodes) - 1 : -1 : 1
                markOutputAsConsumed(obj.Nodes(ii), false);
            end
        end
    end

    methods (Access = ?matlab.bigdata.internal.executor.CompositeDataProcessorBuilder)
        % Private constructor for the builder.
        function obj = CompositeDataProcessor(nodes, numGlobalInputs)
            obj.Nodes = nodes;
            obj.IsMoreInputRequired = true(1, numGlobalInputs);
        end
    end
end