一共运行N个job,其中M个随时并行运行

Run a total of N jobs, having M of them running in parallel at any time

我有很多作业要运行,假设是 100 个。它们可以并行运行,但每个都占用大量内存,所以我只能同时运行 8 个。

我目前有这个 shell 脚本:

(

(python run.py $arg1 &)

(python run.py $arg2 &)

(python run.py $arg3 &)

(python run.py $arg4 &)

(python run.py $arg5 &)

(python run.py $arg6 &)

(python run.py $arg7 &)

(python run.py $arg8 &)

) 2>&1 | cat -u



(

(python run.py $arg9 &)

(python run.py $arg10 &)

(python run.py $arg11 &)

(python run.py $arg12 &)

(python run.py $arg13 &)

(python run.py $arg14 &)

(python run.py $arg15 &)

(python run.py $arg16 &)

) 2>&1 | cat -u



...parallel --max-procs 8 <<EOF

 python run.py $arg1 

 python run.py $arg2 

 python run.py $arg3

 ..

EOF
cat args.list | parallel --max-procs 8 python run.py

import multiprocessing



def main(args):

 # Here's where you would do what you usually do with the arguments



pool = multiprocessing.Pool(processes=8)

pool.map(main, sys.argv[1:], chunksize=1)

pool.close()

pool.join()

这有运行第一批8个的效果,当它们都完成后,它开始下一批8个。问题是每个作业的运行时间不是恒定的,有些比其他作业更早完成,因此,对于要完成的每批 8 个进行加权并不是最佳选择,因为我实际上是在等待 8 个中最慢的一个完成。

相反,我想要一个脚本(shell 或 python)来运行我所有的 100 个作业,在任何给定时间并行运行 8 个作业,以实现最佳效率。

有什么想法可以实现吗?


您可以编写自己的小型调度程序,将它们分配给已完成当前任务的处理器;但在我们的中心,我们强烈建议使用 gnu 并行,它已经使用类似 xargs 的语法实现了这一点。

例如,如上所述,您可以这样做

(

(python run.py $arg1 &)

(python run.py $arg2 &)

(python run.py $arg3 &)

(python run.py $arg4 &)

(python run.py $arg5 &)

(python run.py $arg6 &)

(python run.py $arg7 &)

(python run.py $arg8 &)

) 2>&1 | cat -u



(

(python run.py $arg9 &)

(python run.py $arg10 &)

(python run.py $arg11 &)

(python run.py $arg12 &)

(python run.py $arg13 &)

(python run.py $arg14 &)

(python run.py $arg15 &)

(python run.py $arg16 &)

) 2>&1 | cat -u



...parallel --max-procs 8 <<EOF

 python run.py $arg1 

 python run.py $arg2 

 python run.py $arg3

 ..

EOF
cat args.list | parallel --max-procs 8 python run.py

import multiprocessing



def main(args):

 # Here's where you would do what you usually do with the arguments



pool = multiprocessing.Pool(processes=8)

pool.map(main, sys.argv[1:], chunksize=1)

pool.close()

pool.join()

或者,如果你在一个文件中有你的参数列表,你可以做类似

(

(python run.py $arg1 &)

(python run.py $arg2 &)

(python run.py $arg3 &)

(python run.py $arg4 &)

(python run.py $arg5 &)

(python run.py $arg6 &)

(python run.py $arg7 &)

(python run.py $arg8 &)

) 2>&1 | cat -u



(

(python run.py $arg9 &)

(python run.py $arg10 &)

(python run.py $arg11 &)

(python run.py $arg12 &)

(python run.py $arg13 &)

(python run.py $arg14 &)

(python run.py $arg15 &)

(python run.py $arg16 &)

) 2>&1 | cat -u



...parallel --max-procs 8 <<EOF

 python run.py $arg1 

 python run.py $arg2 

 python run.py $arg3

 ..

EOF
cat args.list | parallel --max-procs 8 python run.py

import multiprocessing



def main(args):

 # Here's where you would do what you usually do with the arguments



pool = multiprocessing.Pool(processes=8)

pool.map(main, sys.argv[1:], chunksize=1)

pool.close()

pool.join()

根据您的需要,您可以使用许多工具。最简单的可能是使用 GNU parallelmake 可以与其 -j 开关并行运行任务。如果您尝试运行的任务更加复杂和多样化,那么真正的排队系统可能会有所帮助,例如队列博士。还有更多工具,GNU parallel\\ 的手册页很好地列出了它们。


在我看来,您正在寻找 multiprocessing 模块,特别是 multiprocessing.Pool

如果我这样做,我会将所有不同的参数集提供给 run.py,将您现在在顶层执行的操作package在 main(args) 函数中的 run.py 中,然后使用 Pool\\'s map 方法在所有不同的参数集上调用该方法。

它可能看起来像这样:

(

(python run.py $arg1 &)

(python run.py $arg2 &)

(python run.py $arg3 &)

(python run.py $arg4 &)

(python run.py $arg5 &)

(python run.py $arg6 &)

(python run.py $arg7 &)

(python run.py $arg8 &)

) 2>&1 | cat -u



(

(python run.py $arg9 &)

(python run.py $arg10 &)

(python run.py $arg11 &)

(python run.py $arg12 &)

(python run.py $arg13 &)

(python run.py $arg14 &)

(python run.py $arg15 &)

(python run.py $arg16 &)

) 2>&1 | cat -u



...parallel --max-procs 8 <<EOF

 python run.py $arg1 

 python run.py $arg2 

 python run.py $arg3

 ..

EOF
cat args.list | parallel --max-procs 8 python run.py

import multiprocessing



def main(args):

 # Here's where you would do what you usually do with the arguments



pool = multiprocessing.Pool(processes=8)

pool.map(main, sys.argv[1:], chunksize=1)

pool.close()

pool.join()

请注意,这假定每次运行的参数都可以保存在一个字符串中(因此 sys.argv 的一个条目)。


为什么不使用 DataStage 自己的 Workload Management?定义 M 个队列并将您的 N 个作业循环分配到这些队列中。


相关推荐

  • Spring部署设置openshift

    Springdeploymentsettingsopenshift我有一个问题让我抓狂了三天。我根据OpenShift帐户上的教程部署了spring-eap6-quickstart代码。我已配置调试选项,并且已将Eclipse工作区与OpehShift服务器同步-服务器上的一切工作正常,但在Eclipse中出现无法消除的错误。我有这个错误:cvc-complex-type.2.4.a:Invali…
    2025-04-161
  • 检查Java中正则表达式中模式的第n次出现

    CheckfornthoccurrenceofpatterninregularexpressioninJava本问题已经有最佳答案,请猛点这里访问。我想使用Java正则表达式检查输入字符串中特定模式的第n次出现。你能建议怎么做吗?这应该可以工作:MatchResultfindNthOccurance(intn,Patternp,CharSequencesrc){Matcherm=p.matcher…
    2025-04-161
  • 如何让 JTable 停留在已编辑的单元格上

    HowtohaveJTablestayingontheeditedcell如果有人编辑JTable的单元格内容并按Enter,则内容会被修改并且表格选择会移动到下一行。是否可以禁止JTable在单元格编辑后转到下一行?原因是我的程序使用ListSelectionListener在单元格选择上同步了其他一些小部件,并且我不想在编辑当前单元格后选择下一行。Enter的默认绑定是名为selectNext…
    2025-04-161
  • Weblogic 12c 部署

    Weblogic12cdeploy我正在尝试将我的应用程序从Tomcat迁移到Weblogic12.2.1.3.0。我能够毫无错误地部署应用程序,但我遇到了与持久性提供程序相关的运行时错误。这是堆栈跟踪:javax.validation.ValidationException:CalltoTraversableResolver.isReachable()threwanexceptionatorg.…
    2025-04-161
  • Resteasy Content-Type 默认值

    ResteasyContent-Typedefaults我正在使用Resteasy编写一个可以返回JSON和XML的应用程序,但可以选择默认为XML。这是我的方法:@GET@Path("/content")@Produces({MediaType.APPLICATION_XML,MediaType.APPLICATION_JSON})publicStringcontentListRequestXm…
    2025-04-161
  • 代码不会停止运行,在 Java 中

    thecodedoesn'tstoprunning,inJava我正在用Java解决项目Euler中的问题10,即"Thesumoftheprimesbelow10is2+3+5+7=17.Findthesumofalltheprimesbelowtwomillion."我的代码是packageprojecteuler_1;importjava.math.BigInteger;importjava…
    2025-04-161
  • Out of memory java heap space

    Outofmemoryjavaheapspace我正在尝试将大量文件从服务器发送到多个客户端。当我尝试发送大小为700mb的文件时,它显示了"OutOfMemoryjavaheapspace"错误。我正在使用Netbeans7.1.2版本。我还在属性中尝试了VMoption。但仍然发生同样的错误。我认为阅读整个文件存在一些问题。下面的代码最多可用于300mb。请给我一些建议。提前致谢publicc…
    2025-04-161
  • Log4j 记录到共享日志文件

    Log4jLoggingtoaSharedLogFile有没有办法将log4j日志记录事件写入也被其他应用程序写入的日志文件。其他应用程序可以是非Java应用程序。有什么缺点?锁定问题?格式化?Log4j有一个SocketAppender,它将向服务发送事件,您可以自己实现或使用与Log4j捆绑的简单实现。它还支持syslogd和Windows事件日志,这对于尝试将日志输出与来自非Java应用程序…
    2025-04-161