<

12. Interakcja za pomocą komunikatów (JMS)


W środowiskach rozproszonych moduły aplikacji mogą wspóldziałac poprzez wymianę komunikatów. Jest to dobry i ciekawy sposób programowania, pozwala bowiem uniezależniać kody modułów. Poznamy go na przykładzie JMS (Java Messaging Service).


1. Wprowadzenie


Klienci (fragmenty systemu, aplikacje itp.)  w środowisku rozproszonym mogą współdziałać poprzez wymianę komunikatów, przy czym:
Współdziałanie aplikacji poprzez wymianę komunikatów ma tę zaletę, że uniezależnia kody programów od siebie ("loose coupling")


Ten sposób programowania realizowany jest na różnych platformach poprzez różne technologie - np.  IBM MQseries.

W środowisku Javy mamy do dyspozycji JMS (Java Messaging Service).


Java Messaging Service to API (zestaw interfejsów), które zapewnia m.in.:



2. Architektura JMS


Składowe aplikacji JMS


r
Źródło: J2EE Tutorial (CF - connection factory, D - destination)

Destynacje i domeny

Destynacja JMS  jest miejscem w którym nadawca umieszcza wiadomości i z którego odbiorca odczytuje wiadomości.


Dwie możliwe domeny (rodzaje destynacji):
r°   każdy komunikat może być odebrany tylko przez jednego odbiorcę;
°   odbiorca może odebrać wiadomość niezależnie od tego czy nadawca działa czy też juz zakończył działanie;
°   zasadą jest, że odbiorca potwierdza odebranie wiadomości, co zapewnia, że nie będzie mu ona przysłana po raz kolejny.


Źródło: J2EE Tutorial


 
r°   każda wiadomość może mieć wielu odbiorców (subskrybentów tematu),

°   w danym temacie może publikował wielu nadawców,

°   odbiorca może odbieracz tylko te wiadomości z danego tematu, które zostały opublikowane po zapisaniu się przez niego do subskrypcji




Źródło: J2EE Tutorial
Uwaga: wersja 1.1 JMS pozwala na oprogramowanie obu typów komunikacji za pomocą tych samych interfejsów.

Architektura aplikacji JMS

r
Źródło: J2EE Tutorial


ConnectionFactory -  tworzy połączenie do dostawcy serwisu JMS.
Destination - określa destynację nadawanych i odbieranych wiadomości.


3. Konfigurowanie obiektów administrowanych

Fabryka połączeń (ConnectionFactory) i destynacja (Destination)  zwane są obiektami administrowanymi.

Administrowane obiekty JMS określają sposób łączenia z serwisem JMS oraz destynacje nadawanych i odbieranych komunikatów i muszą być zdefiniowane przed użyciem usług JMS


Zazwyczaj obiekty administrowane są określane w konfiguracji serwera JMS (który może być częścią składową serwera aplikacji) i/lub za pomocą narzędzi administracyjnych serwera. W ten sposób aplikacja (kod programu) uniezależnia się od konkretnych implementacji, które dla różnych serwerów są różne.

Np. w konfiguracji serwera OpenJMS (zob. oprogaramowanie dołączone do kursu) definicja obiektów administrowanych może wyglądać w następujący sposób:
  <Connectors>
    <Connector scheme="tcp">
      <ConnectionFactories>
        <ConnectionFactory name="ConnectionFactory" />
      </ConnectionFactories>
    </Connector>
    <Connector scheme="rmi">
      <ConnectionFactories>
        <QueueConnectionFactory name="JmsQueueConnectionFactory" />
        <TopicConnectionFactory name="JmsTopicConnectionFactory" />
      </ConnectionFactories>
    </Connector>
  </Connectors>

.....

  <AdministeredDestinations>
    <AdministeredTopic name="topic1">
      <Subscriber name="sub1" />
      <Subscriber name="sub2" />
    </AdministeredTopic>

    <AdministeredQueue name="queue1" />
    <AdministeredQueue name="queue2" />
    <AdministeredQueue name="queue3" />
  </AdministeredDestinations>
 

W srodowiskach sewrerów aplikacji zazwyczaj definiujemy obiekty administrowane za pomoca odpowiednich narzędzi(.
Poniżej pokazano konfigurację z pozomu konsoli Sun Java Application Server (SJAS).

Definicja fabryki połączeń:

r


Definicja fizycznej destynacji

r

Powiązanie nazwy JNDI z fizyczną destynacją.

r


Można też tworzyć destynacje programistycznie np. 

uzyskanie dostępu do narzędzi administracyjnych:
import org.exolab.jms.administration.AdminConnectionFactory;
import org.exolab.jms.administration.JmsAdminServerIfc;

// ...
    String url = "tcp://localhost:3035/";
    JmsAdminServerIfc admin = AdminConnectionFactory.create(url);

utworzenie destynacji - kolejki o nazwie myqueue:
    String queue = "myqueue";
    Boolean isQueue = Boolean.TRUE;
    if (!admin.addDestination(queue, isQueue)) {
        System.err.println("Failed to create queue " + queue);
    }
        

utworzenie destynacji-tematy o nazwie mytopic:

    String topic = "mytopic";
    Boolean isQueue = Boolean.FALSE;
    if (!admin.addDestination(topic, isQueue)) {
        System.err.println("Failed to create topic " + topic);
    }
        
źródło: OpenJMS User Guide.

4. Programowanie klientów JMS


Dostęp do obiektów administrowanych

Mając skonfigurowane obiekty administrowane (ConnectionFactory i destynacje) w aplikacji JMS uzyskujemy do nich dostęp za pomocą JNDI. Np.

      Context ctx = new InitialContext();
      ConnectionFactory fact = (ConnectionFactory) ctx.lookup("ConnectionFactory");
      String admDestName = "myqueue";
      Destination dest = (Destination) ctx.lookup(admDestName);
Uwaga:
  1. tutaj nazwą JNDI dla fabryki połączeń jest "ConnectionFactory", może być inna nazwa (np. jms/ConnectionFactory)
  2. dostawca inicjalnego kontekstu musi być okreslony w jndi.properties albo za pomoca właściwości systemowych. zgodnie z konfiguracją serwera JMS.

Połączenie i sesja

Następnym krokiem jest uzyskanie połaczenia z serwerem JMS - uzyskanie obiektu typu javax.jms.Connection, a na jego podstawie stworzenie sesji:

      Connection con = fact.createConnection();
      Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);

// false - bez transakcyjnosci
// AUTO_ACKNOWLEDGE - automatyczne potwierdzanie

Sesja JMS jest jednowątkowym kontekstem nadawania-odbierania wiadomości i służy do tworzenia nadawców i odbiorców wiadomości oraz samych wiadomości


Przed wysyłką/odbiorem wiadomości należy otworzyć połączenie, a po zakończeniu działania na tym połączeniu - zamknąć je:

con.start();
// ....
// Nadawanie lub odbiór
//...
con.close();
Wiadomości

Wiadomości są obiektami typów pochodnych od typu Message.
Wiadomość sklada się z nagłówka, właściwości i zawartości (ciała wiadomoścu)

 Typ
wiadomości 
Zawartość
TextMessage
Tekst (String)
MapMessage
Mapa. Klucze = String, wartości = obiekty klas opakowujących typy proste (np. Integer)
BytesMessage
Zestaw bajtów.
StreamMessage
Sekwencyjny strumień typów prostych
ObjectMessage
Serializowany obiekt
Message
Pusta zawartośc. Również nadtyp dla w/w typów

Wiadomości tworzymy za pomocą odpowiednich metod intefejsu Session np:


// Utworzenie wiadomości tekstowej:
TextMessage msg = ses.createTextMessage();
msg.setText(...);

Nadawanie wiadomości

// Utworzenie nadawcy, który będzie posyłał wiadomości do destynacji dest
MessageProducer sender = ses.createProducer(dest);

// Posłanie wiadomości msg
sender.send(msg);

Odbieranie wiadomości

// Utworzenie odbiorcy, który będzie odbierał wiadomości z destynacji dest
MessageConsumer receiver = ses.createConsumer(dest);

// Odbiór
Message msg = receiver.receive(); // blokowanie,
                                  //dopóki wiadomośc nie będzie dostępna
// ....
Message msg = receiver.receive(n); // blokowanie (ale nie dłużej niż n ms)
// ...
Message msg = receiver.receiveNoWait(); // wraca od razu; null - brak wiadomości

Ogólnie sposób programowania (użycie interfejsów) przy nadawaniu i odbieraniu wiadomości jest taki sam, niezależnie od tego czy destynacja jest kolejką (Queue) czy tematem (Topic).
Należy jednak pamiętać o różnicach pomiędzy działaniem w trybie "Point-to-Point" oraz publisher/subscriber.


Słuchacze wiadomości

Słuchacze wiadomości są obiektami klas implementujących interfejs MessageListener z jedną metodą onMessage(Message), która jest wywoływana, gdy wiadomość dociera do danej destynacji (może to być Queue lub Topic). W ten sposób realizowany jest asynchroniczny - na zasadzie "callback" odbiór wiadomości.


Po stworzeniu słuchacza wiadomości ustalamy go dla danego odbiorcy za pomocą metody setMessageListener().

class MsgListener implements MessageListener {
   public void onMessage(Message msg) {
       // ... przetwarzanie wiadomości msg
   }
}

// ....
MessageConsumer receiver;
// .... 
MessageListener msgl = new MsgListener();
//....
receiver.setMessageListener(msgl);

Ten mechanizm JMS jest stosowany w tzw. message-driven Enterprise Java Beans.


5. Komunikaty w trybie point-to-point

Rozważmy prosty przykład posyłania - odbioru wiadomości tekstowych.
Poniżej pokazano teksty programów klienckich.

Nadawca
Posyła wiadomość tekstową (podaną jako drugi argument) do destynacji podanej jako pierwszy argument
import javax.naming.*;
import javax.jms.*;

public class Sender {

  public static void main(String[] args) {

    Connection con = null;
    try {
      Context ctx = new InitialContext();
      ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
      String admDestName = args[0];
      Destination dest = (Destination) ctx.lookup(admDestName);
      con = factory.createConnection();
      Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
      MessageProducer sender = ses.createProducer(dest);
      con.start();
      TextMessage msg = ses.createTextMessage();
      msg.setText(args[1]);
      sender.send(msg);
      System.out.println("Sender sent msg: " + args[1]);
    } catch (Exception exc) {
        exc.printStackTrace();
        System.exit(1);
    } finally {
        if (con != null) {
          try {
            con.close();
          } catch (JMSException exc) {
              System.err.println(exc);
          }
        }
    }
    System.exit(0);
  }
}

Odbiorca
Odbiera wiadomość tekstową z destynacji podanej jako argument.
import javax.naming.*;
import javax.jms.*;

public class Receiver {

  public static void main(String[] args) {
      Connection con = null;
      try {
        Context ctx = new InitialContext();
        ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
        String admDestName = args[0];
        Destination dest = (Destination) ctx.lookup(admDestName);
        con = factory.createConnection();
        Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer receiver = ses.createConsumer(dest);

        con.start();
        System.out.println("Receiver started");
        Message msg = receiver.receive();
        if (msg instanceof TextMessage) {
            TextMessage text = (TextMessage) msg;
            System.out.println("Received: " + text.getText());
        } else if (msg != null) {
            System.out.println("Received non text message");
        }
      } catch (Exception exc) {
          exc.printStackTrace();
          System.exit(1);
      } finally {
          if (con != null) {
            try {
              con.close();
            } catch (JMSException exc) {
                System.err.println(exc);
            }
          }
      }
      System.exit(0);
  }
}

Do przetestowania działania pokazanych klas użyjemy destynacji queue1 z konfiguracji serwera OpenJMS.

Kolejnośc uruchomienia: nadawca, odbiorca


E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Sender queue1 "ala ma kota"
Sender sent msg: ala ma kota
E:>_


Nadawca zakończył działanie. Uruchomienie odbiorcy (w innej sesji znakowej) skutkuje natychmiastowym odczytaniem wiadomości:


E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver queue1
Receiver started
Received: ala ma kota
E:\>_


Wiadomość zostala skonsumowana. Ani ten ani żaden inny klient queue1 nie dostanie jej więcej.

Teraz zmienimy kolejność.
Kolejnośc uruchomienia: odbiorca, nadawca


E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver queue1
Receiver started


Odbiorca jest zablokowany i czeka na ew. wiadomośc.

Po wystartowaniu nadawcy:

E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Sender queue1 "Witam. Co slychac?"
Sender sent msg: Witam. Co slychac?
E:>_


odbiorca natychmiast odbierze wiadomość i zakończy działanie:


Received: Witam. Co slychac?
E:\>_



Zobacz prezentację multimedialną  r


6. Słuchacze wiadomości - przykład

Poniższy program reaguje na komunikaty posyłane do kolejki queue1.
import javax.naming.*;
import javax.jms.*;
import javax.swing.*;
import java.awt.event.*;

import javax.naming.*;
import javax.jms.*;
import javax.swing.*;
import java.awt.event.*;

public class AsynchrReceiver extends JFrame implements MessageListener {

  private Connection con;
  private JTextArea ta = new JTextArea(10, 20);

  public AsynchrReceiver(String destName) {

    try {
      Context ctx = new InitialContext();
      ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
      Destination dest = (Destination) ctx.lookup(destName);
      con = factory.createConnection();
      Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
      MessageConsumer receiver = ses.createConsumer(dest);
      receiver.setMessageListener(this);
      con.start();
    } catch (Exception exc) {
         exc.printStackTrace();
         System.exit(1);
    }

    add(new JScrollPane(ta));

    addWindowListener(new WindowAdapter() {
      public void windowClosing(WindowEvent e) {
        try { con.close(); } catch(Exception exc) {}
        dispose();
        System.exit(0);
      }
    });
    setTitle("Czekam");
    pack();
    setLocationRelativeTo(null);
    show();
  }

  int i=0;
  public void onMessage(Message msg) {
    setTitle("Received msg " + ++i);
    try {
      ta.append(((TextMessage) msg).getText() + "\n");
    } catch(JMSException exc) { System.err.println(exc); }
  }

  public static void main(String[] args) {
    new AsynchrReceiver("queue1");
  }

}

rZobacz przentację mulimedialną działania programu

7. Działanie w trybie publisher/subscriber


Dla tych samych klientów - klas Sender i Receiver - użyjemy teraz destynacji topic1 z konfiguracji serwera OpenJMS.

Kolejnośc uruchomienia: nadawca, odbiorca


E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Sender topic1 "ala ma kota"
Sender sent msg: ala ma kota
E:>_


Nadawca opublikował wiadomość i zakończył działanie.
Uruchamiamy odbiorcę 


E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started


Subskrybent nie odebrał wiadomości i jest zablokowany, bowiem publikacja nastąpila w czasie, gdy subskrybent był nieaktywny.

Możemy wystartować kilku innych sybskrybentów. Wszyscy będą czekali.

E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started

Dopiero opublikowanie kolejnej wiadomości (np. "Ponownie: ala ma kota") spowoduje, że wszyscy działający subskrybenci ją otrzymają i zakończą dzialanie.

E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started
Received: Ponownie: ala ma kota
E:>_
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started
Received: Ponownie: ala ma kota
E:>_
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started
Received: Ponownie: ala ma kota
E:>_


8. Trwali subskrybenci


Aby zapewnić dostęp do wiadomości publikowanych w danym temacie w trakcie nieaktywności subskrybenta należy zarejestrować subskrybenta w danym temacie jako "trwałego subskrybenta" z unikalną nazwą subskrypcji.
String subName = "MySub";
MessageConsumer topicSubscriber =
  session.createDurableSubscriber(myTopic, subName);
JMS będzie zbierać publikowane w temacie wiadomości dla takiego subskrybenta w czasie gdy jest on nieaktywny i prześle mu je gdy będzie aktywny.

Oto przykładowy program trwałego subskrybenta:
import javax.naming.*;
import javax.jms.*;

public class DurableSubscriber {

  static void printMsg(Message message) throws Exception {
    if (message instanceof TextMessage) {
        TextMessage text = (TextMessage) message;
        System.out.println("Received: " + text.getText());
    } else if (message != null) {
        System.out.println("Received non text message");
    }
  }

  public static void main(String[] args) {
    Connection con = null;
    try {
      Context ctx = new InitialContext();
      ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
      String admTopicName = args[0];
      String subscriptionName = args[1];
      Topic topic = (Topic) ctx.lookup(admTopicName);
      con = factory.createConnection();
      Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
      TopicSubscriber subs = ses.createDurableSubscriber(topic, subscriptionName);

      con.start();
      while (true) {
        Message message = subs.receiveNoWait();
        if (message == null) {
          System.out.println("Waiting... Press ctrl-c to stop");
          try { Thread.sleep(10000); } catch (Exception exc) { break; }
        }
        else printMsg(message);
      }
    } catch (Exception exc) {
      exc.printStackTrace();
      System.exit(1);
    } finally {
      if (con != null) {
          try {
              con.close();
          } catch (JMSException exc) {
              System.err.println(exc);
          }
      }
    }
    System.exit(0);
  }

}

i scenariusze dzialania.

Startujemy subskrybenta. Ponieważ w temacie nie ma  jeszcze żadnej wiadomości - subskrybent czeka na jakąś.


E:\>java -cp E:\openjms-0.7.7-alpha-1\lib\openjms-0.7.7-alpha-1.jar;.; DurableSubscriber topic1 MojaSubskrypcja
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop


Ale subskrybent i jego subskrypcja już zostali zarejestrowani, co mozna np. zobaczyć w konsoli administracyjnej OpenJMS.

r

Przeriwjmy teraz działanie sybskrybenta i opublikujemy w temacie topic1 kilka wiadomości (mogą to zrobić różni nadawcy). Pamiętajmy odbiorca-subskrybent jest niekatywny.



E:\>java -cp E:\openjms-0.7.7-alpha-1\lib\openjms-0.7.7-alpha-1.jar;.; Sender topic1 "W Londynie jest 17 stopni"
Sender sent msg: W Londynie jest 17 stopni
E:\>java -cp E:\openjms-0.7.7-alpha-1\lib\openjms-0.7.7-alpha-1.jar;.; Sender topic1 "W Kielcach pada deszcz"
Sender sent msg: W Kielcach pada deszcz
E:\>java -cp E:\openjms-0.7.7-alpha-1\lib\openjms-0.7.7-alpha-1.jar;.; Sender topic1 "W Hiszpanii  upaly"
Sender sent msg: W Hiszpanii  upaly


Jeżeli teraz (albo za godzine, albo jutro) wystartujemy naszego subskrybenta - otrzyma on wszystkie zachowane wiadomosci.


E:\>java -cp E:\openjms-0.7.7-alpha-1\lib\openjms-0.7.7-alpha-1.jar;.; DurableSubscriber topic1 MojaSubskrypcja
Received: W Londynie jest 17 stopni
Received: W Kielcach pada deszcz
Received: W Hiszpanii upaly
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop


Uwagi:
  1. jeżeli potwierdzamy odbiór wiadomości (a tak jest w trybie AUTOMATIC_ACKNOWLEDGE), to otrzymane wiadomości nie będą ponownie posyłane
  2. możemy ustalić czas przechowywania nieodebranych wiadomości (domyślnie wiadomości zachowane są na stale).


9. Podsumowanie


Zapoznaliśmy się z interakcją w środowiskach rozproszonych za pomocą wymiany komunikatów.
Przedstawiona został architektura JMS i sposoby programowania i działania  aplikacji, w szczególności:

Pokazano przykłady działania w środowisku serwera OpenJMS, a także sposoby konfigurowania fabryk i destynacji w SJAS.

10. Zadania


Zadanie

Stworzyć aplikację rozproszoną, komunikująca się poprzez zlecenia posyłane w trybie point-to-point oraz rozsyłającą wiadomości w trybie publisher/subscriber. Wymyślić praktyczny przykład zastosowania.