12. Interakcja za pomocą komunikatów (JMS)
W środowiskach rozproszonych moduły aplikacji mogą
wspóldziałac poprzez wymianę komunikatów. Jest to dobry i
ciekawy sposób programowania, pozwala bowiem uniezależniać kody
modułów. Poznamy go na przykładzie JMS (Java Messaging Service).
1. Wprowadzenie
Klienci (fragmenty systemu, aplikacje itp.) w środowisku
rozproszonym mogą współdziałać poprzez wymianę
komunikatów, przy czym:
- klient-producent wysyła komunikat,
- klient-konsument odczytuje komunikat,
- obaj klienci nie muszą działać równocześnie,
- żaden z nich nie musi nic "wiedzieć" o budowie, kodzie itp. drugiego.
Współdziałanie aplikacji poprzez wymianę
komunikatów ma tę zaletę, że uniezależnia kody programów
od siebie ("loose coupling")
Ten sposób programowania realizowany jest na różnych
platformach poprzez różne technologie - np. IBM MQseries.
W środowisku Javy mamy do dyspozycji JMS (Java Messaging Service).
Java Messaging Service to API (zestaw interfejsów), które zapewnia m.in.:
- synchroniczną wymianę komunikatów (klient czeka na komunikat, spodziewa się go),
- asynchroniczną wymianę komunikatów (JMS wysyła komunikat
do klienta, gdy tylko komunikat się pojawi, a klient nie musi
sprawdzać czy komunikat jest, obsługuje go na zasadzie "callback"),
- niezawodność - gwarancję, że komunikat zostanie "dostarczony" raz i tylko raz.
2. Architektura JMS
Składowe aplikacji JMS
- serwis (JMS provider) implementuje interfejsy i dostarcza narzędzi administrowania,
- klienci - tworzą, wysyłają i odbierają komunikaty,
- obiekty administrowane - destynacje oraz fabryki połączeń (są
zwykle tworzone i konfigurowane przez narzędzia administracyjne, ale
również istnieje programistyczne API do tych celów)
Źródło: J2EE Tutorial (CF - connection factory, D - destination)
Destynacje i domeny
Destynacja JMS jest miejscem w którym nadawca umieszcza
wiadomości i z którego odbiorca odczytuje wiadomości.
Dwie możliwe domeny (rodzaje destynacji):
- kolejka (Queue) -
Point-to-Point za pośrednictwem kolejki komunikatów,
nadawca wysyła do określonej nazwanej kolejki (destynacji), odbiorca -
odbiera komunikat z tej kolejki.
° każdy komunikat może być odebrany tylko przez jednego odbiorcę;
° odbiorca może odebrać wiadomość niezależnie od tego czy nadawca działa czy też juz zakończył działanie;
° zasadą jest, że odbiorca potwierdza odebranie wiadomości,
co zapewnia, że nie będzie mu ona przysłana po raz kolejny.
Źródło: J2EE Tutorial
- temat (Topic) - zasada publikacji i subskrypcji.
° każda wiadomość może mieć wielu odbiorców (subskrybentów tematu),
° w danym temacie może publikował wielu nadawców,
° odbiorca może odbieracz tylko te wiadomości z danego
tematu, które zostały opublikowane po zapisaniu się przez niego
do subskrypcji
Źródło: J2EE Tutorial
Uwaga: wersja 1.1 JMS pozwala na oprogramowanie obu typów komunikacji za pomocą tych samych interfejsów.
Architektura aplikacji JMS
Źródło: J2EE Tutorial
ConnectionFactory - tworzy połączenie do dostawcy serwisu JMS.
Destination - określa destynację nadawanych i odbieranych wiadomości.
3. Konfigurowanie obiektów administrowanych
Fabryka połączeń (ConnectionFactory) i destynacja (Destination) zwane są obiektami administrowanymi.
Administrowane obiekty JMS określają sposób łączenia z
serwisem JMS oraz destynacje nadawanych i odbieranych
komunikatów i muszą być zdefiniowane przed użyciem usług JMS
Zazwyczaj obiekty administrowane
są określane w konfiguracji serwera JMS (który może być częścią
składową serwera aplikacji) i/lub za pomocą narzędzi administracyjnych
serwera. W ten sposób aplikacja (kod programu) uniezależnia się
od konkretnych implementacji, które dla różnych
serwerów są różne.
Np. w konfiguracji serwera OpenJMS (zob. oprogaramowanie dołączone do kursu) definicja obiektów administrowanych może wyglądać w następujący sposób:
<Connectors>
<Connector scheme="tcp">
<ConnectionFactories>
<ConnectionFactory name="ConnectionFactory" />
</ConnectionFactories>
</Connector>
<Connector scheme="rmi">
<ConnectionFactories>
<QueueConnectionFactory name="JmsQueueConnectionFactory" />
<TopicConnectionFactory name="JmsTopicConnectionFactory" />
</ConnectionFactories>
</Connector>
</Connectors>
.....
<AdministeredDestinations>
<AdministeredTopic name="topic1">
<Subscriber name="sub1" />
<Subscriber name="sub2" />
</AdministeredTopic>
<AdministeredQueue name="queue1" />
<AdministeredQueue name="queue2" />
<AdministeredQueue name="queue3" />
</AdministeredDestinations>
W srodowiskach sewrerów aplikacji zazwyczaj definiujemy obiekty administrowane za pomoca odpowiednich narzędzi(.
Poniżej pokazano konfigurację z pozomu konsoli Sun Java Application Server (SJAS).
Definicja fabryki połączeń:
Definicja fizycznej destynacji
Powiązanie nazwy JNDI z fizyczną destynacją.
Można też tworzyć destynacje programistycznie np.
uzyskanie dostępu do narzędzi administracyjnych:
import org.exolab.jms.administration.AdminConnectionFactory;
import org.exolab.jms.administration.JmsAdminServerIfc;
// ...
String url = "tcp://localhost:3035/";
JmsAdminServerIfc admin = AdminConnectionFactory.create(url);
utworzenie destynacji - kolejki o nazwie myqueue:
String queue = "myqueue";
Boolean isQueue = Boolean.TRUE;
if (!admin.addDestination(queue, isQueue)) {
System.err.println("Failed to create queue " + queue);
}
utworzenie destynacji-tematy o nazwie mytopic:
String topic = "mytopic";
Boolean isQueue = Boolean.FALSE;
if (!admin.addDestination(topic, isQueue)) {
System.err.println("Failed to create topic " + topic);
}
źródło: OpenJMS User Guide.
4. Programowanie klientów JMS
Dostęp do obiektów administrowanych
Mając skonfigurowane obiekty administrowane (ConnectionFactory i
destynacje) w aplikacji JMS uzyskujemy do nich dostęp za pomocą JNDI.
Np.
Context ctx = new InitialContext();
ConnectionFactory fact = (ConnectionFactory) ctx.lookup("ConnectionFactory");
String admDestName = "myqueue";
Destination dest = (Destination) ctx.lookup(admDestName);
Uwaga:
- tutaj nazwą JNDI dla fabryki połączeń jest "ConnectionFactory", może być inna nazwa (np. jms/ConnectionFactory)
- dostawca inicjalnego kontekstu musi być okreslony w
jndi.properties albo za pomoca właściwości systemowych. zgodnie z
konfiguracją serwera JMS.
Połączenie i sesja
Następnym krokiem jest uzyskanie połaczenia z serwerem JMS - uzyskanie
obiektu typu javax.jms.Connection, a na jego podstawie stworzenie sesji:
Connection con = fact.createConnection();
Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
// false - bez transakcyjnosci
// AUTO_ACKNOWLEDGE - automatyczne potwierdzanie
Sesja JMS jest jednowątkowym
kontekstem nadawania-odbierania wiadomości i służy do tworzenia
nadawców i odbiorców wiadomości oraz samych wiadomości
Przed wysyłką/odbiorem wiadomości należy otworzyć połączenie, a po zakończeniu działania na tym połączeniu - zamknąć je:
con.start();
// ....
// Nadawanie lub odbiór
//...
con.close();
Wiadomości
Wiadomości są obiektami typów pochodnych od typu Message.
Wiadomość sklada się z nagłówka, właściwości i zawartości (ciała wiadomoścu)
Typ
wiadomości
|
Zawartość
|
TextMessage
|
Tekst (String)
|
MapMessage
|
Mapa. Klucze = String, wartości = obiekty klas opakowujących typy proste (np. Integer)
|
BytesMessage
|
Zestaw bajtów.
|
StreamMessage
|
Sekwencyjny strumień typów prostych
|
ObjectMessage
|
Serializowany obiekt
|
Message
|
Pusta zawartośc. Również nadtyp dla w/w typów
|
Wiadomości tworzymy za pomocą odpowiednich metod intefejsu Session np:
// Utworzenie wiadomości tekstowej:
TextMessage msg = ses.createTextMessage();
msg.setText(...);
Nadawanie wiadomości
// Utworzenie nadawcy, który będzie posyłał wiadomości do destynacji dest
MessageProducer sender = ses.createProducer(dest);
// Posłanie wiadomości msg
sender.send(msg);
Odbieranie wiadomości
// Utworzenie odbiorcy, który będzie odbierał wiadomości z destynacji dest
MessageConsumer receiver = ses.createConsumer(dest);
// Odbiór
Message msg = receiver.receive(); // blokowanie,
//dopóki wiadomośc nie będzie dostępna
// ....
Message msg = receiver.receive(n); // blokowanie (ale nie dłużej niż n ms)
// ...
Message msg = receiver.receiveNoWait(); // wraca od razu; null - brak wiadomości
Ogólnie sposób programowania (użycie
interfejsów) przy nadawaniu i odbieraniu wiadomości jest taki
sam, niezależnie od tego czy destynacja jest kolejką (Queue) czy
tematem (Topic).
Należy jednak pamiętać o różnicach pomiędzy działaniem w trybie "Point-to-Point" oraz publisher/subscriber.
Słuchacze wiadomości
Słuchacze wiadomości są obiektami klas implementujących interfejs
MessageListener z jedną metodą onMessage(Message), która jest
wywoływana, gdy wiadomość dociera do danej destynacji (może to być
Queue lub Topic). W ten sposób realizowany jest asynchroniczny -
na zasadzie "callback" odbiór wiadomości.
Po stworzeniu słuchacza wiadomości ustalamy go dla danego odbiorcy za pomocą metody setMessageListener().
class MsgListener implements MessageListener {
public void onMessage(Message msg) {
// ... przetwarzanie wiadomości msg
}
}
// ....
MessageConsumer receiver;
// ....
MessageListener msgl = new MsgListener();
//....
receiver.setMessageListener(msgl);
Ten mechanizm JMS jest stosowany w tzw. message-driven Enterprise Java Beans.
5. Komunikaty w trybie point-to-point
Rozważmy prosty przykład posyłania - odbioru wiadomości tekstowych.
Poniżej pokazano teksty programów klienckich.
Nadawca
Posyła wiadomość tekstową (podaną jako drugi argument) do destynacji podanej jako pierwszy argument
import javax.naming.*;
import javax.jms.*;
public class Sender {
public static void main(String[] args) {
Connection con = null;
try {
Context ctx = new InitialContext();
ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
String admDestName = args[0];
Destination dest = (Destination) ctx.lookup(admDestName);
con = factory.createConnection();
Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer sender = ses.createProducer(dest);
con.start();
TextMessage msg = ses.createTextMessage();
msg.setText(args[1]);
sender.send(msg);
System.out.println("Sender sent msg: " + args[1]);
} catch (Exception exc) {
exc.printStackTrace();
System.exit(1);
} finally {
if (con != null) {
try {
con.close();
} catch (JMSException exc) {
System.err.println(exc);
}
}
}
System.exit(0);
}
}
Odbiorca
Odbiera wiadomość tekstową z destynacji podanej jako argument.
import javax.naming.*;
import javax.jms.*;
public class Receiver {
public static void main(String[] args) {
Connection con = null;
try {
Context ctx = new InitialContext();
ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
String admDestName = args[0];
Destination dest = (Destination) ctx.lookup(admDestName);
con = factory.createConnection();
Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer receiver = ses.createConsumer(dest);
con.start();
System.out.println("Receiver started");
Message msg = receiver.receive();
if (msg instanceof TextMessage) {
TextMessage text = (TextMessage) msg;
System.out.println("Received: " + text.getText());
} else if (msg != null) {
System.out.println("Received non text message");
}
} catch (Exception exc) {
exc.printStackTrace();
System.exit(1);
} finally {
if (con != null) {
try {
con.close();
} catch (JMSException exc) {
System.err.println(exc);
}
}
}
System.exit(0);
}
}
Do przetestowania działania pokazanych klas użyjemy destynacji queue1 z konfiguracji serwera OpenJMS.
Kolejnośc uruchomienia: nadawca, odbiorca
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Sender queue1 "ala ma kota"
Sender sent msg: ala ma kota
E:>_
Nadawca zakończył działanie. Uruchomienie odbiorcy (w innej sesji znakowej) skutkuje natychmiastowym odczytaniem wiadomości:
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver queue1
Receiver started
Received: ala ma kota
E:\>_
Wiadomość zostala skonsumowana. Ani ten ani żaden inny klient queue1 nie dostanie jej więcej.
Teraz zmienimy kolejność.
Kolejnośc uruchomienia: odbiorca, nadawca
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver queue1
Receiver started
Odbiorca jest zablokowany i czeka na ew. wiadomośc.
Po wystartowaniu nadawcy:
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Sender queue1 "Witam. Co slychac?"
Sender sent msg: Witam. Co slychac?
E:>_
odbiorca natychmiast odbierze wiadomość i zakończy działanie:
Received: Witam. Co slychac?
E:\>_
Zobacz prezentację multimedialną
6. Słuchacze wiadomości - przykład
Poniższy program reaguje na komunikaty posyłane do kolejki queue1.
import javax.naming.*;
import javax.jms.*;
import javax.swing.*;
import java.awt.event.*;
import javax.naming.*;
import javax.jms.*;
import javax.swing.*;
import java.awt.event.*;
public class AsynchrReceiver extends JFrame implements MessageListener {
private Connection con;
private JTextArea ta = new JTextArea(10, 20);
public AsynchrReceiver(String destName) {
try {
Context ctx = new InitialContext();
ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
Destination dest = (Destination) ctx.lookup(destName);
con = factory.createConnection();
Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer receiver = ses.createConsumer(dest);
receiver.setMessageListener(this);
con.start();
} catch (Exception exc) {
exc.printStackTrace();
System.exit(1);
}
add(new JScrollPane(ta));
addWindowListener(new WindowAdapter() {
public void windowClosing(WindowEvent e) {
try { con.close(); } catch(Exception exc) {}
dispose();
System.exit(0);
}
});
setTitle("Czekam");
pack();
setLocationRelativeTo(null);
show();
}
int i=0;
public void onMessage(Message msg) {
setTitle("Received msg " + ++i);
try {
ta.append(((TextMessage) msg).getText() + "\n");
} catch(JMSException exc) { System.err.println(exc); }
}
public static void main(String[] args) {
new AsynchrReceiver("queue1");
}
}
Zobacz przentację mulimedialną działania programu
7. Działanie w trybie publisher/subscriber
Dla tych samych klientów - klas Sender i Receiver - użyjemy teraz destynacji topic1 z konfiguracji serwera OpenJMS.
Kolejnośc uruchomienia: nadawca, odbiorca
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Sender topic1 "ala ma kota"
Sender sent msg: ala ma kota
E:>_
Nadawca opublikował wiadomość i zakończył działanie.
Uruchamiamy odbiorcę
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started
Subskrybent nie odebrał wiadomości i jest zablokowany, bowiem publikacja nastąpila w czasie, gdy subskrybent był nieaktywny.
Możemy wystartować kilku innych sybskrybentów. Wszyscy będą czekali.
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started
|
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started
|
|
Dopiero opublikowanie kolejnej wiadomości (np. "Ponownie: ala ma kota")
spowoduje, że wszyscy działający subskrybenci ją otrzymają i zakończą
dzialanie.
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started
Received: Ponownie: ala ma kota
E:>_ |
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started
Received: Ponownie: ala ma kota
E:>_ |
E:\>java -cp openjms-0.7.7-alpha-1.jar;.; Receiver topic1
Receiver started
Received: Ponownie: ala ma kota
E:>_ |
8. Trwali subskrybenci
Aby zapewnić dostęp do wiadomości publikowanych w danym temacie w
trakcie nieaktywności subskrybenta należy zarejestrować subskrybenta w
danym temacie jako "trwałego subskrybenta" z unikalną nazwą subskrypcji.
String subName = "MySub";
MessageConsumer topicSubscriber =
session.createDurableSubscriber(myTopic, subName);
JMS będzie zbierać publikowane w temacie wiadomości dla takiego
subskrybenta w czasie gdy jest on nieaktywny i prześle mu je gdy będzie
aktywny.
Oto przykładowy program trwałego subskrybenta:
import javax.naming.*;
import javax.jms.*;
public class DurableSubscriber {
static void printMsg(Message message) throws Exception {
if (message instanceof TextMessage) {
TextMessage text = (TextMessage) message;
System.out.println("Received: " + text.getText());
} else if (message != null) {
System.out.println("Received non text message");
}
}
public static void main(String[] args) {
Connection con = null;
try {
Context ctx = new InitialContext();
ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
String admTopicName = args[0];
String subscriptionName = args[1];
Topic topic = (Topic) ctx.lookup(admTopicName);
con = factory.createConnection();
Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber subs = ses.createDurableSubscriber(topic, subscriptionName);
con.start();
while (true) {
Message message = subs.receiveNoWait();
if (message == null) {
System.out.println("Waiting... Press ctrl-c to stop");
try { Thread.sleep(10000); } catch (Exception exc) { break; }
}
else printMsg(message);
}
} catch (Exception exc) {
exc.printStackTrace();
System.exit(1);
} finally {
if (con != null) {
try {
con.close();
} catch (JMSException exc) {
System.err.println(exc);
}
}
}
System.exit(0);
}
}
i scenariusze dzialania.
Startujemy subskrybenta. Ponieważ w temacie nie ma jeszcze żadnej wiadomości - subskrybent czeka na jakąś.
E:\>java -cp E:\openjms-0.7.7-alpha-1\lib\openjms-0.7.7-alpha-1.jar;.; DurableSubscriber topic1 MojaSubskrypcja
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Ale subskrybent i jego subskrypcja już zostali zarejestrowani, co mozna np. zobaczyć w konsoli administracyjnej OpenJMS.
Przeriwjmy teraz działanie sybskrybenta i opublikujemy w temacie topic1
kilka wiadomości (mogą to zrobić różni nadawcy). Pamiętajmy
odbiorca-subskrybent jest niekatywny.
E:\>java -cp E:\openjms-0.7.7-alpha-1\lib\openjms-0.7.7-alpha-1.jar;.; Sender topic1 "W Londynie jest 17 stopni"
Sender sent msg: W Londynie jest 17 stopni
E:\>java -cp E:\openjms-0.7.7-alpha-1\lib\openjms-0.7.7-alpha-1.jar;.; Sender topic1 "W Kielcach pada deszcz"
Sender sent msg: W Kielcach pada deszcz
E:\>java -cp E:\openjms-0.7.7-alpha-1\lib\openjms-0.7.7-alpha-1.jar;.; Sender topic1 "W Hiszpanii upaly"
Sender sent msg: W Hiszpanii upaly
Jeżeli teraz (albo za godzine, albo jutro) wystartujemy naszego subskrybenta - otrzyma on wszystkie zachowane wiadomosci.
E:\>java -cp E:\openjms-0.7.7-alpha-1\lib\openjms-0.7.7-alpha-1.jar;.; DurableSubscriber topic1 MojaSubskrypcja
Received: W Londynie jest 17 stopni
Received: W Kielcach pada deszcz
Received: W Hiszpanii upaly
Waiting... Press ctrl-c to stop
Waiting... Press ctrl-c to stop
Uwagi:
- jeżeli potwierdzamy odbiór wiadomości (a tak jest w trybie
AUTOMATIC_ACKNOWLEDGE), to otrzymane wiadomości nie będą ponownie
posyłane
- możemy ustalić czas przechowywania nieodebranych wiadomości (domyślnie wiadomości zachowane są na stale).
9. Podsumowanie
Zapoznaliśmy się z interakcją w środowiskach rozproszonych za pomocą wymiany komunikatów.
Przedstawiona został architektura JMS i sposoby programowania i działania aplikacji, w szczególności:
- w trybie point-to-point,
- asynchronicznie, poprzez nasłuch komunikatów,
- w trybie publikacji-subskrypcji.
Pokazano przykłady działania w środowisku serwera OpenJMS, a także sposoby konfigurowania fabryk i destynacji w SJAS.
10. Zadania
Zadanie
Stworzyć aplikację rozproszoną, komunikująca się poprzez zlecenia
posyłane w trybie point-to-point oraz rozsyłającą wiadomości w trybie
publisher/subscriber. Wymyślić praktyczny przykład zastosowania.