spring - Java RabbitMQ + AMQP blocking producers for some period (Locking)


Issue: we have 2 or 3 instances of an application. Each instance has a producer and a consumer. We have a scheduled process that uses the common Spring scheduler. The scheduler produces messages and sends them to the broker (RabbitMQ). In our case the same data is processed 2 or 3 times, because each instance sends the message. How can we block the producers of the other instances until the first producer has sent its message?

Configuration:

<!-- RabbitMQ configuration -->
<rabbit:connection-factory id="connection" host="${rabbit.host}" port="${rabbit.port}"
                           username="${rabbit.username}" password="${rabbit.password}"
                           channel-cache-size="${rabbit.publishercachesize}"
                           virtual-host="${rabbit.virtualhost}" />

<!-- Declare the executor pool for worker threads -->
<!-- Ensure pool-size is greater than the sum of the concurrent consumers that use this pool,
     so that there are enough threads for maximum concurrency. We do this by making it
     1 plus the connection factory cache size for the consumers -->
<task:executor id="worker-pool" keep-alive="60" pool-size="${rabbit.consumerchannelcachesize}"
               queue-capacity="1000" rejection-policy="CALLER_RUNS"/>

<!-- Message converter -->
<bean id="basemessageconverter" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound" value="com.company.model.scraper"/>
</bean>

<bean id="messageconverter" class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
    <constructor-arg index="0" ref="basemessageconverter"/>
</bean>

<!-- *********************************producer*********************************** -->
<!-- Outbound company events -->
<int:channel id="producerchannelcompany"/>
<int:gateway id="jobcompanycompleteeventgateway"
             service-interface="com.company.eventing.companyeventpublisher"
             default-request-channel="producerchannelcompany"
             default-request-timeout="2000"
             error-channel="errors"/>

<amqp:outbound-channel-adapter id="companyevents.amqpadapter" channel="producerchannelcompany"
                               exchange-name="${rabbit.queue.topic}"
                               routing-key="${rabbit.queue.routing.key}"
                               amqp-template="psrabbittemplate"/>

<rabbit:admin id="psrabbitadmin" connection-factory="connection" />
<rabbit:template id="psrabbittemplate" channel-transacted="${rabbit.channeltransacted}" encoding="utf-8"
                 message-converter="messageconverter" connection-factory="connection"/>
<rabbit:topic-exchange id="ps.topic" name="${rabbit.queue.topic}" durable="true" auto-delete="false"/>

<!-- *********************************consumer*********************************** -->
<rabbit:queue id="ps.queue" name="${rabbit.queue}" auto-delete="false" durable="true" exclusive="false" />

<!-- Exchange to queue binding -->
<rabbit:topic-exchange id="ps.topic" name="${rabbit.queue.topic}" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="${rabbit.queue}" pattern="${rabbit.queue.pattern}"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

<!-- Configuration for consuming company complete events -->
<amqp:inbound-channel-adapter id="companyadapter"
                              channel="companycompleteeventchannel"
                              queue-names="${rabbit.queue}"
                              channel-transacted="${rabbit.channeltransacted}"
                              prefetch-count="${rabbit.prefetchcount}"
                              concurrent-consumers="${rabbit.concurrentconsumers}"
                              connection-factory="connection"
                              message-converter="messageconverter"
                              task-executor="worker-pool"
                              error-channel="errors"/>

<int:channel id="companycompleteeventchannel"/>
<int:service-activator id="companycompleteactivator" input-channel="companycompleteeventchannel"
                       ref="companyeventhandler" method="runscraper"/>

<bean id="jvmlauncher" class="com.app.company.jvm.jvmlauncher" />
<!-- Company event handler -->
<bean id="companyeventhandler" class="com.app.company.eventing.consumer.companyeventhandler" depends-on="jvmlauncher">
    <!--<property name="scripthelper" ref="scripthelper"/>-->
    <property name="jvmlauncher" ref="jvmlauncher" />
    <property name="defaultmemoryoptions" value="${company.memory.opts}"/>
    <property name="defaultmemoryregex" value="${company.memory.regex}"/>
</bean>

<!-- Errors -->
<int:channel id="errors"/>
<int:service-activator id="pserrorlogger" input-channel="errors" ref="pslogginghandler"/>

<bean id="pslogginghandler" class="org.springframework.integration.handler.LoggingHandler">
    <constructor-arg index="0" value="debug"></constructor-arg>
    <!-- <property name="loggerName" value="com.app.travelerpayments.logginghandler"/> -->
</bean>

It's not clear what architecture you have, but if all instances consume messages from the same queue, each message is consumed only once (unless a consumer requeues it). I think that is the best way to use the power of AMQP in your situation. If I missed something, please clarify your question.
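To illustrate the "consumed only once" point, here is a minimal in-memory sketch. It does not use the RabbitMQ client at all: a `LinkedBlockingQueue` stands in for the shared queue, and the class name `SharedQueueDemo` and the method `deliveryCounts` are made up for this example. Each simulated instance takes messages off the same queue, and the result records how many times each message was handled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for competing consumers on one queue (hypothetical, not RabbitMQ API).
final class SharedQueueDemo {

    /** Lets `instances` consumers drain one shared queue; returns message -> times handled. */
    static Map<String, Integer> deliveryCounts(List<String> messages, int instances) {
        BlockingQueue<String> sharedQueue = new LinkedBlockingQueue<>(messages);
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(instances);
        for (int i = 0; i < instances; i++) {
            pool.execute(() -> {
                String message;
                // poll() removes the head atomically, so two consumers never get the same message
                while ((message = sharedQueue.poll()) != null) {
                    counts.merge(message, 1, Integer::sum);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts;
    }

    public static void main(String[] args) {
        // Every distinct message should show a count of 1, whichever instance handled it.
        System.out.println(deliveryCounts(Arrays.asList("job-1", "job-2", "job-3"), 3));
    }
}
```

Note that this only removes duplicate processing on the consumer side. If every instance's scheduler still publishes its own copy of the message, the queue will contain 2 or 3 messages, so the producers still need coordinating.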

With fanout-style delivery, where each instance has its own queue with its own message backlog and you want to control message delivery yourself (definitely a bad idea in most situations), why not let the instances also listen on personal queue(s) bound to a fanout exchange and use that exchange for control messages? You can then tell instances when to stop or start consuming, flush their queues, schedule a restart, etc.
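A rough sketch of that control-channel idea, again as a plain in-memory model and not RabbitMQ client code. The classes `FanoutExchange` and `Instance` and the command strings `"stop"` and `"start"` are assumptions made up for this example. The point it shows is that a fanout exchange copies each control message into every bound queue, so every instance sees the same command.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory model of control messages over a fanout exchange (hypothetical names throughout).
final class ControlFanoutDemo {

    /** Minimal fanout exchange: every published message is copied to every bound queue. */
    static final class FanoutExchange {
        private final List<Queue<String>> boundQueues = new ArrayList<>();

        void bind(Queue<String> queue) {
            boundQueues.add(queue);
        }

        void publish(String message) {
            for (Queue<String> queue : boundQueues) {
                queue.add(message);
            }
        }
    }

    /** One application instance with its personal control queue. */
    static final class Instance {
        final Queue<String> controlQueue = new ArrayDeque<>();
        private boolean consuming = true;

        /** Applies all pending control commands in arrival order. */
        void drainControlQueue() {
            String command;
            while ((command = controlQueue.poll()) != null) {
                if (command.equals("stop")) {
                    consuming = false;
                } else if (command.equals("start")) {
                    consuming = true;
                }
                // Unknown commands are ignored in this sketch.
            }
        }

        boolean isConsuming() {
            return consuming;
        }
    }
}
```

In a real setup the `consuming` flag would map to stopping and starting the listener container of the instance; how exactly depends on your Spring configuration.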

Note that you can also use a topic exchange and bind the queues with a specific routing key pattern, say "control.*".
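For reference, this is how topic patterns such as "control.*" behave: routing keys are dot-separated words, `*` matches exactly one word and `#` matches zero or more words. The broker does this matching for you; the small matcher below (the class name `TopicPattern` is made up) is only a sketch to make the rule concrete.

```java
// Sketch of AMQP topic-exchange pattern matching; the broker implements this itself.
final class TopicPattern {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal word consumes one key word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("control.*", "control.stop"));     // true
        System.out.println(matches("control.*", "control.stop.now")); // false
        System.out.println(matches("control.#", "control.stop.now")); // true
    }
}
```

So a queue bound with "control.*" receives "control.stop" and "control.start", but not a bare "control" or a deeper key like "control.stop.now"; use "control.#" if you want those as well.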

The idea is to send a "who is free?" request, then pick a random free worker and send the payload to it. You can use a specific routing key, or publish the payload to the default exchange with the routing key equal to the queue name (by default, every queue is bound to the default exchange with a routing key equal to its name; see the Default Exchange section in the RabbitMQ docs).
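The "who is free?" flow could look roughly like the following in-memory sketch. Nothing here is RabbitMQ client API: `Worker`, `DefaultExchange` and `dispatch` are hypothetical names, and the "request" is modelled as simply reading each worker's busy flag. It shows the two steps from above: collect the free workers, pick one at random, then publish with the routing key equal to that worker's queue name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

// In-memory model of "ask who is free, then send to one worker" (hypothetical names).
final class FreeWorkerDispatch {

    /** A worker that owns one queue, named after the worker. */
    static final class Worker {
        final String queueName;
        final List<String> received = new ArrayList<>();
        boolean busy;

        Worker(String queueName) {
            this.queueName = queueName;
        }
    }

    /** Stand-in for the default exchange: the routing key is the target queue name. */
    static final class DefaultExchange {
        private final Map<String, Worker> queues = new HashMap<>();

        void declareQueue(Worker worker) {
            queues.put(worker.queueName, worker);
        }

        /** Delivers the payload to the queue named by routingKey; false if no such queue. */
        boolean publish(String routingKey, String payload) {
            Worker worker = queues.get(routingKey);
            if (worker == null) {
                return false;
            }
            worker.received.add(payload);
            worker.busy = true; // the worker is occupied until it finishes (not modelled here)
            return true;
        }
    }

    /** Picks a random free worker and sends it the payload; empty if everyone is busy. */
    static Optional<String> dispatch(List<Worker> workers, DefaultExchange exchange,
                                     String payload, Random random) {
        List<Worker> free = new ArrayList<>();
        for (Worker worker : workers) {
            if (!worker.busy) {
                free.add(worker); // this worker answered "I am free"
            }
        }
        if (free.isEmpty()) {
            return Optional.empty();
        }
        Worker chosen = free.get(random.nextInt(free.size()));
        exchange.publish(chosen.queueName, payload);
        return Optional.of(chosen.queueName);
    }
}
```

Keep in mind that with several producers there is a window between the "who is free?" answer and the actual publish, so two producers may pick the same worker. That is one more reason why a single shared queue is usually the simpler option.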

