java - Camel: stop the route when the jdbc connection loss is detected -
i have application following route:
from("netty:tcp://localhost:5150?sync=false&keepalive=true") .routeid("tcp.input") .transform() .simple("insert tamponems (avis) values (\"${in.body}\");") .to("jdbc:mydb");
this route receives new message every 59 millisecondes. want stop route when connection database lost, before second message arrives. , mainly, want never lose message.
i proceeded way:
i added errorhandler
:
errorhandler(deadletterchannel("direct:backup") .redeliverydelay(5l) .maximumredeliveries(1) .retryattemptedloglevel(logginglevel.warn) .logexhausted(false));
my errorhandler
tries redeliver message , if fails again, redirects message deadletterchannel
.
the following deadletterchannel stop tcp.input
route , try redeliver message database:
routepolicy policy = new stoproutepolicy(); from("direct:backup") .routepolicy(policy) .errorhandler( defaulterrorhandler() .redeliverydelay(1000l) .maximumredeliveries(-1) .retryattemptedloglevel(logginglevel.error) ) .to("jdbc:mydb");
here code of routepolicy
:
public class stoproutepolicy extends routepolicysupport { private static final logger log = loggerfactory.getlogger(string.class); @override public void onexchangedone(route route, exchange exchange) { string stop = "tcp.input"; camelcontext context = exchange.getcontext(); if (context.getroutestatus(stop) != null && context.getroutestatus(stop).isstarted()) { try { exchange.getcontext().getinflightrepository().remove(exchange); log.info("stop route: {}", stop); context.stoproute(stop); } catch (exception e) { getexceptionhandler().handleexception(e); } } } }
my problems method are:
- in
"direct:backup"
route, if setmaximumredeliveries
-1 routetcp.input
never stop - i'm loosing messages during stop
- this method detecting connection loss , stopping route long
please, have idea make faster or make differently in order not lose message?
i have found way resolve problems. in order make application faster, added asynchronous processes , multithreading seda.
from("netty:tcp://localhost:5150?sync=false&keepalive=true").to("seda:break"); from("seda:break").threads(5) .routeid("tcp.input") .transform() .simple("insert tamponems (avis) values (\"${in.body}\");") .to("jdbc:mydb");
i did same backup route.
from("seda:backup") .routepolicy(policy) .errorhandler( defaulterrorhandler() .redeliverydelay(1000l) .maximumredeliveries(-1) .retryattemptedloglevel(logginglevel.error) ).threads(2).to("jdbc:mydb");
and modified routepolicy that:
public class stoproutepolicy extends routepolicysupport { private static final logger log = loggerfactory.getlogger(string.class); @override public void onexchangebegin(route route, exchange exchange) { string stop = "tcp.input"; camelcontext context = exchange.getcontext(); if (context.getroutestatus(stop) != null && context.getroutestatus(stop).isstarted()) { try { exchange.getcontext().getinflightrepository().remove(exchange); log.info("stop route: {}", stop); context.stoproute(stop); } catch (exception e) { getexceptionhandler().handleexception(e); } } } @override public void onexchangedone(route route, exchange exchange) { string stop = "tcp.input"; camelcontext context = exchange.getcontext(); if (context.getroutestatus(stop) != null && context.getroutestatus(stop).isstopped()) { try { log.info("restart route: {}", stop); context.startroute(stop); } catch (exception e) { getexceptionhandler().handleexception(e); } } } }
with these updates, tcp route stopped before backup route processed. , route restarted when jdbc connection back.
now, camel, the application able handle database failure without losing message , without manual intervention.
i hope you.
Comments
Post a Comment