www.gusucode.com > datastoreio工具箱 matlab源码程序 > datastoreio/+matlab/+io/+datastore/SplittableDatastore.m

    classdef (Hidden = true, AllowedSubclasses = {?matlab.io.datastore.FileBasedDatastore, ?matlab.io.datastore.SplittableDatastoreTestBase}) ...
        SplittableDatastore < matlab.io.datastore.Datastore & matlab.mixin.Copyable
%SplittableDatastore    Declares the interface for divisible datastores.
%   Datastores that can divide up the tasks for reading the datastore
%   into smaller pieces (called splits) are splittable in nature and can
%   support extra functionality including getting progress from the
%   datastore and possible parallelism with mapreduce.
%
%   This class inherits from matlab.io.datastore.Datastore and
%   matlab.mixin.Copyable, and provides default implementations for the
%   hasdata, reset, readData, progress, partition and numpartitions
%   methods.
%
%   See also datastore, mapreduce

%   Copyright 2014-2016 The MathWorks, Inc.

    % Default implementation for SplittableDatastore %
    
    properties (Access = 'protected')
        Splitter;                % Splitter instance for the datastore
    end
    
    % Transient state: not saved with the object, so SplitReader is empty
    % and SplitIdx is 0 until moveToSplit has been called.
    properties (Transient, Access = 'protected')
        SplitReader;             % SplitReader instance
        SplitIdx = 0;            % current split index
    end
    
    methods        
        function tf = hasdata(ds)
        %HASDATA   Returns true if more data is available.
        %   Return logical scalar indicating availability of data. This
        %   method should be called before calling read.
        %
        %   If the current split is exhausted (or no split has been opened
        %   yet), the datastore is advanced to the next split that has
        %   data, so calling this method may change the current split.
        %
        %   See also READ, READALL, PREVIEW, RESET,
        %   matlab.io.datastore.TabularTextDatastore
        
            tf = false;
            
            numSplits = ds.Splitter.NumSplits;
            if numSplits == 0
                return;
            end
            
            % current split has data? An uninitialized reader counts as
            % "no data" so that we fall through and open the first split.
            if currentSplitHasData(ds)
                tf = true;
                return;
            end
            
            % any non empty split left?
            currIdx = ds.SplitIdx;
            if currIdx < numSplits
                % skip over splits without data
                for sidx = currIdx + 1 : numSplits
                    prevRdr = ds.SplitReader;
                    prevIdx = ds.SplitIdx;
                    try
                        % point the reader to the new split beginning
                        ds.moveToSplit(sidx);
                    catch ME
                        % if it fails, set the reader and the split index
                        % back to the previous split so that both stay in
                        % sync even if an overridden moveToSplit assigned
                        % one of them before erroring.
                        ds.SplitReader = prevRdr;
                        ds.SplitIdx = prevIdx;
                        throw(ME)
                    end
                    
                    if hasNext(ds.SplitReader)
                        tf = true;
                        return;
                    end
                end 
            end
            
        end
        
        function reset(ds)
        %RESET   Reset to the start of the data.
        %   Reset the datastore to the state where no data has been read
        %   from it. Does nothing when there is no valid Splitter or the
        %   Splitter has no splits.
            
            if ~isempty(ds.Splitter) && isvalid(ds.Splitter) && ...
                ds.Splitter.NumSplits ~= 0
                ds.moveToSplit(1);
            end
        end

        function delete(ds)
        %DELETE   Delete the datastore
        %   Deletes the Splitter and the SplitReader owned by this
        %   datastore, when they are present and still valid.
            if ~isempty(ds.Splitter) && isvalid(ds.Splitter)
                delete(ds.Splitter);
            end
            if ~isempty(ds.SplitReader) && ...
               isa(ds.SplitReader, 'matlab.io.datastore.splitreader.SplitReader') && ...
               isvalid(ds.SplitReader)
                delete(ds.SplitReader);
            end
        end        
    end

    methods (Hidden)
        function frac = progress(ds)
        %PROGRESS   Fraction of completed splits between 0.0 and 1.0.
        %   Return fraction between 0.0 and 1.0 indicating progress. Does
        %   not count unfinished splits. Returns 1.0 when there are no
        %   splits at all.
            numSplits = ds.Splitter.NumSplits;
            if numSplits == 0
                frac = 1.0;
                return;
            end
            % The current split only counts as completed once its reader
            % has no more data.
            split = ds.SplitIdx - currentSplitHasData(ds);
            % Clamp on both sides so the documented [0.0, 1.0] range holds
            % even when SplitIdx is still 0.
            frac = max(min(split/numSplits, 1.0), 0.0);
        end
    end

    % Default copy implementation for SplittableDatastore
    methods (Access = 'protected')
        function dscopy = copyElement(ds)
        % COPYELEMENT   Default implementation for copying SplittableDatastore objects.
        %   Makes a shallow copy of the datastore and then replaces the
        %   Splitter (and the SplitReader, when one exists) with their own
        %   copies so the two datastores do not share these handles.
            dscopy = copyElement@matlab.mixin.Copyable(ds);
            dscopy.Splitter = copy(ds.Splitter);
            if ds.Splitter.NumSplits ~= 0 ...
            && isa(ds.SplitReader, 'matlab.io.datastore.splitreader.SplitReader')
                dscopy.SplitReader = copy(ds.SplitReader);
            end
        end

        function [data, info] = readData(ds)
        %READDATA   Read data and information about the extracted data.
        %   Return the data extracted from the datastore in the appropriate
        %   form for this datastore. Also return information about where
        %   the data was extracted from in the datastore.
        %
        %   Errors with
        %   'MATLAB:datastoreio:splittabledatastore:noMoreData' when
        %   hasdata(ds) is false.
        %
            % hasdata also advances to the next non-empty split, so the
            % reader used below is guaranteed to have data.
            if ~hasdata(ds)
                error(message(...
                    'MATLAB:datastoreio:splittabledatastore:noMoreData'));
            end
            [data, info] = getNext(ds.SplitReader);
        end

    end
    
    methods (Access = 'protected')
        function moveToSplit(ds, ii)
        %MOVETOSPLIT   Point the datastore at the beginning of split ii.
        %   Creates a reader for split ii from the Splitter, resets it, and
        %   only then records it as the current split.
            rdr = createReader(ds.Splitter, ii);
            reset(rdr);
            % the above call may error, so only do sets afterwards
            ds.SplitIdx = ii;
            ds.SplitReader = rdr;
        end
    end
    
    methods (Access = 'private')
        function tf = currentSplitHasData(ds)
        %CURRENTSPLITHASDATA   True if the current reader has more data.
        %   Returns false when no reader has been created yet (SplitReader
        %   is empty, e.g. because it is a Transient property); otherwise
        %   defers to hasNext on the reader.
            rdr = ds.SplitReader;
            tf = ~isempty(rdr) && hasNext(rdr);
        end
    end
    
    methods
        function set.Splitter(ds, splitter)
        % Only matlab.io.datastore.splitter.Splitter instances are accepted.
            if ~isa(splitter, 'matlab.io.datastore.splitter.Splitter')
                error(message('MATLAB:datastoreio:splittabledatastore:invalidSplitter'));
            end
            ds.Splitter = splitter;
        end
    end

    methods
        function outds = partition(ds, N, ii)
        %PARTITION   Return a partitioned part of the datastore.
        %   This function will return a datastore that represents the part of the
        %   data corresponding with the partition and index chosen.
        %
        %   OUTDS = PARTITION(DS,N,II) returns a copy of DS whose Splitter
        %   is the result of partitionBySubset(N,II) on the Splitter of DS,
        %   reset to the start of its data. DS itself is not modified.
             
            % The actual partitioning.
            newSplitter = ds.Splitter.partitionBySubset(N, ii);
            outds = ds.copy();
            outds.Splitter = newSplitter;
            outds.reset();
        end
        
        function numPartitions = numpartitions(ds, pool)
            %NUMPARTITIONS Return an estimate for a reasonable number of partitions for the given information.
            %
            %   N = NUMPARTITIONS(DS) returns the default number of partitions for a
            %   given DATASTORE, DS.
            %
            %   N = NUMPARTITIONS(DS,POOL) returns a reasonable number of partitions
            %   to parallelize DS over the parallel pool, POOL, based on the total
            %   number of partitions and the number of workers in POOL.
            %
            %   The number of partitions obtained from NUMPARTITIONS is recommended as
            %   an input to PARTITION function.
            %
            %   Example:
            %      % A datastore that contains 10 copies of the 'airlinesmall.csv'
            %      % example dataset.
            %      files = repmat({'airlinesmall.csv'},1,10);
            %      ds = tabularTextDatastore(files,'TreatAsMissing','NA','MissingValue',0);
            %      ds.SelectedVariableNames = 'ArrDelay';
            %
            %      N = numpartitions(ds,gcp);
            %      totalSum = 0;
            %      parfor ii = 1:N
            %          subds = partition(ds,N,ii);
            %
            %          while hasdata(subds)
            %              data = read(subds);
            %              totalSum = totalSum + sum(data.ArrDelay);
            %          end
            %      end
            %      totalSum
            %
            %   See also matlab.io.datastore.TabularTextDatastore, partition.

            try
                % Without a (non-empty) pool there is no worker-based cap,
                % so the result is simply the number of splits.
                numWorkers = Inf;
                if nargin >= 2
                    validateattributes(pool, {'parallel.Pool'}, {}, 'numpartitions', 'pool');
                    if ~isempty(pool)
                        numWorkers = pool.NumWorkers;
                    end
                end

                % We choose 3 * numWorkers for load balancing reasons.
                numPartitions = min(numel(ds.Splitter.Splits), 3 * numWorkers);
            catch ME
                % Report errors as coming from the caller of numpartitions.
                throwAsCaller(ME);
            end
        end
    end
end