% Source: www.gusucode.com > bigdata toolbox MATLAB source code > bigdata/+matlab/+bigdata/+internal/+lazyeval/PartitionwiseProcessor.m

    %PartitionwiseProcessor
% Data Processor that applies a partition-wise function handle to the input
% data.
%
% This will apply a function handle chunk-wise using the advanced
% partitionwise API. It will emit data continuously throughout a pass.
%
% See LazyTaskGraph for a general description of inputs and outputs.
% Specifically, each iteration will emit a 1 x NumOutputs cell array where
% each cell contains a chunk of the corresponding operation output.
%

%   Copyright 2015-2016 The MathWorks, Inc.

classdef (Sealed) PartitionwiseProcessor < matlab.bigdata.internal.lazyeval.AbstractChunkwiseProcessor
    % Data processor that invokes a partition-wise function handle once per
    % chunk of input. Alongside the chunk data, every invocation receives an
    % info struct carrying the partition index, the relative index of the
    % chunk's first slice within the partition, and a last-chunk flag.
    %
    % Instances are not constructed directly (the constructor is private);
    % use the factory returned by PartitionwiseProcessor.createFactory.

    properties (SetAccess = immutable)
        % The chunk-wise function handle. This class reads its MaxNumSlices
        % property and calls its throwAsFunction method, so it is expected
        % to be a function-handle wrapper object rather than a raw MATLAB
        % function_handle.
        FunctionHandle;
        
        % The index of the current partition into the number of partitions
        % for the task.
        PartitionIndex = 1;
    end
    
    properties (SetAccess = private)
        % The relative index of the first slice in the next chunk to be
        % passed to the function handle. Starts at 1 and is advanced by the
        % number of slices consumed after every function handle call.
        RelativeIndexInPartition = 1;
    end
    
    methods (Static)
        function factory = createFactory(functionHandle, numOutputs, inputFutureMap, isInputReplicated)
            % Create a data processor factory that can be used by the
            % execution environment to construct instances of this class.
            %
            % Inputs:
            %   functionHandle    - chunk-wise function handle object
            %                       forwarded to the constructor.
            %   numOutputs        - number of outputs, forwarded to the
            %                       base class constructor.
            %   inputFutureMap    - forwarded untouched to the base class
            %                       constructor.
            %   isInputReplicated - logical flag(s) OR-ed with the "only
            %                       one partition" condition below.
            %
            % Output:
            %   factory - function handle of the form
            %             dataProcessor = factory(partition), where
            %             partition must expose the NumPartitions and
            %             PartitionIndex properties.
            
            factory = @createProcessor;
            function dataProcessor = createProcessor(partition)
                import matlab.bigdata.internal.lazyeval.PartitionwiseProcessor;
                
                % Element-wise OR is kept deliberately instead of the
                % short-circuit form: it yields the same result for scalar
                % flags and also works if isInputReplicated is a logical
                % array (presumably one flag per input - confirm against
                % callers).
                isInputSinglePartition = isInputReplicated | (partition.NumPartitions == 1);
                
                dataProcessor = PartitionwiseProcessor(functionHandle, partition.PartitionIndex, numOutputs, inputFutureMap, isInputSinglePartition);
            end
        end
    end
    
    % Methods overridden in the AbstractChunkwiseProcessor base class.
    methods (Access = protected)
        function [isFinished, data] = callFunctionHandle(obj, isLastOfAllInput, inputs, numSlices)
            % Invoke the function handle on one chunk of input.
            %
            % Inputs:
            %   isLastOfAllInput - forwarded to the function handle as
            %                      info.IsLastChunk.
            %   inputs           - cell array whose contents are expanded
            %                      into the function handle's data
            %                      arguments.
            %   numSlices        - amount by which the relative index is
            %                      advanced once the call has returned.
            %
            % Outputs:
            %   isFinished - first output of the function handle call.
            %   data       - 1 x NumOutputs cell array holding the
            %                remaining outputs of the call.
            info = struct(...
                'PartitionId', obj.PartitionIndex, ...
                'RelativeIndexInPartition', obj.RelativeIndexInPartition, ...
                'IsLastChunk', isLastOfAllInput);
            
            % Preallocate so that data is always defined. Without this,
            % data only comes into existence through the comma-separated
            % list on the left-hand side below; when NumOutputs is 0 that
            % list is empty and data would be left unassigned.
            % NumOutputs is not declared in this class; presumably it is
            % inherited from the base class.
            data = cell(1, obj.NumOutputs);
            [isFinished, data{1:obj.NumOutputs}] = matlab.bigdata.internal.lazyeval.callFunctionHandle(obj.FunctionHandle, info, inputs{:});
            
            % Only reached when the call above did not error: the next
            % chunk starts numSlices further into the partition.
            obj.RelativeIndexInPartition = obj.RelativeIndexInPartition + numSlices;
        end
        
        function throwFromFunctionHandle(obj, err)
            % Hand the error to the function handle object, which rethrows
            % it through its throwAsFunction method.
            obj.FunctionHandle.throwAsFunction(err);
        end
    end
    
    methods (Access = private)
        function obj = PartitionwiseProcessor(functionHandle, partitionIndex, numOutputs, inputFutureMap, isInputSinglePartition)
            % Private constructor for factory method.
            %
            % Buffering is always enabled for this processor, and the
            % function handle's MaxNumSlices is passed through to the base
            % class constructor.
            enableBuffer = true;
            obj = obj@matlab.bigdata.internal.lazyeval.AbstractChunkwiseProcessor(...
                numOutputs, inputFutureMap, enableBuffer, ...
                isInputSinglePartition, functionHandle.MaxNumSlices);
            obj.FunctionHandle = functionHandle;
            obj.PartitionIndex = partitionIndex;
        end
    end
end