maven - java.lang.ClassNotFoundException: TopologyMain -
I am trying to submit a simple word count topology to a local Storm cluster. First, I tried using Maven, and then using the Storm command line client. I created the JAR file using Eclipse. But it throws a "main class not found" exception. Can anyone tell me what the problem is? I am attaching the code and the exception below.
package com.test.newpackage; import com.test.newpackage.wordreader; import backtype.storm.config; import backtype.storm.localcluster; import backtype.storm.topology.topologybuilder; import backtype.storm.tuple.fields; import com.test.newpackage.wordcounter; import com.test.newpackage.wordnormalizer; public class topologymain { public static void main(string[] args) throws interruptedexception { // topology definition topologybuilder builder = new topologybuilder(); builder.setspout("word-reader", new wordreader()); builder.setbolt("word-normalizer", new wordnormalizer()) .shufflegrouping("word-reader"); builder.setbolt("word-counter", new wordcounter(), 2).fieldsgrouping( "word-normalizer", new fields("word")); // configuration config conf = new config(); conf.put("wordsfile", args[0]); conf.setdebug(false); // topology run conf.put(config.topology_max_spout_pending, 1); localcluster cluster = new localcluster(); cluster.submittopology("getting-started-toplogie", conf, builder.createtopology()); thread.sleep(1000); cluster.shutdown(); } } package com.test.newpackage; import java.util.hashmap; import java.util.map; import backtype.storm.task.outputcollector; import backtype.storm.task.topologycontext; import backtype.storm.topology.irichbolt; import backtype.storm.topology.outputfieldsdeclarer; import backtype.storm.tuple.tuple; public class wordcounter implements irichbolt { integer id; string name; map<string, integer> counters; private outputcollector collector; /** * @ end of spout (when cluster shutdown show * word counters */ @override public void cleanup() { system.out.println("-- word counter [" + name + "-" + id + "] --"); (map.entry<string, integer> entry : counters.entryset()) { system.out.println(entry.getkey() + ": " + entry.getvalue()); } } /** * on each word count */ @override public void execute(tuple input) { string str = input.getstring(0); /** * if word dosn't exist in map create this, if not * add 1 */ if (!counters.containskey(str)) { 
counters.put(str, 1); } else { integer c = counters.get(str) + 1; counters.put(str, c); } // set tuple acknowledge collector.ack(input); } /** * on create */ @override public void prepare(map stormconf, topologycontext context, outputcollector collector) { this.counters = new hashmap<string, integer>(); this.collector = collector; this.name = context.getthiscomponentid(); this.id = context.getthistaskid(); } @override public void declareoutputfields(outputfieldsdeclarer declarer) { } @override public map<string, object> getcomponentconfiguration() { // todo auto-generated method stub return null; } } package com.test.newpackage; import java.util.arraylist; import java.util.list; import java.util.map; import backtype.storm.task.outputcollector; import backtype.storm.task.topologycontext; import backtype.storm.topology.irichbolt; import backtype.storm.topology.outputfieldsdeclarer; import backtype.storm.tuple.fields; import backtype.storm.tuple.tuple; import backtype.storm.tuple.values; public class wordnormalizer implements irichbolt { private outputcollector collector; public void cleanup() { } /** * bolt receive line words file , process * normalize line * * normalize put words in lower case , split line * words in */ public void execute(tuple input) { string sentence = input.getstring(0); string[] words = sentence.split(" "); (string word : words) { word = word.trim(); if (!word.isempty()) { word = word.tolowercase(); // emit word list = new arraylist(); a.add(input); collector.emit(a, new values(word)); } } // acknowledge tuple collector.ack(input); } public void prepare(map stormconf, topologycontext context, outputcollector collector) { this.collector = collector; } /** * bolt emit field "word" */ public void declareoutputfields(outputfieldsdeclarer declarer) { declarer.declare(new fields("word")); } @override public map<string, object> getcomponentconfiguration() { // todo auto-generated method stub return null; } } package com.test.newpackage; import 
java.io.bufferedreader; import java.io.filenotfoundexception; import java.io.filereader; import java.util.map; import backtype.storm.spout.spoutoutputcollector; import backtype.storm.task.topologycontext; import backtype.storm.topology.irichspout; import backtype.storm.topology.outputfieldsdeclarer; import backtype.storm.tuple.fields; import backtype.storm.tuple.values; public class wordreader implements irichspout { private spoutoutputcollector collector; private filereader filereader; private boolean completed = false; private topologycontext context; public boolean isdistributed() { return false; } public void ack(object msgid) { system.out.println("ok:" + msgid); } public void close() { } public void fail(object msgid) { system.out.println("fail:" + msgid); } /** * thing methods emit each file line */ public void nexttuple() { /** * nextuple called forever, if have been readed file * wait , return */ if (completed) { try { thread.sleep(1000); } catch (interruptedexception e) { // nothing } return; } string str; // open reader bufferedreader reader = new bufferedreader(filereader); try { // read lines while ((str = reader.readline()) != null) { /** * each line emmit new value line */ this.collector.emit(new values(str), str); } } catch (exception e) { throw new runtimeexception("error reading tuple", e); } { completed = true; } } /** * create file , collector object */ public void open(map conf, topologycontext context, spoutoutputcollector collector) { try { this.context = context; this.filereader = new filereader(conf.get("wordsfile").tostring()); } catch (filenotfoundexception e) { throw new runtimeexception("error reading file[" + conf.get("wordfile") + "]"); } this.collector = collector; } /** * declare output field "word" */ public void declareoutputfields(outputfieldsdeclarer declarer) { declarer.declare(new fields("line")); } @override public void activate() { // todo auto-generated method stub } @override public void deactivate() { // todo 
auto-generated method stub } @override public map<string, object> getcomponentconfiguration() { // todo auto-generated method stub return null; } } mvn exec:java -dexec.mainclass="topologymain" -dexec.args="resources/ words.txt" storm jar topologymain.jar topologymain wordcount exception in thread "main" java.lang.noclassdeffounderror: topologymain caused by: java.lang.classnotfoundexception: topologymain @ java.net.urlclassloader$1.run(urlclassloader.java:217) @ java.security.accesscontroller.doprivileged(native method) @ java.net.urlclassloader.findclass(urlclassloader.java:205) @ java.lang.classloader.loadclass(classloader.java:321) @ sun.misc.launcher$appclassloader.loadclass(launcher.java:294) @ java.lang.classloader.loadclass(classloader.java:266) not find main class: topologymain. program exit. edit exception:
backtype.storm.daemon.nimbus - activating word-count: word-count-1-1374756437 1724 [main] error org.apache.zookeeper.server.nioservercnxn - thread thread[main,5,main] died java.lang.nullpointerexception @ clojure.lang.numbers.ops(numbers.java:942) @ clojure.lang.numbers.ispos(numbers.java:94) @ clojure.core$take$fn__4112.invoke(core.clj:2500) @ clojure.lang.lazyseq.sval(lazyseq.java:42) @ clojure.lang.lazyseq.seq(lazyseq.java:60) @ clojure.lang.rt.seq(rt.java:473) @ clojure.core$seq.invoke(core.clj:133) @ clojure.core$concat$fn__3804.invoke(core.clj:662) @ clojure.lang.lazyseq.sval(lazyseq.java:42) @ clojure.lang.lazyseq.seq(lazyseq.java:60) @ clojure.lang.rt.seq(rt.java:473) @ clojure.core$seq.invoke(core.clj:133) @ clojure.core$concat$cat__3806$fn__3807.invoke(core.clj:671) @ clojure.lang.lazyseq.sval(lazyseq.java:42) @ clojure.lang.lazyseq.seq(lazyseq.java:60) @ clojure.lang.rt.seq(rt.java:473) @ clojure.core$seq.invoke(core.clj:133) @ clojure.core$map$fn__4091.invoke(core.clj:2437) @ clojure.lang.lazyseq.sval(lazyseq.java:42) @ clojure.lang.lazyseq.seq(lazyseq.java:60) @ clojure.lang.rt.seq(rt.java:473) @ clojure.core$seq.invoke(core.clj:133) @ clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) @ clojure.core.protocols$fn__5875.invoke(protocols.clj:54) @ clojure.core.protocols$fn__5828$g__5823__5841.invoke(protocols.clj:13) @ clojure.core$reduce.invoke(core.clj:6030) @ clojure.core$into.invoke(core.clj:6077) @ backtype.storm.daemon.common$storm_task_info.invoke(common.clj:245) @ backtype.storm.daemon.nimbus$compute_executors.invoke(nimbus.clj:408) @ backtype.storm.daemon.nimbus$compute_executor__gt_component.invoke(nimbus.clj:420) @ backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:315) @ backtype.storm.daemon.nimbus$mk_assignments$iter__3398__3402$fn__3403.invoke(nimbus.clj:626) @ clojure.lang.lazyseq.sval(lazyseq.java:42) @ clojure.lang.lazyseq.seq(lazyseq.java:60) @ clojure.lang.rt.seq(rt.java:473) @ 
clojure.core$seq.invoke(core.clj:133) @ clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) @ clojure.core.protocols$fn__5875.invoke(protocols.clj:54) @ clojure.core.protocols$fn__5828$g__5823__5841.invoke(protocols.clj:13) @ clojure.core$reduce.invoke(core.clj:6030) @ clojure.core$into.invoke(core.clj:6077) @ backtype.storm.daemon.nimbus$mk_assignments.doinvoke(nimbus.clj:625) @ clojure.lang.restfn.invoke(restfn.java:410) @ backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto__$reify__3603.submittopology(nimbus.clj:898) @ sun.reflect.nativemethodaccessorimpl.invoke0(native method) @ sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl.java:57) @ sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl.java:43) @ java.lang.reflect.method.invoke(method.java:616) @ clojure.lang.reflector.invokematchingmethod(reflector.java:93) @ clojure.lang.reflector.invokeinstancemethod(reflector.java:28) @ backtype.storm.testing$submit_local_topology.invoke(testing.clj:227) @ backtype.storm.localcluster$_submittopology.invoke(localcluster.clj:19) @ backtype.storm.localcluster.submittopology(unknown source) @ storm.starter.wordcounttopology.main(wordcounttopology.java:83) 11769 [thread-2] error backtype.storm.daemon.nimbus - error when processing event java.lang.nullpointerexception @ clojure.lang.numbers.ops(numbers.java:942) @ clojure.lang.numbers.ispos(numbers.java:94) @ clojure.core$take$fn__4112.invoke(core.clj:2500) @ clojure.lang.lazyseq.sval(lazyseq.java:42) @ clojure.lang.lazyseq.seq(lazyseq.java:60) @ clojure.lang.rt.seq(rt.java:473) @ clojure.core$seq.invoke(core.clj:133) @ clojure.core$concat$fn__3804.invoke(core.clj:662) @ clojure.lang.lazyseq.sval(lazyseq.java:42) @ clojure.lang.lazyseq.seq(lazyseq.java:60) @ clojure.lang.rt.seq(rt.java:473) @ clojure.core$seq.invoke(core.clj:133) @ clojure.core$concat$cat__3806$fn__3807.invoke(core.clj:671) @ clojure.lang.lazyseq.sval(lazyseq.java:42) @ 
clojure.lang.lazyseq.seq(lazyseq.java:60) @ clojure.lang.rt.seq(rt.java:473) @ clojure.core$seq.invoke(core.clj:133) @ clojure.core$map$fn__4091.invoke(core.clj:2437) @ clojure.lang.lazyseq.sval(lazyseq.java:42) @ clojure.lang.lazyseq.seq(lazyseq.java:60) @ clojure.lang.rt.seq(rt.java:473) @ clojure.core$seq.invoke(core.clj:133) @ clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) @ clojure.core.protocols$fn__5875.invoke(protocols.clj:54) @ clojure.core.protocols$fn__5828$g__5823__5841.invoke(protocols.clj:13) @ clojure.core$reduce.invoke(core.clj:6030) @ clojure.core$into.invoke(core.clj:6077) @ backtype.storm.daemon.common$storm_task_info.invoke(common.clj:245) @ backtype.storm.daemon.nimbus$compute_executors.invoke(nimbus.clj:408) @ backtype.storm.daemon.nimbus$compute_executor__gt_component.invoke(nimbus.clj:420) @ backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:315) @ backtype.storm.daemon.nimbus$mk_assignments$iter__3398__3402$fn__3403.invoke(nimbus.clj:626) @ clojure.lang.lazyseq.sval(lazyseq.java:42) @ clojure.lang.lazyseq.seq(lazyseq.java:60) @ clojure.lang.rt.seq(rt.java:473) @ clojure.core$seq.invoke(core.clj:133) @ clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) @ clojure.core.protocols$fn__5875.invoke(protocols.clj:54) @ clojure.core.protocols$fn__5828$g__5823__5841.invoke(protocols.clj:13) @ clojure.core$reduce.invoke(core.clj:6030) @ clojure.core$into.invoke(core.clj:6077) @ backtype.storm.daemon.nimbus$mk_assignments.doinvoke(nimbus.clj:625) @ clojure.lang.restfn.invoke(restfn.java:410) @ backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto____3591$fn__3596$fn__3597.invoke(nimbus.clj:860) @ backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto____3591$fn__3596.invoke(nimbus.clj:859) @ backtype.storm.timer$schedule_recurring$this__1753.invoke(timer.clj:69) @ backtype.storm.timer$mk_timer$fn__1736$fn__1737.invoke(timer.clj:33) @ backtype.storm.timer$mk_timer$fn__1736.invoke(timer.clj:26) @ 
clojure.lang.afn.run(afn.java:24) @ java.lang.thread.run(thread.java:679) 11774 [thread-2] info backtype.storm.util - halting process: ("error when processing event")
Naresh, based on what I see, the solution to your problem may lie in the class name you used. Here is the command line argument you specified:
storm jar topologymain.jar topologymain wordcount
You refer to the main class "topologymain" using the simple class name rather than the fully qualified name. Here is my revision of your storm command line attempt:
storm jar topologymain.jar com.test.newpackage.topologymain
I use the full package name instead of just the class name itself. Note that I removed the reference to "wordcount" because I don't know what it's doing there (there are no classes, methods, or variables called "wordcount" in your code sample).
Here is an excellent thread on Google Groups that covers setup problems with Storm: "Early setup question".
I found myself using this article during the first couple of weeks when I started using Storm.
Please let me know if this solves your problem.
Comments
Post a Comment