%PartitionwiseProcessor
% Data Processor that applies a partition-wise function handle to the input
% data.
%
% This will apply a function handle chunk-wise using the advanced
% partitionwise API. It will emit data continuously throughout a pass.
%
% See LazyTaskGraph for a general description of input and outputs.
% Specifically, each iteration will emit a 1 x NumOutputs cell array where
% each cell contains a chunk of output of the corresponding operation
% output.
%
% Copyright 2015-2016 The MathWorks, Inc.

classdef (Sealed) PartitionwiseProcessor < matlab.bigdata.internal.lazyeval.AbstractChunkwiseProcessor
    properties (SetAccess = immutable)
        % The chunk-wise function handle.
        FunctionHandle;

        % The index of the current partition into the number of partitions
        % for the task.
        PartitionIndex = 1;
    end

    properties (SetAccess = private)
        % The relative index of the first slice in the next chunk to be
        % passed to the function handle.
        RelativeIndexInPartition = 1;
    end

    methods (Static)
        % Create a data processor factory that can be used by the execution
        % environment to construct instances of this class.
        function factory = createFactory(functionHandle, numOutputs, inputFutureMap, isInputReplicated)
            factory = @createProcessor;

            function dataProcessor = createProcessor(partition)
                import matlab.bigdata.internal.lazyeval.PartitionwiseProcessor;

                isInputSinglePartition = isInputReplicated | (partition.NumPartitions == 1);
                dataProcessor = PartitionwiseProcessor(functionHandle, partition.PartitionIndex, ...
                    numOutputs, inputFutureMap, isInputSinglePartition);
            end
        end
    end

    % Methods overridden in the AbstractChunkwiseProcessor base class.
    methods (Access = protected)
        function [isFinished, data] = callFunctionHandle(obj, isLastOfAllInput, inputs, numSlices)
            info = struct(...
                'PartitionId', obj.PartitionIndex, ...
                'RelativeIndexInPartition', obj.RelativeIndexInPartition, ...
                'IsLastChunk', isLastOfAllInput);
            [isFinished, data{1:obj.NumOutputs}] = matlab.bigdata.internal.lazyeval.callFunctionHandle(...
                obj.FunctionHandle, info, inputs{:});
            obj.RelativeIndexInPartition = obj.RelativeIndexInPartition + numSlices;
        end

        function throwFromFunctionHandle(obj, err)
            obj.FunctionHandle.throwAsFunction(err);
        end
    end

    methods (Access = private)
        % Private constructor for factory method.
        function obj = PartitionwiseProcessor(functionHandle, partitionIndex, numOutputs, inputFutureMap, isInputSinglePartition)
            import matlab.bigdata.internal.lazyeval.InputBuffer;

            enableBuffer = true;
            obj = obj@matlab.bigdata.internal.lazyeval.AbstractChunkwiseProcessor(...
                numOutputs, inputFutureMap, enableBuffer, ...
                isInputSinglePartition, functionHandle.MaxNumSlices);
            obj.FunctionHandle = functionHandle;
            obj.PartitionIndex = partitionIndex;
        end
    end
end
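
%{
Illustrative sketch only, not part of the original file. It mirrors the
bookkeeping in callFunctionHandle above: each call builds an info struct
whose RelativeIndexInPartition is the index of the first slice of the
current chunk, and the counter then advances by the number of slices in
that chunk. The chunk sizes and the partition id are hypothetical values
chosen for illustration; in the class they come from numSlices and
obj.PartitionIndex. Treating the final loop iteration as the last chunk is
also an assumption, standing in for isLastOfAllInput.

    chunkSizes = [3 2 4];              % hypothetical number of slices per chunk
    relativeIndex = 1;                 % same initial value as the property
    starts = zeros(size(chunkSizes));  % first-slice index seen by each call
    for k = 1:numel(chunkSizes)
        info = struct( ...
            'PartitionId', 1, ...                          % hypothetical partition
            'RelativeIndexInPartition', relativeIndex, ...
            'IsLastChunk', k == numel(chunkSizes));        % assumed stand-in
        starts(k) = info.RelativeIndexInPartition;
        relativeIndex = relativeIndex + chunkSizes(k);     % advance by numSlices
    end
    % starts is [1 4 6] and relativeIndex ends at 10.

Copied out of this block comment, the loop runs as a plain script and does
not need the bigdata toolbox.
%}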