The Storm client failed to submit a topology:
java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException
    at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:141)
    at backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:176)
    at backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:158)
    at cn.com.tiza.dataquality.service.service.JobService.start(JobService.java:93)
    at cn.com.tiza.dataquality.service.service.JobService.submit(JobService.java:149)
    at cn.com.tiza.dataquality.service.resource.JobResource.execute(JobResource.java:33)
    at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:160)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
    at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:326)
nimbus.log:
2017-11-17T16:12:27.038+0800 b.s.d.nimbus [WARN] Topology submission exception. (topology name='dq23') #
2017-11-17T16:12:27.038+0800 o.a.t.s.TNonblockingServer [ERROR] Unexpected exception while invoking!
java.lang.IllegalArgumentException: storm-local/nimbus/inbox/stormjar-bc004b80-5f6e-4b94-9505-911511e5cc1f.jar to copy to storm-local/nimbus/stormdist/dq23-94-1510906347 does not exist!
    at backtype.storm.daemon.nimbus$fn__4364.invoke(nimbus.clj:1173) ~[storm-core-0.9.7.jar:0.9.7]
    at clojure.lang.MultiFn.invoke(MultiFn.java:236) ~[clojure-1.5.1.jar:na]
    at backtype.storm.daemon.nimbus$setup_storm_code.invoke(nimbus.clj:307) ~[storm-core-0.9.7.jar:0.9.7]
    at backtype.storm.daemon.nimbus$fn__4261$exec_fn__1104__auto__$reify__4274.submitTopologyWithOpts(nimbus.clj:953) ~[storm-core-0.9.7.jar:0.9.7]
    at backtype.storm.daemon.nimbus$fn__4261$exec_fn__1104__auto__$reify__4274.submitTopology(nimbus.clj:966) ~[storm-core-0.9.7.jar:0.9.7]
    at backtype.storm.generated.Nimbus$Processor$submitTopology.getResult(Nimbus.java:1240) ~[storm-core-0.9.7.jar:0.9.7]
    at backtype.storm.generated.Nimbus$Processor$submitTopology.getResult(Nimbus.java:1228) ~[storm-core-0.9.7.jar:0.9.7]
    at org.apache.thrift7.ProcessFunction.process(ProcessFunction.java:32) ~[storm-core-0.9.7.jar:0.9.7]
    at org.apache.thrift7.TBaseProcessor.process(TBaseProcessor.java:34) ~[storm-core-0.9.7.jar:0.9.7]
    at org.apache.thrift7.server.TNonblockingServer$FrameBuffer.invoke(TNonblockingServer.java:632) ~[storm-core-0.9.7.jar:0.9.7]
    at org.apache.thrift7.server.THsHaServer$Invocation.run(THsHaServer.java:201) [storm-core-0.9.7.jar:0.9.7]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Analysis shows that Nimbus has a clean-inbox mechanism that periodically deletes jar files from the inbox. Two configuration options control its schedule:
/**
 * How often nimbus should wake the cleanup thread to clean the inbox.
 * @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
 */
public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = "nimbus.cleanup.inbox.freq.secs";

/**
 * The length of time a jar file lives in the inbox before being deleted by the cleanup thread.
 *
 * Probably keep this value greater than or equal to NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS.
 * Note that the time it takes to delete an inbox jar file is going to be somewhat more than
 * NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS (depending on how often NIMBUS_CLEANUP_FREQ_SECS
 * is set to).
 * @see NIMBUS_CLEANUP_FREQ_SECS
 */
public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
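NIMBUS_CLEANUP_INBOX_FREQ_SECS: how often Nimbus wakes the cleanup thread to clean the inbox.
NIMBUS_INBOX_JAR_EXPIRATION_SECS: how long a jar file may live in the inbox. Any jar that has already expired when the cleanup thread runs is deleted.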
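To make the interaction between the two settings concrete, here is a minimal sketch of the expiration rule. It is a simplified model, not Storm's actual cleanup code: the class name InboxCleanupSketch, its methods, the strict ">" comparison, and the values 600 and 3600 are all assumptions chosen for illustration.

```java
// Sketch only: a simplified model of the inbox cleanup rule, not Storm's implementation.
final class InboxCleanupSketch {

    /** True if a jar last modified at lastModifiedSecs has outlived expirationSecs at nowSecs. */
    static boolean isExpired(long lastModifiedSecs, long nowSecs, long expirationSecs) {
        // Assumption: the boundary is exclusive; the real check may differ at the edge.
        return nowSecs - lastModifiedSecs > expirationSecs;
    }

    /** Upper bound on a jar's lifetime: it expires, then waits for the next wake-up. */
    static long worstCaseLifetimeSecs(long cleanupFreqSecs, long expirationSecs) {
        return expirationSecs + cleanupFreqSecs;
    }

    public static void main(String[] args) {
        long freq = 600;        // illustrative value for nimbus.cleanup.inbox.freq.secs
        long expiration = 3600; // illustrative value for nimbus.inbox.jar.expiration.secs
        long uploadedAt = 0;

        // Step through the cleanup thread's wake-ups until the jar is found expired.
        for (long now = freq; ; now += freq) {
            if (isExpired(uploadedAt, now, expiration)) {
                System.out.println("jar deleted at t=" + now + "s");
                break;
            }
        }
        System.out.println("worst case: " + worstCaseLifetimeSecs(freq, expiration) + "s");
    }
}
```

The point of the sketch is that a jar is only removed on a wake-up of the cleanup thread, so its real lifetime lies somewhere between the expiration time and the expiration time plus one cleanup interval.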
On the other hand, when a topology is submitted through the StormSubmitter.submitTopology method provided by storm-core, the jar upload logic is as follows:
private static String submittedJar = null;

private static void submitJar(Map conf, ProgressListener listener) {
    if (submittedJar == null) {
        LOG.info("Jar not uploaded to master yet. Submitting jar...");
        String localJar = System.getProperty("storm.jar");
        submittedJar = submitJar(conf, localJar, listener);
    } else {
        LOG.info("Jar already uploaded to master. Not submitting jar.");
    }
}
Because submittedJar is a static field, the jar is uploaded only once for as long as the client process keeps running.
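So after about an hour the jar is removed from the inbox by the cleanup thread. When the same client process submits a topology again, it skips the upload and reuses the cached path, and Nimbus can no longer find the jar file in its inbox.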
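One possible workaround for a long-running client is to clear the cached path before each submission so that the jar is uploaded again. The sketch below shows the idea with reflection. It is an assumption-laden illustration, not an official Storm API: FakeSubmitter is a hypothetical stand-in that mirrors the caching pattern from the excerpt, and SubmittedJarReset and clearStaticField are names invented here. Against real storm-core one would presumably pass StormSubmitter.class and the field name "submittedJar" as shown in the excerpt, which depends on a private implementation detail of that version.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in mirroring the caching pattern above; NOT the real StormSubmitter.
final class FakeSubmitter {
    private static String submittedJar = null;
    static int uploads = 0;

    static String submitJar() {
        if (submittedJar == null) {
            uploads++;
            submittedJar = "inbox/stormjar-" + uploads + ".jar"; // made-up path
        }
        return submittedJar;
    }
}

final class SubmittedJarReset {

    /** Sets a private static field to null so the next submit uploads the jar again. */
    static void clearStaticField(Class<?> owner, String fieldName) {
        try {
            Field field = owner.getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(null, null); // static field, so the instance argument is ignored
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot reset " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        FakeSubmitter.submitJar();
        FakeSubmitter.submitJar(); // served from the cache, no second upload
        clearStaticField(FakeSubmitter.class, "submittedJar");
        FakeSubmitter.submitJar(); // cache cleared, so the jar is uploaded again
        System.out.println("uploads = " + FakeSubmitter.uploads);
    }
}
```

Reflection on a private field is fragile and may break when the library changes. Raising nimbus.inbox.jar.expiration.secs only delays the failure for a client that never restarts, which is why the sketch targets the client-side cache instead.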