Java concurrency – Multiple queue monitors

At work, I’ve implemented a queue monitor batch application. Due to business rule changes, it now needs to monitor two queues. Instead of creating another batch application, I really wanted to stick with the same application but just create two threads, each monitoring its own queue.

However, the twist I need is to have the main batch application thread die as soon as either queue monitoring thread dies.

I’ve been searching for a graceful way to handle this kind of concurrency need in Java. Thanks to Gary Myers, I’ve got a great start on it.

The basic idea is to pass a Java BlockingQueue to both threads; if either thread fails, it calls blockingQueue.offer() to signal that. The main thread calls blockingQueue.take(), which blocks until one of the threads has offered something, and then continues executing.

Things I learned that made today AMAZING!

  • Put repositories in the pom.xml instead of settings.xml so Maven looks through multiple repositories
  • Use SynchronousQueue for a direct hand-off between threads: it holds no elements, so an insert only succeeds when another thread is there to take it
  • Use daemon threads so the JVM can exit even if a worker thread is still running
  • You can EMBED ActiveMQ using Spring so you don’t have to install it at all
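The SynchronousQueue point is easy to get wrong, so here is a minimal sketch of its hand-off behavior. The class and method names (SynchronousQueueDemo, offerWithoutConsumer, handOff) are made up for illustration and are not part of the project; the only assumption is a standard JDK.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {

    // With no thread waiting in take(), offer() does not block and does not throw:
    // it simply returns false and the element is dropped.
    public static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("nobody is listening");
    }

    // A consumer thread blocks in take(); the timed offer() waits for it to arrive
    // and then hands the element over directly.
    public static String handOff(String message) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            boolean accepted = queue.offer(message, 5, TimeUnit.SECONDS);
            if (!accepted) {
                consumer.interrupt(); // release the consumer if the hand-off timed out
            }
            consumer.join(); // join() makes the consumer's write to received[0] visible here
            return accepted ? received[0] : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // prints false
        System.out.println(handOff("hello"));       // prints hello
    }
}
```

In other words, the queue never stores anything; it only works when a producer and a consumer meet.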

Below is a simple concurrency example that demonstrates the idea from Gary Myers.

Worker:

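import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Worker implements Runnable {
   private final BlockingQueue<String> finishedQueue;
   private final String result;
   private final long sleepTime;
   
   public Worker(BlockingQueue<String> finishedQueue, String result, long sleepTime) {
      this.finishedQueue=finishedQueue;
      this.result=result;
      this.sleepTime=sleepTime;
   }
   
   public void run() {
      try {
         TimeUnit.SECONDS.sleep(sleepTime); //simulate some work
         //offer() never blocks and does not throw when the queue can't accept the element; it just returns false.
         //With a SynchronousQueue it only succeeds if another thread is already waiting in take().
         finishedQueue.offer(result);
      } catch(InterruptedException e) {
         e.printStackTrace();
         Thread.currentThread().interrupt(); //restore the interrupt flag for whoever owns this thread
      }
   }
}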

DaemonThreadFactory:

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonThreadFactory implements ThreadFactory {
   private final AtomicInteger counter;
   
   public DaemonThreadFactory() {
      this.counter=new AtomicInteger(0);
   }
   
   public Thread newThread(Runnable r) {
      Thread thread=new Thread(r);
      
      //if you want, you can make this class generic by having the constructor take arguments that configure the following
      thread.setDaemon(true); //daemon threads don't keep the JVM alive, so it can exit even if this thread is still running
      thread.setName("Daemon Thread: " + counter.incrementAndGet()); //you don't have to give it a name, but I always do.
      
      return thread;
   }
}

Test the threads in a simple example:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;

public class Main {
   public static void main(String[] args) throws InterruptedException {
      ExecutorService service=Executors.newFixedThreadPool(2, new DaemonThreadFactory());
      //a SynchronousQueue has no capacity: offer() succeeds only while a thread is waiting in take(),
      //so the first worker to finish hands its result to main and the later offer simply returns false
      SynchronousQueue<String> queue=new SynchronousQueue<String>();
      //create the runnables beforehand so that no extra time is spent instantiating them at submission time.
      Runnable runnable1=new Worker(queue, "Runnable 1", 4);
      Runnable runnable2=new Worker(queue, "Runnable 2", 3);
      
      service.execute(runnable1);
      service.execute(runnable2);
      
      System.out.println("before queue");
      System.out.println(queue.take());
      System.out.println("after queue");
      
      service.shutdown(); //the slower worker may still be sleeping; its thread is a daemon, so the JVM exits anyway
   }
}

I will not go into the details of the code I added for queue monitoring as they are a lot more involved.
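To connect the toy example back to the original requirement (main dies as soon as either monitor dies), here is a rough sketch of how that could look. It is not the code from my batch application: QueueMonitor, supervised and awaitFirstDeath are hypothetical names, and the monitors are stand-in lambdas. It also swaps in a LinkedBlockingQueue instead of a SynchronousQueue, on the assumption that you would rather not lose the signal if a monitor dies before main reaches take().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class MonitorSupervisor {

    // Hypothetical stand-in for a real queue-monitoring loop.
    public interface QueueMonitor {
        void monitor() throws Exception;
    }

    // Wraps a monitor so that its name is reported however it ends:
    // normal return, exception, or interruption.
    public static Runnable supervised(String name, QueueMonitor monitor, BlockingQueue<String> deaths) {
        return () -> {
            try {
                monitor.monitor();
            } catch (Exception e) {
                // any failure ends this monitor; real code would log it
            } finally {
                deaths.offer(name); // unbounded queue, so this offer always succeeds
            }
        };
    }

    // Starts both monitors on daemon threads and blocks until the first one dies.
    public static String awaitFirstDeath(QueueMonitor first, QueueMonitor second) {
        BlockingQueue<String> deaths = new LinkedBlockingQueue<>();
        ExecutorService service = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        service.execute(supervised("monitor-1", first, deaths));
        service.execute(supervised("monitor-2", second, deaths));
        try {
            return deaths.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            service.shutdownNow(); // interrupt the surviving monitor
        }
    }

    public static void main(String[] args) {
        QueueMonitor healthy = () -> Thread.sleep(60_000);
        QueueMonitor failing = () -> { throw new IllegalStateException("connection lost"); };
        // prints "monitor-2 died, shutting down"
        System.out.println(awaitFirstDeath(healthy, failing) + " died, shutting down");
    }
}
```

Reporting from the finally block means main hears about a dead monitor whether it returned, threw, or was interrupted.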

The coolest thing I learned about ActiveMQ is that you can embed it using the following Spring configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
 
    <!--  Embedded ActiveMQ Broker -->
    <amq:broker id="broker" useJmx="false" persistent="true">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:0" />
        </amq:transportConnectors>
    </amq:broker>

    <!--  ActiveMQ Destination  -->
    <amq:queue id="destination" physicalName="com.threads.example.queue" />

    <!-- JMS ConnectionFactory to use, configuring the embedded broker using XML -->
    <amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost" />

    <!-- JMS Producer Configuration -->
    <bean id="jmsProducerConnectionFactory" 
          class="org.springframework.jms.connection.SingleConnectionFactory"
          depends-on="broker"
          p:targetConnectionFactory-ref="jmsFactory" />

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
          p:connectionFactory-ref="jmsProducerConnectionFactory"
          p:defaultDestination-ref="destination" />
</beans>

Once that is in place, your application runs without a separate ActiveMQ installation. The configuration above is from the example here.

If running in Eclipse…

  • Install the m2eclipse plugin if you have not already
  • Import -> Maven -> Existing Maven Projects
  • Select the MultipleQueueMonitors folder
  • Add src, resources, properties as source folders
  • Change the JRE to whatever you want to use

The following error may occur in Eclipse:

Caused by: org.xml.sax.SAXParseException: cvc-complex-type.2.4.c:
The matching wildcard is strict, but no declaration can be found for element 'amq:broker'. 

To fix it, you must associate the ActiveMQ XSD URL with the schema.

Go to XML->XML Catalog in Preferences, and add a User Specified Entry.

Location: http://activemq.apache.org/schema/core/activemq-core-5.3.0.xsd
Key Type: Namespace Name
Key: http://activemq.apache.org/schema/core

Then add a second one:

Location: http://activemq.apache.org/schema/core/activemq-core-5.3.0.xsd
Key Type: Schema Location
Key: http://activemq.apache.org/schema/core/activemq-core.xsd

Hit OK.

For more info, visit this stackoverflow thread.

>>>Check out or download the source from my github account if you are interested<<<
