%% Benchmarking Independent Jobs on the Cluster
% In this example, we show how to benchmark an application using independent
% jobs on the cluster, and we analyze the results in some detail. In
% particular, we:
%
% * Show how to benchmark a mixture of sequential code and task parallel code.
% * Explain strong and weak scaling.
% * Discuss some of the potential bottlenecks, both on the client and on
% the cluster.
%
% Note: If you run this example on a large cluster, it might take an hour to
% run.
%
% Related examples:
%
% * <docid:distcomp_examples.example-ex61407340 Resource Contention in Task Parallel Problems>
% * <docid:distcomp_examples.example-ex53144152 Simple Benchmarking of PARFOR Using Blackjack>

% Copyright 2008-2016 The MathWorks, Inc.

%%
% The code shown in this example can be found in this function:
function paralleldemo_distribjob_bench

%% Check the Cluster Profile
% Before we interact with the cluster, we verify that the MATLAB(R)
% client is configured according to our needs. Calling |parcluster| will
% give us a cluster using the default profile or will throw an error if the
% default is not usable.
myCluster = parcluster;

%% Timing
% We time all operations separately to allow us to inspect them in detail.
% We will need all those detailed timings to understand where the time is
% spent, and to isolate the potential bottlenecks. For the purposes of the
% example, the actual function we benchmark is not very important; in this
% case we simulate hands of the card game blackjack or 21.
%
% We write all of the operations to be as efficient as possible. For
% example, we use vectorized task creation. We use |tic| and |toc| for
% measuring the elapsed time of all the operations instead of using the job
% and task properties |CreateTime|, |StartTime|, |FinishTime|, etc.,
% because |tic| and |toc| give us sub-second granularity.
% Note that we
% have also instrumented the task function so that it returns the time
% spent executing our benchmark computations.
    function [times, description] = timeJob(myCluster, numTasks, numHands)
        % The code that creates the job and its tasks executes sequentially
        % in the MATLAB client, starting here.
        % We first measure how long it takes to create a job.
        timingStart = tic;
        start = tic;
        job = createJob(myCluster);
        times.jobCreateTime = toc(start);
        description.jobCreateTime = 'Job creation time';

        % Create all the tasks in one call to createTask, and measure how long
        % that takes.
        start = tic;
        taskArgs = repmat({{numHands, 1}}, numTasks, 1);
        createTask(job, @pctdemo_task_blackjack, 2, taskArgs);
        times.taskCreateTime = toc(start);
        description.taskCreateTime = 'Task creation time';

        % Measure how long it takes to submit the job to the cluster.
        start = tic;
        submit(job);
        times.submitTime = toc(start);
        description.submitTime = 'Job submission time';

        % Once the job has been submitted, we hope all its tasks execute in
        % parallel. We measure how long it takes for all the tasks to start
        % and to run to completion.
        start = tic;
        wait(job);
        times.jobWaitTime = toc(start);
        description.jobWaitTime = 'Job wait time';

        % Tasks have now completed, so we are again executing sequential code
        % in the MATLAB client. We measure how long it takes to retrieve all
        % the job results.
        start = tic;
        results = fetchOutputs(job);
        times.resultsTime = toc(start);
        description.resultsTime = 'Result retrieval time';

        % Verify that the job ran without any errors.
        if ~isempty([job.Tasks.Error])
            taskErrorMsgs = pctdemo_helper_getUniqueErrors(job);
            delete(job);
            error('pctexample:distribjobbench:JobErrored', ...
                ['The following error(s) occurred during task ' ...
                'execution:\n\n%s'], taskErrorMsgs);
        end

        % Get the execution time of the tasks, keeping the longest one. Our
        % task function returns this time as its second output argument.
        times.exeTime = max([results{:,2}]);
        description.exeTime = 'Task execution time';

        % Measure how long it takes to delete the job and all its tasks.
        start = tic;
        delete(job);
        times.deleteTime = toc(start);
        description.deleteTime = 'Job deletion time';

        % Measure the total time elapsed from creating the job up to this
        % point.
        times.totalTime = toc(timingStart);
        description.totalTime = 'Total time';

        times.numTasks = numTasks;
        description.numTasks = 'Number of tasks';
    end

%%
% We look at some of the details of what we are measuring:
%
% * *Job creation time*: The time it takes to create a job. For an MJS
% cluster, this involves a remote call, and the MJS allocates space
% in its database. For other cluster types, job creation involves writing
% a few files to disk.
% * *Task creation time*: The time it takes to create and save the task
% information. The MJS saves this in its database, whereas other cluster
% types save it in files on the file system.
% * *Job submission time*: The time it takes to submit the job. For an MJS
% cluster, we tell it to start executing the job it has in its
% database. We ask other cluster types to execute all the tasks we have
% created.
% * *Job wait time*: The time we wait after the job submission until job
% completion. This includes all the activities that take place between job
% submission and job completion. For example, the cluster may need to
% start all the workers and to send the workers the task information; the
% workers read the task information and execute the task function. In the
% case of an MJS cluster, the workers then send the task results to the
% MJS, which writes them to its database, whereas for the other
% cluster types, the workers write the task results to disk.
% * *Task execution time*: The time spent simulating blackjack. We
% instrument the task function to accurately measure this time. This time
% is also included in the job wait time.
% * *Result retrieval time*: The time it takes to bring the job results
% into the MATLAB client. For the MJS, we obtain them from its
% database. For other cluster types, we read them from the file system.
% * *Job deletion time*: The time it takes to delete all the job and
% task information. The MJS deletes it from its database. For
% the other cluster types, we delete the files from the file system.
% * *Total time*: The time it takes to perform all of the above.

%% Choosing Problem Size
% We know that most clusters are designed for batch execution of medium-
% or long-running jobs, so we deliberately try to have our benchmark
% calculations fall within that range. Yet, we do not want this example to
% take hours to run, so we choose the problem size so that each task takes
% approximately 1 minute on our hardware, and we then repeat the timing
% measurements a few times for increased accuracy. As a rule of thumb, if
% your calculations in a task take much less than a minute, you should
% consider whether |parfor| meets your low-latency needs better than jobs
% and tasks.
numHands = 1.2e6;
numReps = 5;
%%
% We explore speedup by running on different numbers of workers, starting
% with 1, 2, 4, 8, 16, etc., and ending with as many workers as we can
% possibly use. In this example, we assume that we have dedicated access to
% the cluster for the benchmarking, and that the cluster's |NumWorkers|
% property has been set correctly. Assuming that to be the case, each task
% will execute right away on a dedicated worker, so we can equate the
% number of tasks we submit with the number of workers that execute them.
numWorkers = myCluster.NumWorkers;
if isinf(numWorkers) || (numWorkers == 0)
    error('pctexample:distribjobbench:InvalidNumWorkers', ...
        ['Cannot deduce the number of workers from the cluster. ' ...
        'Set the NumWorkers on your default profile to be ' ...
        'a value other than 0 or inf.']);
end

numTasks = [pow2(0:ceil(log2(numWorkers) - 1)), numWorkers];

%% Weak Scaling Measurements
% We vary the number of tasks in a job, and have each task perform a fixed
% amount of work. This is called *weak scaling*, and it is what we care
% about most, because we usually scale up to the cluster to solve
% larger problems. It should be compared with the strong scaling
% benchmarks shown later in this example. Speedup based on weak scaling is
% also known as *scaled speedup*.
fprintf(['Starting weak scaling timing. ' ...
    'Submitting a total of %d jobs.\n'], numReps*length(numTasks));
for j = 1:length(numTasks)
    n = numTasks(j);
    for itr = 1:numReps
        [rep(itr), description] = timeJob(myCluster, n, numHands); %#ok<AGROW>
    end
    % Retain the iteration with the lowest total time.
    totalTime = [rep.totalTime];
    fastest = find(totalTime == min(totalTime), 1);
    weak(j) = rep(fastest); %#ok<AGROW>
    fprintf('Job wait time with %d task(s): %f seconds\n', ...
        n, weak(j).jobWaitTime);
end

%% Sequential Execution
% We measure the sequential execution time of the computations. Note
% that this time should be compared to the execution time on the
% cluster only if they have the same hardware and software configuration.
seqTime = inf;
for itr = 1:numReps
    start = tic;
    pctdemo_task_blackjack(numHands, 1);
    seqTime = min(seqTime, toc(start));
end
fprintf('Sequential execution time: %f seconds\n', seqTime);

%% Speedup Based on Weak Scaling and Total Execution Time
% We first look at the overall speedup achieved by running on different
% numbers of workers. The speedup is based on the total time used for the
% computations, so it includes both the sequential and the parallel
% portions of our code.
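%%
% As a rough numerical companion to the speedup graph, the scaled speedup
% can also be tabulated directly. This is only a sketch: it assumes that
% scaled speedup is the one-task total time multiplied by the number of
% tasks and divided by the measured total time, which is the usual
% definition but is not confirmed by the plotting helper's code, which is
% not shown here. The names |scaledSpeedup| and |k| are introduced for
% illustration only.
scaledSpeedup = [weak.numTasks] .* weak(1).totalTime ./ [weak.totalTime];
for k = 1:numel(weak)
    % Efficiency is the scaled speedup divided by the number of tasks.
    fprintf('%d task(s): scaled speedup %.2f, efficiency %.0f%%\n', ...
        weak(k).numTasks, scaledSpeedup(k), ...
        100*scaledSpeedup(k)/weak(k).numTasks);
end
%%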
%
% This speedup curve represents the capabilities of multiple items with
% unknown weights associated with each of them: The cluster hardware, the
% cluster software, the client hardware, the client software, and the
% connection between the client and the cluster. Therefore, the speedup
% curve does not represent any one of these, but all taken together.
%
% If the speedup curve meets your desired performance targets, you know
% that all the aforementioned factors work well together in this particular
% benchmark. However, if the speedup curve fails to meet your targets, you
% do not know which of the many factors listed above is most to blame.
% It could even be that the approach taken in the parallelization of the
% application is to blame rather than the other software or the
% hardware.
%
% All too often, novices believe that this single graph gives the complete
% picture of the performance of their cluster hardware or software. This
% is not the case, and one always needs to be aware that this graph
% does not allow us to draw any conclusions about potential performance
% bottlenecks.
titleStr = sprintf(['Speedup based on total execution time\n' ...
    'Note: This graph does not identify performance ' ...
    'bottlenecks']);
pctdemo_plot_distribjob('speedup', [weak.numTasks], [weak.totalTime], ...
    weak(1).totalTime, titleStr);

%% Detailed Graphs, Part 1
% We dig a little bit deeper and look at the times spent in the various
% steps of our code. We benchmarked weak scaling, that is, the more tasks
% we create, the more work we perform. Therefore, the size of the task
% output data increases as we increase the number of tasks.
% With that
% in mind, we expect the following to take longer the more tasks we
% create:
%
% * Task creation
% * Retrieval of job output arguments
% * Job deletion
%
% We have no reason to believe that the following increases with the number
% of tasks:
%
% * Job creation time
%
% After all, the job is created before we define any of its tasks, so there
% is no reason why it should vary with the number of tasks. We might
% expect to see only some random fluctuations in the job creation time.
pctdemo_plot_distribjob('fields', weak, description, ...
    {'jobCreateTime', 'taskCreateTime', 'resultsTime', 'deleteTime'}, ...
    'Time in seconds');

%% Normalized Times
% We already concluded that task creation time is expected to increase as
% we increase the number of tasks, as does the time to retrieve job output
% arguments and to delete the job. However, this increase is due to the
% fact that we are performing more work as we increase the number of
% workers/tasks. It is therefore meaningful to measure the efficiency of
% these three activities by looking at the time it takes to perform these
% operations, and normalize it by the number of tasks. This way, we can
% look to see if any of the following times stay constant, increase, or
% decrease as we vary the number of tasks:
%
% * The time it takes to create a single task
% * The time it takes to retrieve output arguments from a single task
% * The time it takes to delete a task in a job
%
% The normalized times in this graph represent the capabilities of the
% MATLAB client and the portion of the cluster hardware or software that it
% might interact with. It is generally considered good if these curves
% stay flat, and excellent if they are decreasing.
pctdemo_plot_distribjob('normalizedFields', weak, description, ...
    {'taskCreateTime', 'resultsTime', 'deleteTime'});

%%
% These graphs sometimes show that the time spent retrieving the results
% per task goes down as the number of tasks increases.
% That is undeniably
% good: We become more efficient the more work we perform. This might
% happen if the operation has a fixed amount of overhead in addition to a
% fixed amount of time per task in the job, because the fixed overhead is
% then spread over more tasks.
%
% We cannot expect a speedup curve based on total execution time to look
% particularly good if it includes a significant amount of time spent on
% sequential activities such as the above, where the time spent increases
% with the number of tasks. In that case, the sequential activities will
% dominate once there are sufficiently many tasks.

%% Detailed Graphs, Part 2
% It is possible that the time spent in each of the following steps varies
% with the number of tasks, but we hope it does not:
%
% * Job submission time.
% * Task execution time. This captures the time spent simulating
% blackjack. Nothing more, nothing less.
%
% In both cases, we look at the elapsed time, also referred to as wall
% clock time. We look at neither the total CPU time on the cluster nor the
% normalized time.
pctdemo_plot_distribjob('fields', weak, description, ...
    {'submitTime', 'exeTime'});

%%
% There are situations where each of the times shown above could increase
% with the number of tasks. For example:
%
% * With some third-party cluster types, the job submission involves one
% system call for each task in the job, or the job submission involves
% copying files across the network. In those cases, the job submission
% time may increase linearly with the number of tasks.
% * The graph of the task execution time is the most likely to expose
% hardware limitations and resource contention. For example, the task
% execution time could increase if we are executing multiple workers on the
% same computer, due to contention for limited memory bandwidth. Another
% example of resource contention is if the task function were to read or
% write large data files using a single, shared file system. The task
% function in this example, however, does not access the file system at all.
% These types of hardware limitations are covered in great detail in the
% example
% <docid:distcomp_examples.example-ex61407340 Resource Contention in Task Parallel Problems>.

%% Speedup Based on Weak Scaling and Job Wait Time
% Now that we have dissected the times spent in the various stages of our
% code, we want to create a speedup curve that more accurately reflects the
% capabilities of our cluster hardware and software. We do this by
% calculating a speedup curve based on the job wait time.
%
% When calculating this speedup curve based on the job wait time, we first
% compare it to the time it takes to execute a job with a single task on
% the cluster.
titleStr = 'Speedup based on job wait time compared to one task';
pctdemo_plot_distribjob('speedup', [weak.numTasks], [weak.jobWaitTime], ...
    weak(1).jobWaitTime, titleStr);

%%
% Job wait time might include the time to start all the MATLAB workers. It
% is therefore possible that this time is bounded by the IO capabilities of
% a shared file system. The job wait time also includes the task
% execution time, so any deficiencies seen there also apply here. If we do
% not have dedicated access to the cluster, we could expect the speedup
% curve based on job wait time to suffer significantly.
%
% Next, we compare the job wait time to the sequential execution time,
% assuming that the hardware of the client computer is comparable to the
% compute nodes. If the client is not comparable to the cluster nodes,
% this comparison is meaningless. If your cluster has a
% substantial time lag when assigning tasks to workers, e.g., by assigning
% tasks to workers only once per minute, this graph will be heavily
% affected because the sequential execution time does not suffer this lag.
% Note that this graph will have the same shape as the previous graph; they
% differ only by a constant, multiplicative factor.
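%
% As a small illustration of that constant factor, the ratio between the
% two curves can be printed directly. This is a sketch under the assumption
% that the plotted speedup is proportional to the baseline time passed to
% the plotting helper; the variable name |scaleFactor| is introduced here
% and is not part of the original example.
scaleFactor = seqTime/weak(1).jobWaitTime;
fprintf(['Speedup relative to sequential time = %.3f x speedup ' ...
    'relative to one task\n'], scaleFactor);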
titleStr = 'Speedup based on job wait time compared to sequential time';
pctdemo_plot_distribjob('speedup', [weak.numTasks], [weak.jobWaitTime], ...
    seqTime, titleStr);

%% Comparing Job Wait Time with Task Execution Time
% As we have mentioned before, the job wait time consists of the task
% execution time plus scheduling, wait time in the cluster's queue,
% MATLAB startup time, etc. On an idle cluster, the difference between the
% job wait time and task execution time should remain constant, at least
% for a small number of tasks. As the number of tasks grows into the tens,
% hundreds, or thousands, we are bound to eventually run into some
% limitations. For example, once we have sufficiently many tasks/workers,
% the cluster might be unable to tell all the workers simultaneously to
% start executing their tasks, or, if the MATLAB workers all use the same
% file system, they might end up saturating the file server.
titleStr = 'Difference between job wait time and task execution time';
pctdemo_plot_distribjob('barTime', [weak.numTasks], ...
    [weak.jobWaitTime] - [weak.exeTime], titleStr);

%% Strong Scaling Measurements
% We now measure the execution time of a fixed-size problem, while
% varying the number of workers we use to solve the problem. This is
% called *strong scaling*, and it is well known that if an application
% has any sequential parts, there is an upper limit to the speedup that
% can be achieved with strong scaling. This is formalized in *Amdahl's
% law*, which has been widely discussed and debated over the years.
%
% You can easily run into the limits of speedup with strong scaling when
% submitting jobs to the cluster. If the task execution has a fixed
% overhead (which it ordinarily does), even if it is as little as one
% second, the execution time of our application will never go below one
% second. In our case, we start with an application that executes in
% approximately 60 seconds on one MATLAB worker.
% If we divide the
% computations among 60 workers, it might take as little as one second for
% each worker to compute its portion of the overall problem. However, the
% hypothetical task execution overhead of one second has become a major
% contributor to the overall execution time.
%
% Unless your application runs for a long time, jobs and tasks are usually
% not the way to achieve good results with strong scaling. If the overhead
% of task execution is close to the execution time of your application, you
% should investigate whether |parfor| meets your requirements. Even in the
% case of |parfor|, there is a fixed amount of overhead, albeit much
% smaller than with regular jobs and tasks, and that overhead limits the
% speedup that can be achieved with strong scaling. Your problem size
% relative to your cluster size may or may not be so large that you
% experience those limitations.
%
% As a general rule of thumb, it is only possible to achieve strong scaling
% of small problems on large numbers of processors with specialized
% hardware and a great deal of programming effort.
fprintf(['Starting strong scaling timing. ' ...
    'Submitting a total of %d jobs.\n'], numReps*length(numTasks));
for j = 1:length(numTasks)
    n = numTasks(j);
    % Split the fixed-size problem evenly among the n tasks.
    strongNumHands = ceil(numHands/n);
    for itr = 1:numReps
        rep(itr) = timeJob(myCluster, n, strongNumHands);
    end
    % Retain the iteration with the lowest total time. We index by j, as in
    % the weak scaling loop, so that the struct array has no empty entries.
    ind = find([rep.totalTime] == min([rep.totalTime]), 1);
    strong(j) = rep(ind); %#ok<AGROW>
    fprintf('Job wait time with %d task(s): %f seconds\n', ...
        n, strong(j).jobWaitTime);
end

%% Speedup Based on Strong Scaling and Total Execution Time
% As we have already discussed, speedup curves that depict the sum of the
% time spent executing sequential code in the MATLAB client and time
% executing parallel code on the cluster can be very misleading. The
% following graph shows this information in the worst-case scenario of
% strong scaling.
% We deliberately chose the original problem to be so
% small relative to our cluster size that the speedup curve would look bad.
% Neither the cluster hardware nor software was designed with this kind of
% use in mind.
titleStr = sprintf(['Speedup based on total execution time\n' ...
    'Note: This graph does not identify performance ' ...
    'bottlenecks']);
pctdemo_plot_distribjob('speedup', [strong.numTasks], ...
    [strong.totalTime].*[strong.numTasks], strong(1).totalTime, titleStr);

%% Alternative for Short Tasks: PARFOR
% The strong scaling results did not look good because we deliberately used
% jobs and tasks to execute calculations of short duration. We now look at
% how |parfor| applies to that same problem. Note that we do not include
% the time it takes to open the pool in our time measurements.
pool = parpool(numWorkers);
parforTime = inf;
strongNumHands = ceil(numHands/numWorkers);
for itr = 1:numReps
    start = tic;
    r = cell(1, numWorkers);
    parfor i = 1:numWorkers
        r{i} = pctdemo_task_blackjack(strongNumHands, 1); %#ok<PFOUS>
    end
    parforTime = min(parforTime, toc(start));
end
delete(pool);

%% Speedup Based on Strong Scaling with PARFOR
% The original, sequential calculations took approximately one minute, so
% each worker needs to perform only a few seconds of computations on a
% large cluster. We therefore expect strong scaling performance to be much
% better with |parfor| than with jobs and tasks.
fprintf('Execution time with parfor using %d workers: %f seconds\n', ...
    numWorkers, parforTime);
fprintf(['Speedup based on strong scaling with parfor using ', ...
    '%d workers: %f\n'], numWorkers, seqTime/parforTime);

%% Summary
% We have seen the difference between weak and strong scaling, and
% discussed why we prefer to look at weak scaling: It measures our
% ability to solve larger problems on the cluster (more simulations,
% more iterations, more data, etc.).
% The large number of graphs and the
% amount of detail in this example should also be a testament to the fact
% that benchmarks cannot be boiled down to a single number or a single
% graph. We need to look at the whole picture to understand whether
% the application performance can be attributed to the application, the
% cluster hardware or software, or a combination of both.
%
% We have also seen that for short calculations, |parfor| can be a great
% alternative to jobs and tasks. For more benchmarking results using
% |parfor|, see the example
% <docid:distcomp_examples.example-ex53144152 Simple Benchmarking of PARFOR Using Blackjack>.
end
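
%% Illustration: Speedup Limit with a Fixed Per-Task Overhead
% The hypothetical numbers from the strong scaling discussion (about 60
% seconds of computation and one second of fixed task overhead) can be
% worked through with a small helper. This is a sketch and not part of the
% original example: the function name |fixedOverheadSpeedup| and its
% arguments are made up for illustration, and the model assumes that the
% computation divides perfectly among the workers.
function speedup = fixedOverheadSpeedup(workTime, overhead, numWorkers)
% With one worker, the elapsed time is overhead + workTime. With n workers,
% each worker pays the same overhead but performs only workTime/n of the
% computation.
timeOnOne = overhead + workTime;
timeOnN = overhead + workTime./numWorkers;
speedup = timeOnOne./timeOnN;
end
%%
% For example, |fixedOverheadSpeedup(60, 1, 60)| returns 30.5, and under
% this model no number of workers can push the speedup beyond
% (60 + 1)/1 = 61.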