[2/3] framework: allow specifying the number of jobs for concurrency

Submitted by Marek Olšák on May 30, 2018, 9:18 p.m.

Details

Message ID 20180530211815.5995-1-maraeo@gmail.com
State New
Headers show
Series "Series without cover letter" ( rev: 2 ) in Piglit

Not browsing as part of any series.

Commit Message

Marek Olšák May 30, 2018, 9:18 p.m.
From: Nicolai Hähnle <nicolai.haehnle@amd.com>

The default remains the same: number of CPUs. But on systems with lots of
cores but comparatively little (V)RAM it can make sense to reduce the
number of jobs to avoid random failures caused by out-of-memory conditions.
---
 framework/options.py      |  1 +
 framework/profile.py      |  5 +++--
 framework/programs/run.py | 23 +++++++++++++++++++++--
 3 files changed, 25 insertions(+), 4 deletions(-)

Patch hide | download patch | download mbox

diff --git a/framework/options.py b/framework/options.py
index 211159a45..f5f32af78 100644
--- a/framework/options.py
+++ b/framework/options.py
@@ -51,20 +51,21 @@  class _Options(object):  # pylint: disable=too-many-instance-attributes
     env -- environment variables set for each test before run
     deqp_mustpass -- True to enable the use of the deqp mustpass list feature.
     """
 
     def __init__(self):
         self.execute = True
         self.valgrind = False
         self.sync = False
         self.deqp_mustpass = False
         self.process_isolation = True
+        self.jobs = None
 
         # env is used to set some base environment variables that are not going
         # to change across runs, without sending them to os.environ which is
         # fickle and easy to break
         self.env = {
             'PIGLIT_SOURCE_DIR':
                 os.environ.get(
                     'PIGLIT_SOURCE_DIR',
                     os.path.abspath(os.path.join(os.path.dirname(__file__),
                                                  '..')))
diff --git a/framework/profile.py b/framework/profile.py
index ffc91e0a6..a6cac2cf0 100644
--- a/framework/profile.py
+++ b/framework/profile.py
@@ -590,37 +590,38 @@  def load_test_profile(filename, python=None):
                 filename))
 
     try:
         return mod.profile
     except AttributeError:
         raise exceptions.PiglitFatalError(
             'There is no "profile" attribute in module {}.\n'
             'Did you specify the right file?'.format(filename))
 
 
-def run(profiles, logger, backend, concurrency):
+def run(profiles, logger, backend, concurrency, jobs):
     """Runs all tests using Thread pool.
 
     When called this method will flatten out self.tests into self.test_list,
     then will prepare a logger, and begin executing tests through its Thread
     pools.
 
     Based on the value of concurrency it will either run all the tests
     concurrently, all serially, or first the thread safe tests then the
     serial tests.
 
     Finally it will print a final summary of the tests.
 
     Arguments:
     profiles -- a list of Profile instances.
     logger   -- a log.LogManager instance.
     backend  -- a results.Backend derived instance.
+    jobs     -- maximum number of concurrent jobs. Use os.cpu_count() by default.
     """
     chunksize = 1
 
     profiles = [(p, p.itertests()) for p in profiles]
     log = LogManager(logger, sum(len(p) for p, _ in profiles))
 
     # check that after the filters are run there are actually tests to run.
     # if not any(l for _, l in profiles):
         # raise exceptions.PiglitUserError('no matching tests')
 
@@ -663,21 +664,21 @@  def run(profiles, logger, backend, concurrency):
             # pool
             run_threads(single, profile, test_list[1],
                         lambda x: not x[1].run_concurrent)
         profile.teardown()
 
     # Multiprocessing.dummy is a wrapper around Threading that provides a
     # multiprocessing compatible API
     #
     # The default value of pool is the number of virtual processor cores
     single = multiprocessing.dummy.Pool(1)
-    multi = multiprocessing.dummy.Pool()
+    multi = multiprocessing.dummy.Pool(jobs)
 
     try:
         for p in profiles:
             run_profile(*p)
 
         for pool in [single, multi]:
             pool.close()
             pool.join()
     finally:
         log.get().summary()
diff --git a/framework/programs/run.py b/framework/programs/run.py
index 14fb764a2..ab1cb4e24 100644
--- a/framework/programs/run.py
+++ b/framework/programs/run.py
@@ -201,20 +201,28 @@  def _run_parser(input_):
                         dest='process_isolation',
                         action='store',
                         type=booltype,
                         default=core.PIGLIT_CONFIG.safe_get(
                             'core', 'process isolation', 'true'),
                         metavar='<bool>',
                         help='Set this to allow tests to run without process '
                              'isolation. This allows, but does not require, '
                              'tests to run multiple tests per process. '
                              'This value can also be set in piglit.conf.')
+    parser.add_argument('-j', '--jobs',
+                        dest='jobs',
+                        action='store',
+                        type=int,
+                        default=core.PIGLIT_CONFIG.safe_get(
+                            'core', 'jobs', None),
+                        help='Set the maximum number of jobs to run concurrently. '
+                             'By default, the reported number of CPUs is used.')
     parser.add_argument("--ignore-missing",
                         dest="ignore_missing",
                         action="store_true",
                         help="missing tests are considered as 'notrun'")
     parser.add_argument("test_profile",
                         metavar="<Profile path(s)>",
                         nargs='+',
                         help="Path to one or more test profiles to run. "
                              "If more than one profile is provided then they "
                              "will be merged.")
@@ -289,20 +297,21 @@  def run(input_):
     # isn't reliable with threaded run
     if args.dmesg or args.monitored:
         args.concurrency = "none"
 
     # Pass arguments into Options
     options.OPTIONS.execute = args.execute
     options.OPTIONS.valgrind = args.valgrind
     options.OPTIONS.sync = args.sync
     options.OPTIONS.deqp_mustpass = args.deqp_mustpass
     options.OPTIONS.process_isolation = args.process_isolation
+    options.OPTIONS.jobs = args.jobs
 
     # Set the platform to pass to waffle
     options.OPTIONS.env['PIGLIT_PLATFORM'] = args.platform
 
     # Change working directory to the root of the piglit directory
     piglit_dir = path.dirname(path.realpath(sys.argv[0]))
     os.chdir(piglit_dir)
 
     # If the results directory already exists and if overwrite was set, then
     # clear the directory. If it wasn't set, then raise fatal error.
@@ -357,21 +366,21 @@  def run(input_):
         for p in profiles:
             p.options['ignore_missing'] = args.ignore_missing
 
     for p in profiles:
         if args.exclude_tests:
             p.filters.append(profile.RegexFilter(args.exclude_tests,
                                                  inverse=True))
         if args.include_tests:
             p.filters.append(profile.RegexFilter(args.include_tests))
 
-    profile.run(profiles, args.log_level, backend, args.concurrency)
+    profile.run(profiles, args.log_level, backend, args.concurrency, args.jobs)
 
     time_elapsed.end = time.time()
     backend.finalize({'time_elapsed': time_elapsed.to_json()})
 
     print('Thank you for running Piglit!\n'
           'Results have been written to ' + args.results_path)
 
 
 @exceptions.handler
 def resume(input_):
@@ -382,29 +391,38 @@  def resume(input_):
                         help="Path to results folder")
     parser.add_argument("-f", "--config",
                         dest="config_file",
                         type=argparse.FileType("r"),
                         help="Optionally specify a piglit config file to use. "
                              "Default is piglit.conf")
     parser.add_argument("-n", "--no-retry",
                         dest="no_retry",
                         action="store_true",
                         help="Do not retry incomplete tests")
+    parser.add_argument('-j', '--jobs',
+                        dest='jobs',
+                        action='store',
+                        type=int,
+                        default=core.PIGLIT_CONFIG.safe_get(
+                            'core', 'jobs', None),
+                        help='Set the maximum number of jobs to run concurrently. '
+                             'By default, the reported number of CPUs is used.')
     args = parser.parse_args(input_)
     _disable_windows_exception_messages()
 
     results = backends.load(args.results_path)
     options.OPTIONS.execute = results.options['execute']
     options.OPTIONS.valgrind = results.options['valgrind']
     options.OPTIONS.sync = results.options['sync']
     options.OPTIONS.deqp_mustpass = results.options['deqp_mustpass']
     options.OPTIONS.process_isolation = results.options['process_isolation']
+    options.OPTIONS.jobs = args.jobs
 
     core.get_config(args.config_file)
 
     options.OPTIONS.env['PIGLIT_PLATFORM'] = results.options['platform']
 
     results.options['env'] = core.collect_system_info()
     results.options['name'] = results.name
 
     # Resume only works with the JSON backend
     backend = backends.get_backend('json')(
@@ -446,19 +464,20 @@  def resume(input_):
 
         if results.options['forced_test_list']:
             p.forced_test_list = results.options['forced_test_list']
 
     # This is resumed, don't bother with time since it won't be accurate anyway
     try:
         profile.run(
             profiles,
             results.options['log_level'],
             backend,
-            results.options['concurrent'])
+            results.options['concurrent'],
+            args.jobs)
     except exceptions.PiglitUserError as e:
         if str(e) != 'no matching tests':
             raise
 
     backend.finalize()
 
     print("Thank you for running Piglit!\n"
           "Results have been written to {0}".format(args.results_path))

Comments

Thanks for the changes,

Reviewed-by: Dylan Baker <dylan@pnwbakers.com>

Quoting Marek Olšák (2018-05-30 14:18:15)
> From: Nicolai Hähnle <nicolai.haehnle@amd.com>
> 
> The default remains the same: number of CPUs. But on systems with lots of
> cores but comparatively little (V)RAM it can make sense to reduce the
> number of jobs to avoid random failures caused by out-of-memory conditions.
> ---
>  framework/options.py      |  1 +
>  framework/profile.py      |  5 +++--
>  framework/programs/run.py | 23 +++++++++++++++++++++--
>  3 files changed, 25 insertions(+), 4 deletions(-)
> 
> diff --git a/framework/options.py b/framework/options.py
> index 211159a45..f5f32af78 100644
> --- a/framework/options.py
> +++ b/framework/options.py
> @@ -51,20 +51,21 @@ class _Options(object):  # pylint: disable=too-many-instance-attributes
>      env -- environment variables set for each test before run
>      deqp_mustpass -- True to enable the use of the deqp mustpass list feature.
>      """
>  
>      def __init__(self):
>          self.execute = True
>          self.valgrind = False
>          self.sync = False
>          self.deqp_mustpass = False
>          self.process_isolation = True
> +        self.jobs = None
>  
>          # env is used to set some base environment variables that are not going
>          # to change across runs, without sending them to os.environ which is
>          # fickle and easy to break
>          self.env = {
>              'PIGLIT_SOURCE_DIR':
>                  os.environ.get(
>                      'PIGLIT_SOURCE_DIR',
>                      os.path.abspath(os.path.join(os.path.dirname(__file__),
>                                                   '..')))
> diff --git a/framework/profile.py b/framework/profile.py
> index ffc91e0a6..a6cac2cf0 100644
> --- a/framework/profile.py
> +++ b/framework/profile.py
> @@ -590,37 +590,38 @@ def load_test_profile(filename, python=None):
>                  filename))
>  
>      try:
>          return mod.profile
>      except AttributeError:
>          raise exceptions.PiglitFatalError(
>              'There is no "profile" attribute in module {}.\n'
>              'Did you specify the right file?'.format(filename))
>  
>  
> -def run(profiles, logger, backend, concurrency):
> +def run(profiles, logger, backend, concurrency, jobs):
>      """Runs all tests using Thread pool.
>  
>      When called this method will flatten out self.tests into self.test_list,
>      then will prepare a logger, and begin executing tests through its Thread
>      pools.
>  
>      Based on the value of concurrency it will either run all the tests
>      concurrently, all serially, or first the thread safe tests then the
>      serial tests.
>  
>      Finally it will print a final summary of the tests.
>  
>      Arguments:
>      profiles -- a list of Profile instances.
>      logger   -- a log.LogManager instance.
>      backend  -- a results.Backend derived instance.
> +    jobs     -- maximum number of concurrent jobs. Use os.cpu_count() by default.
>      """
>      chunksize = 1
>  
>      profiles = [(p, p.itertests()) for p in profiles]
>      log = LogManager(logger, sum(len(p) for p, _ in profiles))
>  
>      # check that after the filters are run there are actually tests to run.
>      # if not any(l for _, l in profiles):
>          # raise exceptions.PiglitUserError('no matching tests')
>  
> @@ -663,21 +664,21 @@ def run(profiles, logger, backend, concurrency):
>              # pool
>              run_threads(single, profile, test_list[1],
>                          lambda x: not x[1].run_concurrent)
>          profile.teardown()
>  
>      # Multiprocessing.dummy is a wrapper around Threading that provides a
>      # multiprocessing compatible API
>      #
>      # The default value of pool is the number of virtual processor cores
>      single = multiprocessing.dummy.Pool(1)
> -    multi = multiprocessing.dummy.Pool()
> +    multi = multiprocessing.dummy.Pool(jobs)
>  
>      try:
>          for p in profiles:
>              run_profile(*p)
>  
>          for pool in [single, multi]:
>              pool.close()
>              pool.join()
>      finally:
>          log.get().summary()
> diff --git a/framework/programs/run.py b/framework/programs/run.py
> index 14fb764a2..ab1cb4e24 100644
> --- a/framework/programs/run.py
> +++ b/framework/programs/run.py
> @@ -201,20 +201,28 @@ def _run_parser(input_):
>                          dest='process_isolation',
>                          action='store',
>                          type=booltype,
>                          default=core.PIGLIT_CONFIG.safe_get(
>                              'core', 'process isolation', 'true'),
>                          metavar='<bool>',
>                          help='Set this to allow tests to run without process '
>                               'isolation. This allows, but does not require, '
>                               'tests to run multiple tests per process. '
>                               'This value can also be set in piglit.conf.')
> +    parser.add_argument('-j', '--jobs',
> +                        dest='jobs',
> +                        action='store',
> +                        type=int,
> +                        default=core.PIGLIT_CONFIG.safe_get(
> +                            'core', 'jobs', None),
> +                        help='Set the maximum number of jobs to run concurrently. '
> +                             'By default, the reported number of CPUs is used.')
>      parser.add_argument("--ignore-missing",
>                          dest="ignore_missing",
>                          action="store_true",
>                          help="missing tests are considered as 'notrun'")
>      parser.add_argument("test_profile",
>                          metavar="<Profile path(s)>",
>                          nargs='+',
>                          help="Path to one or more test profiles to run. "
>                               "If more than one profile is provided then they "
>                               "will be merged.")
> @@ -289,20 +297,21 @@ def run(input_):
>      # isn't reliable with threaded run
>      if args.dmesg or args.monitored:
>          args.concurrency = "none"
>  
>      # Pass arguments into Options
>      options.OPTIONS.execute = args.execute
>      options.OPTIONS.valgrind = args.valgrind
>      options.OPTIONS.sync = args.sync
>      options.OPTIONS.deqp_mustpass = args.deqp_mustpass
>      options.OPTIONS.process_isolation = args.process_isolation
> +    options.OPTIONS.jobs = args.jobs
>  
>      # Set the platform to pass to waffle
>      options.OPTIONS.env['PIGLIT_PLATFORM'] = args.platform
>  
>      # Change working directory to the root of the piglit directory
>      piglit_dir = path.dirname(path.realpath(sys.argv[0]))
>      os.chdir(piglit_dir)
>  
>      # If the results directory already exists and if overwrite was set, then
>      # clear the directory. If it wasn't set, then raise fatal error.
> @@ -357,21 +366,21 @@ def run(input_):
>          for p in profiles:
>              p.options['ignore_missing'] = args.ignore_missing
>  
>      for p in profiles:
>          if args.exclude_tests:
>              p.filters.append(profile.RegexFilter(args.exclude_tests,
>                                                   inverse=True))
>          if args.include_tests:
>              p.filters.append(profile.RegexFilter(args.include_tests))
>  
> -    profile.run(profiles, args.log_level, backend, args.concurrency)
> +    profile.run(profiles, args.log_level, backend, args.concurrency, args.jobs)
>  
>      time_elapsed.end = time.time()
>      backend.finalize({'time_elapsed': time_elapsed.to_json()})
>  
>      print('Thank you for running Piglit!\n'
>            'Results have been written to ' + args.results_path)
>  
>  
>  @exceptions.handler
>  def resume(input_):
> @@ -382,29 +391,38 @@ def resume(input_):
>                          help="Path to results folder")
>      parser.add_argument("-f", "--config",
>                          dest="config_file",
>                          type=argparse.FileType("r"),
>                          help="Optionally specify a piglit config file to use. "
>                               "Default is piglit.conf")
>      parser.add_argument("-n", "--no-retry",
>                          dest="no_retry",
>                          action="store_true",
>                          help="Do not retry incomplete tests")
> +    parser.add_argument('-j', '--jobs',
> +                        dest='jobs',
> +                        action='store',
> +                        type=int,
> +                        default=core.PIGLIT_CONFIG.safe_get(
> +                            'core', 'jobs', None),
> +                        help='Set the maximum number of jobs to run concurrently. '
> +                             'By default, the reported number of CPUs is used.')
>      args = parser.parse_args(input_)
>      _disable_windows_exception_messages()
>  
>      results = backends.load(args.results_path)
>      options.OPTIONS.execute = results.options['execute']
>      options.OPTIONS.valgrind = results.options['valgrind']
>      options.OPTIONS.sync = results.options['sync']
>      options.OPTIONS.deqp_mustpass = results.options['deqp_mustpass']
>      options.OPTIONS.process_isolation = results.options['process_isolation']
> +    options.OPTIONS.jobs = args.jobs
>  
>      core.get_config(args.config_file)
>  
>      options.OPTIONS.env['PIGLIT_PLATFORM'] = results.options['platform']
>  
>      results.options['env'] = core.collect_system_info()
>      results.options['name'] = results.name
>  
>      # Resume only works with the JSON backend
>      backend = backends.get_backend('json')(
> @@ -446,19 +464,20 @@ def resume(input_):
>  
>          if results.options['forced_test_list']:
>              p.forced_test_list = results.options['forced_test_list']
>  
>      # This is resumed, don't bother with time since it won't be accurate anyway
>      try:
>          profile.run(
>              profiles,
>              results.options['log_level'],
>              backend,
> -            results.options['concurrent'])
> +            results.options['concurrent'],
> +            args.jobs)
>      except exceptions.PiglitUserError as e:
>          if str(e) != 'no matching tests':
>              raise
>  
>      backend.finalize()
>  
>      print("Thank you for running Piglit!\n"
>            "Results have been written to {0}".format(args.results_path))
> -- 
> 2.17.0
> 
> _______________________________________________
> Piglit mailing list
> Piglit@lists.freedesktop.org
> https://lists.freedesktop.org/mailman/listinfo/piglit