www.gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+io/CacheProcessor.m

    %CacheProcessor
% Helper class that introduces a cache to the graph of operations.
%
% On cache hit, this will not require any input and read output chunks from
% the cache. On cache miss, this will require the input, write it to the
% cache as well as pass it forward.
%

%   Copyright 2015-2016 The MathWorks, Inc.

classdef CacheProcessor < matlab.bigdata.internal.executor.DataProcessor
    % Properties overridden in the DataProcessor interface.
    properties (SetAccess = private)
        IsFinished = false;
        IsMoreInputRequired = true;
    end
    
    properties (SetAccess = immutable)
        % The underlying processor that will read from the disk or memory
        % cache. This will be empty if no cache exists.
        ReadProcessor;
        
        % The underlying processor that will write to the disk or memory
        % cache. This will be empty if both disk and memory cache already
        % exists.
        WriteProcessor;
    end
    
    % Methods overridden in the DataProcessor interface.
    methods
        function data = process(obj, isLastOfInput, data)
            if isempty(obj.ReadProcessor)
                % If no cache read processor, this is a straight pass
                % through.
                obj.IsFinished = isLastOfInput;
            else
                % Otherwise we redirect input to be from the cache read
                % processor.
                data = obj.ReadProcessor.process([]);
                obj.IsFinished = obj.ReadProcessor.IsFinished;
            end
            
            if ~isempty(obj.WriteProcessor)
                % If we need to write to cache, do that here.
                data = obj.WriteProcessor.process(obj.IsFinished, data);
                obj.IsFinished = obj.WriteProcessor.IsFinished;
            end
        end
    end
    
    methods (Static)
        % Create a DataProcessor that represents a cache point in the graph
        % of data processors.
        function factory = createFactory(cacheEntryKey, getCacheStoreFunction)
            factory = @createCacheProcessor;
            function processor = createCacheProcessor(partition)
                import matlab.bigdata.internal.io.CacheProcessor;
                import matlab.bigdata.internal.io.LocalReadProcessor;
                import matlab.bigdata.internal.io.LocalWriteProcessor;
                cacheEntryStore = feval(getCacheStoreFunction);
                
                [reader, writer] = cacheEntryStore.openOrCreateEntry(cacheEntryKey, partition.PartitionIndex);
                if ~isempty(reader)
                    reader = LocalReadProcessor(reader);
                end
                if ~isempty(writer)
                    writer = LocalWriteProcessor(writer);
                end
                processor = CacheProcessor(reader, writer);
            end
        end
    end
    
    methods (Access = private)
        % Private constructor for the static factory method.
        function obj = CacheProcessor(readProcessor, writeProcessor)
            obj.ReadProcessor = readProcessor;
            obj.WriteProcessor = writeProcessor;
            obj.IsMoreInputRequired = isempty(readProcessor);
        end
    end
end