Using MessageAuthorizationPolicy in ActiveMQ

The ActiveMQ broker allows message level security, that means you can ask ActiveMQ to call your business logic before consuming every message, and as a result get control on which message can be consumed by which MessageListener. I wanted to try that, so i created this SimpleMessagePolicy.java class which checks if the message body contains Secrete message for clientId com.webspherenotes.secret text, if yes it checks if the consumers's caller Id is com.webspherenotes.secret if yes then only it will allow the consumer to consume message. You can download the sample code for SampleMessagePolicy from here First i had to create SampleMessagePolicy class that implements MessageAuthorizationPolicy interface.The isAllowedToConsume() method of your class gets before a consumer is trying to consume every message. Create this class in separate java project, compile that project and copy the .jar file in the activemq/lib directory

package com.webspherenotes.jms;

import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.security.MessageAuthorizationPolicy;

public class SampleMessagePolicy implements MessageAuthorizationPolicy {

  @Override
  public boolean isAllowedToConsume(ConnectionContext messageContext,
      Message message) {
    try {
      System.out
          .println("Inside SampleMessagePolicy.isAllowedToConsume() ");
      System.out.println("Client Id " + messageContext.getClientId());
      if (message instanceof TextMessage) {
        TextMessage textMessage = (TextMessage) message;
        System.out.println("Text message is " + textMessage.getText());

        String messageStr = textMessage.getText();
        if (messageStr
            .equals("Secrete message for clientId com.webspherenotes.secret")) {
          System.out
              .println("Secret message received check the clientId");
          if (messageContext.getClientId().equals(
              "com.webspherenotes.secret")) {
            System.out
                .println("Got request from com.webspherenotes.secret, for secret message returning message");
            return true;
          } else {
            System.out
                .println("Got request from some other client, for secret message returning, hidding message");
            return false;
          }
        } else {
          System.out
              .println("Non secret message received, returning message");
          return true;
        }
      }

    } catch (JMSException e) {
      e.printStackTrace();
    }

    return true;
  }

}
Next configure the ActiveMQ so that it will use SampleMessagePolicy as MessageAuthorizationPolicy like this

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans 
  http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core 
  http://activemq.apache.org/schema/core/activemq-core.xsd
  http://activemq.apache.org/camel/schema/spring 
  http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

  <!-- Allows us to use system properties as variables in this configuration file -->
  <bean
    class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />

  <broker xmlns="http://activemq.apache.org/schema/core"
    brokerName="localhost" dataDirectory="${activemq.base}/data">


        <messageAuthorizationPolicy>
            <bean class="com.webspherenotes.jms.SampleMessagePolicy"
                xmlns="http://www.springframework.org/schema/beans" />
        </messageAuthorizationPolicy>

    <!-- The transport connectors ActiveMQ will listen to -->
    <transportConnectors>
      <transportConnector name="openwire"
        uri="tcp://localhost:61616" />
    </transportConnectors>

  </broker>

</beans>
After configuring the ActiveMQ restart the server for your changes to take effect. Then create MessageConsumer.java like this, in this file set the ClientId to com.webspherenotes.secret so that it will receive the secrete message.

package com.webspherenotes.jms;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class MessageConsumer implements MessageListener{

  QueueConnection queueConnection;
  QueueSession queueSession;

  public MessageConsumer() {
    try {
      InitialContext context = new InitialContext();
      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) context
          .lookup("QueueCF");
      queueConnection = queueConnectionFactory.createQueueConnection("consumer","password");

      queueConnection.setClientID("com.webspherenotes.secret");
      queueSession = queueConnection.createQueueSession(false,
          Session.AUTO_ACKNOWLEDGE);
      Queue queue = (Queue) context.lookup("SampleQ");
      QueueReceiver queueReceiver = queueSession.createReceiver(queue);
      queueReceiver.setMessageListener(this);
      queueConnection.start();
    } catch (NamingException e) {
      e.printStackTrace();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void onMessage(Message message) {
    try {
      TextMessage textMessage = (TextMessage) message;
      System.out.println("Inside MessageConsumer.onMessage "
          + textMessage.getText());
    } catch (JMSException e) {
      e.printStackTrace(System.out);
    }
  }

  public static void main(String[] argv) {
    try {
      MessageConsumer messageConsumer = new MessageConsumer();

      BufferedReader stdin = new BufferedReader(new InputStreamReader(
          System.in));
      System.out.println("Press enter to quit application");
      stdin.readLine();
      messageConsumer.queueConnection.close();

    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}

No comments: