glassfish
  1. glassfish
  2. GLASSFISH-3898

Duplicate JMS messages are delivered, when multiple consumers use selectors to fetch messages from a queue

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Minor Minor
    • Resolution: Duplicate
    • Affects Version/s: 9.1pe
    • Fix Version/s: 4.0
    • Component/s: jms
    • Labels:
      None
    • Environment:

      Operating System: All
      Platform: Macintosh

    • Issuezilla Id:
      3,898
    • Status Whiteboard:
      Hide

      as91ur1-na

      Show
      as91ur1-na

      Description

      Here is my test setup:

      1) I use two queues.
      1.1) In the first one, I push "job descriptions" (one by one, from a client). In every message, I attach a
      property that identifies the type of entity I want to work on (in my example, I use greek gods)
      1.2) In the second one, I push "triggers" to initate the batch processing of several jobs

      2) I have one MDB, connected on the second queue (the one where I push triggers)
      2.1) Each time the client asks to process a batch of, say, 5 jobs, the following happens:
      2.1.1) In the onMessage() method, I open a MessageConsumer to fetch messages from the first queue
      (where the client has previously published jobs to do).
      2.1.2) I want to use selectors, because in a given batch, I want to handle only one type of entity (one
      batch for Zeus, one batch for Ares, etc.)
      2.1.3) Each time I get a job message, I process it. When I have consumed enough (i.e. reached batch
      size), then I commit. If messages don't come fast enough, I rollback. I am using an XA connection
      factory.

      3) With this setup, I first send 2 messages in the "trigger queue". Therefore, I now have two MDB
      instances waiting for incoming jobs. They have created a consumer, with a selector, and wait for "job
      messages" with a property value of "Zeus"
      3.1) I now send 1 "job message" with a property value of "Zeus".

      ----> 3.2) the problem is that my job message is received by both consumers, which should not be the
      case!

      Note: if I don't use selectors, then I don't see duplicate message consumption.

      Here is the content of my onMessage() method in my MDB:

      public void onMessage(Message message) {

      Connection con = null;
      Session session = null;
      MessageConsumer consumer = null;

      try {
      System.out.println("MDB " + this + " has received message: " + message.getJMSMessageID());

      // We have received a notification: we should consume n messages from the job queue and
      process them
      ObjectMessage msg = (ObjectMessage) message;
      JobRequest jobRequest = (JobRequest) msg.getObject();

      // We extract the batch size and the god name from the message received by the MDB
      int batchSize = jobRequest.getBatchSize();
      String godName = jobRequest.getGodName();

      // Generate a random batch name (for tracking/debugging...)
      int random = (int) (Math.random() * 1000);
      String batchName = random + "-" + this;

      // We will try to get "batchSize" messages from the job queue, with a filter on "godName"
      con = xaQueueConnectionFactory.createConnection();
      session = con.createSession(true, Session.SESSION_TRANSACTED); // the arguments will
      probably be ignored, because behind the scenes, the appserver is preparing an XA session
      String selector = "godName = '" + godName + "'";

      consumer = session.createConsumer(jobQueue, selector); // we use a selector to consume only
      the jobs for a given god
      System.out.println("We are only interested in this selector: " + selector);

      con.start();

      System.out.println("We want to process a batch of " + batchSize + " jobs, and want to create " +
      godName);
      boolean shouldContinue = batchSize > 0;
      int numberOfJobsProcessed = 0;

      while (shouldContinue) {
      ObjectMessage jobMsg = (ObjectMessage) consumer.receive(1000); // wait no more than 1
      second
      if (jobMsg != null)

      { Job job = (Job) jobMsg.getObject(); String name = job.getGodName(); boolean createTemple = job.isCreateTemple(); System.out.println("We have fetched a job in the queue: " + name + ", createTemple:" + createTemple); System.out.println("MDB " + this + " has fetched message: " + jobMsg.getJMSMessageID()); int n = numberOfJobsProcessed + 1; System.out.println("Calling olympusManagerBean"); olympusManagerBean.createGod(name, "a god created in BatchWorkerBean. N: " + n + "Batch: " + batchName, createTemple); numberOfJobsProcessed++; shouldContinue = (numberOfJobsProcessed < batchSize); System.out.println("We have processed " + numberOfJobsProcessed + "/" + batchSize + " jobs in the batch."); }

      else

      { System.out.println("There is no job available in the queue..."); shouldContinue = false; }

      }
      if (numberOfJobsProcessed != batchSize)

      { System.out.println(new java.util.Date() + ". We did not have enough jobs to do a batch... rolling back!"); ctx.setRollbackOnly(); }

      } catch (JMSException ex)

      { Logger.getLogger(BatchWorkerBean.class.getName()).log(Level.SEVERE, null, ex); ctx.setRollbackOnly(); }

      finally {
      try {
      System.out.println("Cleanup JMS resources...");
      if (consumer != null)

      { consumer.close(); System.out.println("Consumer closed."); }

      if (session != null)

      { session.close(); System.out.println("Session closed."); }

      if (con != null)

      { con.close(); System.out.println("Connection closed."); }

      } catch (JMSException e)

      { Logger.getLogger(BatchWorkerBean.class.getName()).log(Level.SEVERE, null, e); }

      }
      }

        Activity

        Hide
        basler added a comment -

        Not a 91ur1 release stopper

        Show
        basler added a comment - Not a 91ur1 release stopper
        Hide
        rampsarathy added a comment -

        If you are using DAS, could you try the test by changing the jms-service type
        from EMBEDDED to LOCAL

        Show
        rampsarathy added a comment - If you are using DAS, could you try the test by changing the jms-service type from EMBEDDED to LOCAL
        Hide
        rampsarathy added a comment -

        requesting satish to investigate

        Show
        rampsarathy added a comment - requesting satish to investigate
        Hide
        sanandal added a comment -

        "Reclassifying as P4 because this issue is not deemed "must fix" for this v2.1
        release whose primary release driver is SailFin.
        This issue will be scrubbed after this release and will be given the right
        priority for the next release."

        Show
        sanandal added a comment - "Reclassifying as P4 because this issue is not deemed "must fix" for this v2.1 release whose primary release driver is SailFin. This issue will be scrubbed after this release and will be given the right priority for the next release."
        Hide
        Tom Mueller added a comment -

        Bulk update to change fix version to "not determined" for all issues still open but with a fix version for a released version.

        Show
        Tom Mueller added a comment - Bulk update to change fix version to "not determined" for all issues still open but with a fix version for a released version.
        Hide
        David Zhao added a comment - - edited

        The control message and job message are redelivered infinitely for it setRollbackOnly() is used, the root cause is a defect in MQ. This defect is duplicate to GLASSFISH-6617.

        Show
        David Zhao added a comment - - edited The control message and job message are redelivered infinitely for it setRollbackOnly() is used, the root cause is a defect in MQ. This defect is duplicate to GLASSFISH-6617 .

          People

          • Assignee:
            David Zhao
            Reporter:
            oliechti
          • Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved: