% www.gusucode.com > bigdata toolbox MATLAB source code > bigdata/+matlab/+bigdata/+internal/+lazyeval/AbstractChunkwiseProcessor.m
%AbstractChunkwiseProcessor
% Abstract base class for all chunk-wise operations that contain the common
% chunk-wise processing logic.
%
% Subclasses implement callFunctionHandle (apply the wrapped function to a
% set of inputs) and throwFromFunctionHandle (raise an error as if it came
% from the wrapped function). This class handles mapping dependency inputs
% to function inputs, optional buffering so that multiplexed inputs are
% aligned slice-by-slice, tall-size compatibility checks, and flow control
% through IsMoreInputRequired.
%
% Copyright 2015-2016 The MathWorks, Inc.

classdef (Abstract) AbstractChunkwiseProcessor < matlab.bigdata.internal.executor.DataProcessor
    % Properties overridden in the DataProcessor interface.
    properties (SetAccess = private)
        % Set to true once callFunctionHandle reports completion (or, on the
        % unbuffered path, once every dependency is finished and no data
        % arrived). After that, process returns no further data.
        IsFinished = false;

        % Logical flags, in terms of upstream dependencies, stating which
        % dependencies this processor wants more input from.
        IsMoreInputRequired;
    end

    properties (SetAccess = immutable)
        % The number of outputs from the function handle.
        NumOutputs;

        % An object that represents how to convert from dependency input to
        % function handle input.
        InputFutureMap;

        % An input buffer that will deal with the fact that inputs are
        % multiplexed.
        %
        % This can be empty in cases where no buffering is required. This
        % will be the case for anything that can be treated as a chunkwise
        % operation that only has a single (or already zipped) input.
        InputBuffer;

        % The maximum number of input parameter slices to pass to the
        % function handle in any one call.
        MaxNumSlices = Inf;

        % A logical scalar that specifies if this processor should allow
        % singleton expansion in the tall dimension.
        AllowTallDimExpansion = true;
    end

    properties (SetAccess = private)
        % A logical scalar that is set to true once this object is
        % initialized and has begun processing data.
        IsInitialized = false;
    end

    % Methods overridden in the DataProcessor interface.
    methods
        function data = process(obj, isLastOfDependencies, varargin)
            %PROCESS Consume the next input from the upstream dependencies.
            %
            %   DATA = PROCESS(OBJ, ISLASTOFDEPENDENCIES, VARARGIN)
            %
            %   isLastOfDependencies - logical flags, one per upstream
            %       dependency, that are true once that dependency has no
            %       further input to send.
            %   varargin - the input received from each upstream dependency.
            %       These are mapped to function handle inputs through
            %       InputFutureMap.mapData. (Presumably each is a cell of
            %       chunks, given the vertcatCellContents call below -
            %       confirm against the executor.)
            %
            %   data - the output of callFunctionHandle, or an empty
            %       0-by-NumOutputs cell when no output can be produced yet
            %       or the processor has already finished.
            %
            %   Side effects: updates IsFinished, IsMoreInputRequired and
            %   IsInitialized. It may throw (via throwFromFunctionHandle)
            %   when input sizes in the tall dimension are incompatible.
            if obj.IsFinished
                data = cell(0, obj.NumOutputs);
                return;
            end

            isLastOfInputsVector = obj.InputFutureMap.mapScalars(isLastOfDependencies);
            functionInputs = obj.InputFutureMap.mapData(varargin);
            inputBuffer = obj.InputBuffer;

            % The buffer property is set to empty if no buffering is
            % required for zipping or for chunk size. As such, we can just
            % pass the data directly to the underlying operation, which is
            % faster.
            if isempty(inputBuffer)
                if all(cellfun(@isempty, varargin))
                    data = cell(0, obj.NumOutputs);
                    obj.IsFinished = all(isLastOfDependencies);
                else
                    isLastOfAllInputs = all(isLastOfInputsVector);
                    functionInputs = cellfun(@matlab.bigdata.internal.util.vertcatCellContents, ...
                        functionInputs, 'UniformOutput', false);
                    [obj.IsFinished, data] = obj.callFunctionHandle(isLastOfAllInputs, functionInputs, []);
                end
                % Match the buffered path: once the operation has finished,
                % request no more input from any dependency.
                obj.IsMoreInputRequired = ~obj.IsFinished & ~isLastOfDependencies;
                return;
            end

            inputBuffer.add(isLastOfInputsVector, functionInputs{:});

            % An input is "too short" when it has received all of its data,
            % is not a single-slice (broadcast) input, and still holds a
            % different number of buffered slices than the longest input.
            isInputTooShortVector = isLastOfInputsVector & ~inputBuffer.IsInputSingleSlice ...
                & inputBuffer.NumBufferedSlices ~= inputBuffer.LargestNumBufferedSlices;
            if any(isInputTooShortVector)
                if all(inputBuffer.IsInputSinglePartition)
                    % We can be certain in this case that the two arrays
                    % have an incompatible size in the tall dimension.
                    obj.throwSizeError();
                else
                    % Otherwise this might just be a case of different
                    % partitioning.
                    obj.throwFromFunctionHandle(MException(message('MATLAB:bigdata:array:IncompatibleTallIndexing')));
                end
            end

            % There are some first time checks that we want to do once the
            % input buffer has enough data to determine the types of input
            % we are about to receive.
            if ~obj.IsInitialized
                % We require to know which inputs are single slice before
                % we can do the first time checks.
                if ~inputBuffer.HasDeterminedSingleSliceInputs
                    data = cell(0, obj.NumOutputs);
                    return;
                end

                % This is to guard against the situation where the size of
                % one partition in a partitioned tall array just so happens
                % to match the size of a non-partitioned array. Examples
                % include the output of a reduction as well as local arrays.
                if any(~inputBuffer.IsInputSinglePartition)
                    isInputInvalidBroadcastVector = inputBuffer.IsInputSinglePartition ...
                        & ~inputBuffer.IsInputSingleSlice;
                    if any(isInputInvalidBroadcastVector)
                        obj.throwSizeError();
                    end
                end

                % Without tall-dimension expansion, a mix of single-slice
                % and multi-slice inputs is a size mismatch.
                if ~obj.AllowTallDimExpansion ...
                        && any(~inputBuffer.IsInputSingleSlice) ...
                        && any(inputBuffer.IsInputSingleSlice)
                    obj.throwSizeError();
                end

                obj.IsInitialized = true;
            end

            [functionInputs, numSlices] = inputBuffer.getCompleteSlices(obj.MaxNumSlices);
            % Only the final call, made once every input is finished and
            % the buffer is drained, is flagged as the last one.
            isLastOfAllInputs = all(isLastOfInputsVector) && inputBuffer.LargestNumBufferedSlices == 0;
            [obj.IsFinished, data] = obj.callFunctionHandle(isLastOfAllInputs, functionInputs, numSlices);

            % This logic exists in order to ensure inputs arrive at similar
            % data rates.
            %
            % This object indicates that it requires more data for a given
            % input if and only if the buffer for that input contains less
            % data than would be needed to consume all of the data from all
            % buffers, or to reach MaxNumSlices if that is smaller.
            requiredBufferSize = inputBuffer.LargestNumBufferedSlices;
            requiredBufferSize = min(requiredBufferSize, obj.MaxNumSlices);
            requiredBufferSize = max(requiredBufferSize, 1);
            isBufferTooShortVector = inputBuffer.NumBufferedSlices < requiredBufferSize;
            isMoreInputRequiredVector = ~isLastOfInputsVector & isBufferTooShortVector;

            % We have to map from operation inputs back to upstream
            % dependencies because this property is in terms of upstream
            % dependencies.
            obj.IsMoreInputRequired = ~obj.IsFinished & obj.InputFutureMap.reverseMapLogicals(isMoreInputRequiredVector);
        end
    end

    methods (Access = protected, Abstract)
        % Call the underlying function handle with the corresponding
        % inputs.
        %
        % isLastOfAllInputs - true when every input has been fully received
        %     and, on the buffered path, the input buffer has been drained.
        % inputs - cell array of function handle inputs.
        % numSlices - the slice count returned by
        %     InputBuffer.getCompleteSlices, or [] on the unbuffered path.
        % isFinished - stored in IsFinished; once true, process returns no
        %     further data.
        % data - the output cell array returned from process.
        [isFinished, data] = callFunctionHandle(obj, isLastOfAllInputs, inputs, numSlices);

        % Throw the provided error as if it originated from the function
        % handle.
        throwFromFunctionHandle(obj, err);
    end

    methods (Access = protected)
        % Protected constructor for subclasses.
        %
        % numOutputs - number of outputs from the function handle.
        % inputFutureMap - maps dependency input to function handle input.
        % enableBuffer - when true, an InputBuffer is created. Otherwise
        %     InputBuffer stays empty and process uses the unbuffered path.
        % isInputSinglePartition - per-input flags passed to InputBuffer;
        %     its numel sets the number of buffered inputs. Only used when
        %     enableBuffer is true.
        % maxNumSlices (optional) - defaults to Inf.
        % allowTallDimExpansion (optional) - defaults to true.
        function obj = AbstractChunkwiseProcessor(numOutputs, inputFutureMap, ...
                enableBuffer, isInputSinglePartition, maxNumSlices, allowTallDimExpansion)
            import matlab.bigdata.internal.lazyeval.InputBuffer;

            obj.NumOutputs = numOutputs;
            obj.InputFutureMap = inputFutureMap;
            if enableBuffer
                obj.InputBuffer = InputBuffer(numel(isInputSinglePartition), isInputSinglePartition);
            end

            % NOTE(review): this initial value is sized by the number of
            % operation inputs. process later assigns a vector in terms of
            % upstream dependencies. Confirm that these counts agree (or
            % that callers tolerate the difference) for zipped inputs.
            obj.IsMoreInputRequired = true(1, obj.InputFutureMap.NumOperationInputs);

            if nargin >= 5
                obj.MaxNumSlices = maxNumSlices;
            end
            if nargin >= 6
                obj.AllowTallDimExpansion = allowTallDimExpansion;
            end
        end
    end

    methods (Access = private)
        % Helper function that ensures the right error is thrown based on
        % whether this operation supports singleton expansion in the tall
        % dimension.
        function throwSizeError(obj)
            if obj.AllowTallDimExpansion
                obj.throwFromFunctionHandle(MException(message('MATLAB:bigdata:array:IncompatibleTallSize')));
            else
                obj.throwFromFunctionHandle(MException(message('MATLAB:bigdata:array:IncompatibleTallStrictSize')));
            end
        end
    end
end