8. Współbieżna Java: zadania i wykonawcy
Głównym celem tego (i następnego) wykładu jest
przedstawienie nowych środków programowania
współbieżnego, dostępnych w pakiecie java.util.concurrent.
Rozpoczynamy od przypomnienia podstawowych pojęć, po czym zajmiemy się
podstawowymi nowymi koncepcjami "konkurencyjnej Javy". Tutaj
uwagę skupiamy na zadaniach i wykonawcach.
1. Programowanie współbieżne - przypomnienie podstawowych pojęć
Wątek - to sekwencja działań, która może
wykonywać się równolegle z innymi sekwencjami działań w
kontekście danego procesu
(programu).
W systemach jednoprocesorowych wrażenie równoległości działania wątków osiągane jest przez mechanizm przydzielania
czasu procesora poszczególnym wykonującym się wątkom. Każdy wątek uzyskuje
dostęp do procesora na krótki czas (kwant czasu), po czym "oddaje procesor"
innemu wątkowi. Zmiany są tak szybkie, że powstaje wrażenie równoległości
działania.
Najbardziej popularnym mechanizmem "zamiany wątków u procesora" jest tzw. wywłaszczanie (pre-emptive multistasking) - o dostępie
wątków do procesora decyduje systemowy zarządca wątków: przydziela on wątkowi
kwant czasu procesora, po upłynięciu którego odsuwa wątek od procesora i
przydziela kwant czasu procesora innemu wątkowi.
Podstawowa różnica pomiędzy procesami i wątkami polega na tym, że różne wątki
w ramach jednego procesu mają dostęp do całego kontekstu tego procesu (m.in.
przydzielonych mu zasobów).
Wobec tego zamiana wątków jednego procesu "przy procesorze" jest wykonywana
szybciej niż zamiana procesów (wątków różnych procesów).
Z punktu widzenia programisty wspólny dostęp wszystkich wątków jednego procesu
do kontekstu tego procesu ma zarówno zalety jak i wady.
Zaletą jest możliwość łatwego dostępu do wspólnych danych programu. Wadą
- brak ochrony danych programu przed równoległymi zmianami, dokonywanymi
przez różne wątki, co może prowadzić do niespójności danych, a czego unikanie
wiąże się z koniecznością synchronizacji działania wątków.
W Javie uruchamianiem wątków i zarządzaniem nimi zajmuje się klasa Thread.
Do wersji 1.5 obowiązywały następujące zasady:
Aby uruchomić wątek należy stworzyć obiekt klasy Thread i użyć metody start() wobec tego obiektu.
Ale kod, wykonujący się jako wątek (sekwencja działań, wykonująca się równolegle
z innymi działaniami programu) określany jest przez obiekt klasy implementującej interfejs
Runnable.
Interfejs ten zawiera deklarację metody run(), która przy implementacji musi być zdefiniowana.
Właśnie w metodzie run() zapisujemy kod, który będzie wykonywany jako wątek (równolegle z innymi wątkami programu).
Metoda run() określa co ma robić wątek.
Klasa Thread implementuje interfejs Runnable (podając "pustą" metodę run).
Stąd pierwszy sposób tworzenia i uruchamiania wątku.
Pierwszy sposób tworzenia i uruchamiania wątku (Java 1.4)
|
- Zdefiniować własną klasę dziedziczącą Thread (np. class Timer extends Thread)
-
Przedefiniować odziedziczoną metodą run(), podając w niej działania, które ma wykonywać wątek
-
Stworzyć obiekt naszej klasy (np. Timer timer = new Timer(...);
-
Wysłać mu komunikat start() (np. timer.start())
|
Jak wiemy, kod wykonywany przez wątek podajemy w metodzie run(). A
metoda run() może być zdefiniowana w dowolnej klasie implementującej interfejs
Runnable.
Klasa Thread dostarcza zaś konstruktora, którego argument jest typu Runnable.
Konstruktor ten tworzy wątek, który będzie wykonywał kod zapisany w metodzie
run() w klasie obiektu, do którego referencję przekazano wspomnianemu wyżej
konstruktorowi.
Stąd drugi sposób tworzenia i uruchamiania wątków.
Drugi sposób tworzenia i uruchamiania wątku (Java 1.4)
|
- Zdefiniować klasę implementującą interfejs Runnable (np. class X implements Runnable).
- Dostarczyć w niej definicji metody run (co ma robić wątek).
- Utworzyć obiekt tej klasy (np. X x = new X(); )
- Utworzyć obiekt klasy Thread, przekazując w konstruktorze referencję
do obiektu utworzonego w p.3 (np.Thread thread = new Thread(x);).
- Wywołać na rzecz nowoutworzonego obiektu klasy Thread metodę start ( thread.start();)
|
Kończenie pracy wątku
Wątek kończy pracę w sposób naturalny wtedy, gdy zakończy się jego metoda run().
Jeśli chcemy programowo zakończyć pracę wątku, to należy zapewnić w metodzie
run() sprawdzenie warunków zakończenia (ustalanych programowo) i jeśli są
spełnione - spowodować wyjście z run() albo przez "dobiegnięcie do końca",
albo przez return.
Warunki zakończenia mogą być formułowane w postaci
wartości jakiejś zmiennej, które są ustalane przez inne fragmenty kodu programu
(wykonywane w innym wątku).
Do kończenia pracy wątku możliwe jest także użycie metody interrupt()
ale wymaga to odpowiedniego przygotowania kodu w metodzie run(). Jest
to właściwy sposób postępowania, jeśli używamy nowych koncepcji
(serwisów wykonawców) z pakietu java.util.concurrent.
O tym mowa będzie w dalszej części wykładu.
2. Pakiet java.util.concurrent z lotu ptaka
Java zawsze była przygotowana na współbieżność. Od początku
jej istnienia, w samym języku i jego pakietach, zawarto
odpowiednie konstrukcje (synchronize, synchronized, wait, notify,
notifyAll).
Java umożliwiała pisanie współbieżnych programów, ale nie zawsze w sposób skalowalny.
Czyli wszystko działało dobrze, dopóki o zasoby nie konkurowało
zbyt wiele wątków. Przy dużej konkurencji ("high-contention") tylko genialnie
napisane programy sprawdzały się i pod względem miarodajności i pod
względem efektywności.
Genialne pisanie programu wymaga i dużo wysiłku i zwykle nie wychodzi.
Pakiet java.util.concurrent (opracowany głównie przez Douga Lea i obecny w Javie od wersji 1.5) zmienia tę
sytuację. Teraz dość prosto można napisać programy nastawione na
konkurencyjność (a więc nie tylko współbieżność, ale i skalowalną współbieżność).
A dodatkowo mamy do dyspozycji nieprzebrane mnóstwo narzędzi do
tworzenia całkiem własnych, przygotowanych na specyficzne
sytuacje, rozwiązań problemów współbieżności,
Pakiet java.util.concurrent:
- umożliwia separację zadań i wątków,
- pozwala na anulowanie zadań,
- dostarcza puli wątków prowadzonych przez Wykonawców,
- zawiera bardziej efektywne przy "high contention" - bazowe synchronizatory (Lock),
- umożliwia łatwą realizację read/write locks,
- udostępnia nowe synchronizatory wyższego poziomu (semafory, bariery, zasuwy),
- pozwala na użycie blokujących kolejek (łatwa koordynacja zadań),
- dostarcza konkurencyjnych (efektywnych przy dużej konkurencji) kolekcji,
- definiuje atomiki (dane podstawowych typów, bezpiecznie dostępne współbieżnie).
3. Separacja zadań od sposobu ich wykonania
Od początków Javy programiści mieli wiele kłopotów
ze zrozumieniem pojęcia wątku (a szczególnie obiektów
klasy
Thread). Nawet zaawansowanym programistom wydawało się, że użyteczną
pracę wykonuje klasa Thread. Tymczasem kod wątku zapisywany jest
w metodzie run(), a klasa Thread tak naprawdę nic nie robi. To kod
metody run() wykonuje się "w wątku" (czyli współbieżnie), choć
często mówimy nieco mylnie (potocznie): "wątek się wykonuje".
Tymczasem wcale nie jesteśmy zainteresowani wątkami (obiektami
klasy Thread) tylko zadaniami (zapisanymi w metodzie run()),
które "poprzez wątki" się wykonują. Chcielibyśmy rozumować
raczej w kategoriach zadań do wykonania, a nie technicznych
szczegółów sposobu ich wykonania.
I w Javie 1.5 stało się to możliwe. Zadania "do wykonania" mogą być odseparowane od wątków (od zarządzanie
nimi, umartwiania się nimi, śledzenia ich - co w dotychczasowej Javie
wcale nie było łatwe, o ile w ogóle możliwe).
Dotychczas było tak:
Runnable r = ...;
Thread t = new Thread(r);
t.start();
to znaczy - trzeba było samemu uruchomić wątek i dalej martwić się o jego los.
Gdy miał wyprodukować jakieś wyniki trzeba
było o nie zadbać (dość żmudnie oprogramować ich
przejęcie).
Gdy było dużo wątków - pojawiał się duży problem.
Serwery odmawiały posłuszeństwa.
Na przykład, przy poniższym kodzie:
import java.net.*;
import java.io.*;
public class SomeServer {
private ServerSocket ss = null;
private volatile boolean serverRunning = true;
public SomeServer(ServerSocket ss) {
this.ss = ss;
System.out.println("Server started");
serviceConnections();
}
private void serviceConnections() {
while (serverRunning) {
try {
final Socket conn = ss.accept();
System.out.println("Connection established");
Runnable serviceCode = new Runnable() {
public void run() {
serviceRequests(conn);
}
}
new Thread(serviceCode).start();
} catch (Exception exc) {
exc.printStackTrace();
}
}
try { ss.close(); } catch (Exception exc) { //... }
}
//...
}
gdy zglosi się (do serwisu) naraz dużo klientów wpadniemy w kłopoty. (Dlaczego?)
A co się stanie, gdy nasza metoda run() zgłosi wyjątek?
Interfejs Runnable nie przewidział tego w deklaracji run(). Pozostaje tylko RuntimeException.
Sposoby jego obsługi w Javie "przedkonkurencyjnej" były i uciążliwe i niezbyt niezawodne.
Na te wszystkie problemy pakiet java.util.concurrent ma gotowe odpowiedzi.
Chcesz łatwo tworzyć pule wątków, i zarządzać nimi bez trudnego programowania? |
Użyj odpowiednich Serwisów Wykonawców (ExecutorService) |
|
Chcesz myśleć w kategoriach zadań (Task), nie wątków? |
Daj Wykonawcom zadania do wykonania, oni zdecydują
jak
najlepiej podzielić je między wątki, ale ogólna strategia
podziału i uruchamiania jest pod Twoją kontrolą (mamy wybór
różnych strategii) |
|
Chcesz mieć łatwo dostępne wyniki współbieżnych zadań? |
Użyj interfejsu Callable, zaufaj Wykonawcom i odbieraj wyniki w postaci FutureTask -
obiektu pozwalającego na asynchroniczne testy (wyniki już są? jeszcze
nie ma?), reagowanie na wyjątki, odczytywanie wyników zadań i podłączanie callbacków. |
|
Pierwszym krokiem ku poprawie sytuacji jest odseparowanie zadań do wykonania od mechanizmów tworzenia i uruchamiania wątków.
Pakiet java.util.concurrent definiuje interfejs Executor:
public interface Executor {
void execute(Runnable);
}
Jego implementacje (nie my) winny zajmowac się tworzeniem i uruchamianiem wątków.
Sposób, polityka tworzenia i uruchamiania wątków spoczywa
na Wykonawcach (klasach implementujących interfejs Executor).
A zatem napiszemy raczej tak (choć jeszcze nie do końca poprawnie):
Executor executor = ....;
private void serviceConnections() {
while (serverRunning) {
try {
final Socket conn = ss.accept();
System.out.println("Connection established");
Runnable serviceCode = new Runnable() {
public void run() {
serviceRequests(conn);
}
}
executor.execute(serviceCode);
} catch (Exception exc) {
exc.printStackTrace();
}
}
try { ss.close(); } catch (Exception exc) { //... }
}
W szczególności konkretny Wykonawca może prowadzić pulę
wątków, zapewniając naszemu serwerowi odpowiednią
efektywność.
Tworzenie wątków jest kosztowne czasowo.
Pule wątków
pozwalają na ponowne użycie wolnych wątków, a także na ew.
limitowanie maksymalnej liczby wątków w puli.
My rozumujemy w kategoriach zadania do wykonania (określanegu tu przez kod Runnable), tworzeniem i uruchamianiem wątków zajmuja się Wykonawcy.
W Javie mamy do dyspozycji kilka rodzajów gotowych Wykonawców, fabrykowanych przez odpowiednie metody klasy Executors m.in.:
- Wykonawca uruchamiający podane mu zadania w jednym wątku (po kolei) (Executors.newSingleThreadExecutor()),
- Wykonawca, prowadzący pulę wątków o zadanych maksymalnych rozmiarach (Executors.newFixedThreadPool()),
- Wykonawca, prowadzący pulę wątków o dynamicznych rozmiarach
- (Executors.newCachedThreadPool()),
- Wykonawcy zarządzający tworzeniem i wykonaniem wątków w określonym czasie lub z określoną periodycznością (Executors.newScheduled....())
Możemy więc napisać coś takiego:
import java.util.concurrent.*;
class Task implements Runnable {
private String name;
public Task(String name) {
this.name = name;
}
public void run() {
for (int i=1; i <= 4; i++) {
System.out.println(name + " " + i);
Thread.yield();
}
}
}
public class Wykonawca {
public static void main(String[] args) {
Executor exec = Executors.newFixedThreadPool(2);
for (int i=1; i<=4; i++) {
exec.execute(new Task("Task " + i));
}
}
}
Program wypisze następujące wyniki:
Task 1 1
Task 2 1
Task 1 2
Task 2 2
Task 1 3
Task 2 3
Task 1 4
Task 2 4
Task 3 1
Task 4 1
Task 3 2
Task 4 2
Task 3 3
Task 4 3
Task 3 4
Task 4 4
Zwróćmy uwagę, że pula wątków jest ograniczona do
dwóch. Zatem najpierw wspólbieżnie działają dwa pierwsze
zadania, a po nich - trzecie i czwarte.
Gdy nasze zadania zakończą się, Wykonawca nadal "działa" i jest gotowy do przyjmowania nowych zadań.
Zamknięcie Wykonawcy oznacza, iż nie będzie on już przyjmował
nowych zadań do wykonania; jednak przekazane mu wcześniej i jeszcze nie
zakończone - będzie wykonywał.
Usługę zamknięcia dostarcza interfejs ExecutorService, który jest rozszerzeniem interfejsu Executor.
Metody fabryczne klasy Executors zwracają Wykonawców implementujących ExecutorService
Zobaczmy.
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(2);
for (int i=1; i<=4; i++) {
exec.execute(new Task("Task " + i));
}
Thread.yield();
exec.shutdown();
try {
exec.execute(new Task("Task after shutdown"));
} catch (RejectedExecutionException exc) {
exc.printStackTrace();
}
try {
exec.awaitTermination(5, TimeUnit.SECONDS);
} catch(InterruptedException exc) { exc.printStackTrace(); }
System.out.println("Terminated: " + exec.isTerminated());
}
.......
Task 3 4
Task 4 4
Terminated: true
java.util.concurrent.RejectedExecutionException
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1477)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:384)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:856)
at exec0.Wykonawca.main(Wykonawca.java:34)
Ponieważ metoda shutdown() zamyka ExecutorService, zadanie "Task after
shutdown" nie zostanie uruchomione (powstanie wyjątek
RejectedExecutionException; ten wyjątek może powstawać również
wtedy, gdy ExecutorService z innych powodów niż zamknięcie
odmawia wykonania zadania).
TimeUnit pozwala na lepszą i wygodniejszą granulację czasu - wszędzie
tam gdzie wchodzą w grę opóźnienia i oczekiwania.. Są dostępne
stałe, określające wybór jednostek:
- TimeUnit.SECONDS
- TimeUnit.MICROSECONDS
- TimeUnit.MILLISECONDS
- TimeUnit.NANOSECONDS
Na końcu programu - za pomocą metody awaitTermination(...) wstrzymujemy
bieżący wątek dopóki Wykonawca nie zakończy wszystkich zadań
(albo dopóki nie minie 5 sekund lub też nie wystąpi przerwanie
bieżącego wątku za pomocą metody interrupt). Warto stosować metodę
awaitTermination(), kiedy chcemy mieć pewność, że Wykonawaca
naprawdę zakończył działanie i wyczyścił wszystkie swoje zajęte zasoby
(np. bez tego nasz głowny wątek może sie skończyć wcześniej niż
Wykonawcy i aplikacja nie zakończy działania).
ExecutorService dostarcza także metody shutdownNow(),
która ma za zadanie zakończyć działanie wszystkich aktualnie
wykonujących się zadań (wątków) i zamknąć Wykonawcę.
Spróbujmy jej użyć w naszym kodzie.
Niech pętla w run() wykonuje się w nieskończoność np. for (byte i=1; i<=128; i++)
W metodzie main - w miejsce shutdown() wstawmy kod:
try {
Thread.sleep(1000);
} catch(Exception exc) {}
exec.shutdownNow();
Co się stanie? Nic. Dwa zadania (wątki) będą działać w nieskończoność (i nie zostaną zatrzymane).
Dlaczego?
4. Kończenie zadań przez interrupt()
W poprzednim przykładzie zadania dzialały w nieskończoność gdyż
metoda shutDownNow() kończy działające zadania poprzez użycie metody
interrupt() wobec odpowiednich wątków.
Metoda
interrupt() ustala jedynie status wątku jako przerwany, a zakończenie
pracy wątku odbywa się zawsze przez zakończenie jego kodu.
Trzeba zatem zmodyfikować kod metody run() np tak.
public void run() {
for (byte i=1; i <= 128 ; i++) {
if (Thread.currentThread().isInterrupted()) return;
System.out.println(name + " " + i);
Thread.yield();
}
}
Teraz wątki zostaną zatrzymane i zadania zakończone.
Pamiętać należy, że w sytuacjach gdy wątek jest uśpiony lub
zablokowany z możliwością przerwania blokady (sleep, wait i jego
odpowiedniki w java.util.concurrent, przerywalne synchronizatory
(interruptible locks), przerywalne operacje we-wy (interruptible
channels)) kończenie zadań - czyli wywołanie interrupt() - powoduje
zgłoszenie wyjątku InterruptedException i w obsłudze tego wyjątku
należy zakończyć wykonanie kodu wątku.
Możliwość wykonywania kodów
jako zadań obsługiwanych przez Wykonawców powoduje, że
poczynając od Javy 1.5 kończenie pracy wątków musimy zapewniać
poprzez sprawdzanie stanu INTERRUPTED oraz obsługę wyjątku
InterruptedException
Przykład.
class Interruptible {
Lock lock = new ReentrantLock();
Runnable task1 = new Runnable() {
public void run() {
System.out.println("Task 1 begins");
try {
lock.tryLock(1000, TimeUnit.SECONDS); // próba zamknięcia rygla (czeka na wolny rygiel lub 1000 sekund)
System.out.println("Task 1 entered");
} catch(InterruptedException exc) {
System.out.println("Task 1 interrupted");
}
System.out.println("Task 1 stopped");
}
};
Runnable task2 = new Runnable() {
public void run() {
System.out.println("Task 2 begins");
for (int i=1; i <= 600; i++) {
if (Thread.currentThread().isInterrupted()) break;
// jakieś obliczenie
if (Thread.currentThread().isInterrupted()) break; // chcemy przerwać możliwie najszybciej
try { // sleep() jest przerywane pzrez interrupt()!
Thread.sleep(1000);
} catch (InterruptedException exc) { break; }
}
System.out.println("Task 2 stopped");
}
};
Runnable task3 = new Runnable() {
Scanner scan = new Scanner( // musimy miec InterruptibleChannel, aby móc przerwać czekanie na wejściu
new FileInputStream(FileDescriptor.in).getChannel(), "Cp852");
public void run() {
System.out.println("Task 3 begins");
System.out.print(">>");
while (scan.hasNextLine()) {
try {
String s = scan.nextLine();
System.out.print('\n'+s + "\n>>");
} catch (Exception exc) {
// Uwaga: scanner nie zgłasza wyjątków, ale przerywa dzialanie
exc.printStackTrace();
break;
}
}
System.out.println("Task 3 stopped - " + scan.ioException()); // jaki wyjątek go przerwał?
}
};
Interruptible() {
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Runnable() { // wątek zamyka rygiel
public void run() {
lock.lock();
}
}
);
exec.execute(task1);
exec.execute(task2);
exec.execute(task3);
JOptionPane.showMessageDialog(null, "Press Ok to stop all tasks");
exec.shutdownNow();
}
}
Gdy uruchomimy program (z metody main(), może w innej klasie) przez:
new Interruptible();
otrzymamy następujący wynik:
Task 1 begins
Task 2 begins
Task 3 begins
>>ala ma kota
^Z
ala ma kota
>>
............................ Teraz wciskamy Ok w dialogu
Task 1 interrupted
Task 1 stopped
Task 2 stopped
Task 3 stopped - java.nio.channels.ClosedByInterruptException
5. Odbiór wyników zadań. Zadania "z prawdziwego zdarzenia".
Wróćmy teraz do innego, poruszonego wcześniej, zagadnienia:
w
jaki sposób zapewnić, aby zadania mogły zwracać wyniki, ew.
sygnalizować (różne) wyjątki i jak odbierać te wyniki od nich?
Odpowiedzą na pierwszą część pytania jest interfejs Callable.
Ma on następującą postać.
public interface Callable<V> {
V call() throws Exception;
}
W stosunku do interfejsu Runnable są tu dwa - ważne - udogodnienia.
- zwrot wyniku,
- możliwość zgłoszenia wyjątku kontrolowanego.
Zarówno Callable, jak i Runnable określają kod do
(ewentualnie współbieżnego) wykonania. To są jak gdyby dwie
różne wersje definiowania zadania do wykonania.
Można (wstępnie) powiedzieć tak - mamy dwa rodzaje zadań.
Takie które zwracają wyniki i takie które nie zwracają wyników.
To że kod zadania zwraca wynik jest realizowane przez implementację
Callable. Ale samo Callable (które zwraca wynik) nie wystarczy,
aby można było ten wynik łatwo uzyskać. Zadanie jest uruchamiane
przez Wykonawcę i nie zawsze wiemy kiedy zostanie uruchomione. Wykonuje
się asynchronicznie, zatem nie wiemy kiedy wynik będzie gotowy.
Dlatego wprowadzono interfejs Future<V>, który - jak napisano w dokumentacji - reprezentuje wynik asynchronicznych obliczeń i zawiera następujące metody.
boolean |
cancel(boolean mayInterruptIfRunning)
Próbuje
anulować wykonanie zadania (argument mówi o tym, czy można
przerwać wykonujące się zadanie). Nie wykonujące się (jeszcze) zadania
są usuwane z listy zadań Wykonawcy. |
V |
get()
Pobiera wynik
zadania, jeśli zadanie się nie zakończyło - czeka (blokuje).
Oczekiwanie może być przerwane przez CancellationException (zadanie
anulowane), ExecutionException (kod zadania zgłosił wyjątek),
InterruptedException (metoda interrupt() wobec wątku, w którym
wykonywane jest zadanie).
Wyjątki Execution i Interrupoted muszą być obsługiwane. |
V |
get(long timeout,
TimeUnit unit)
Pobiera wynik
zadania, jeśli zadanie się nie zakończyło - czeka (blokuje), ale nie
dłużej niż podany czas. Wyjątki jw. + TimeoutException |
boolean |
isCancelled()
Czy anulowane?
|
boolean |
isDone()
Czy zakończone
(w dowolny sposób, również przez anulowanie)? |
Tak naprawdę Future opisuje nie tylko metody dotyczące wyników
zadania, ale również wykonania zadania. Metoda cancel() -
dotyczy przecież wykonania zadaniu.
Zadania, które zwracają wyniki muszą więc mieć nie
tylko kod podany jako Callable, ale również muszą być
Future.
Ale to znowu nie wystarczy!
Zadania są uruchamiane przez Wykonawców w zarządzanych przez nich wątkach.
Zatem zawsze zadania muszą być Runnable.
Zwróćmy uwagę: w metodzie execute() Wykonawcy można podać tylko Runnable.
Dlaczego zatem Callable też możemy traktować jako kod zadania do
wykonania? A skoro możemy, to w jaki sposób ten kod jest uruchamiany
przez metodę execute()?
Wróćmy na chwilę do naszych poprzednich przykładów (gdy "zadaniami" były zwykłe Runnable).
Zauważmy, to nie były "pełnokrwiste" zadania.
"Prawdziwe" zadanie musi spełniać następujące warunki:
- zadanie musi pozwalać na asynchroniczne wykonanie w odrębnym wątku (a więc w obecnej Javie być Runnable),
- musi istnieć możliwość traktowania zadania do wykonania
odrębnie od sposobu i czasu jego uruchamiania w jakimś wątku (separacja
przez Wykonawców),
- musimy mieć możliwość łatwego uzyskania informacji o wyniku
zadania (a wynikiem może być nie tylko konkretna wartość, ale
również informacja o tym "czy wykonanie kodu zakończyło się
normalnie", "czy powstał wyjątek w trakcie wykonania?", "czy
zadanie zostało anulowane - przerwane w trakcie wykonania lub zakończone
przez odwołanie przyszłego wykonania?")
- musimy mieć możliwość, bez konieczności operowania na wątkach -
anulowania zadania (w tym przerwania już wykonującego się zadania).
Połączenie czystego Runnable z execute(...) Wykonawców spełnia tylko dwa pierwsze warunki.
Odpytywanie o wyniki zadania (a także możliwość anulowania-przerywania zadań) wymaga, by zadanie było Future.
Po to, by móc obsługiwać wyjątki wykonania (nawet dla
kodów, które nie zwracają wyników) - kod
zadania musi być Callable.
Zadanie jest obiektem implementującym interfejsy Runnable i Future, a
jego kod musi być zawarty w metodzie call() klasy implementującej
interfejs Callable
Co nie oznacza, że ze zwykłego Runnable nie możemy stworzyć zadania!
W Javie implementacja pojęcia zadania zrealizowana jest jako klasa FutureTask.
/*
* @since 1.5
* @author Doug Lea
*/
public class FutureTask<V> implements Future<V>, Runnable {
// .....
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
// .....
private final class Sync extends AbstractQueuedSynchronizer {
// .....
private final Callable<V> callable;
// .....
Sync(Callable<V> callable) {
this.callable = callable;
}
// .....
}
}
Fragment na podstawie żródeł klasy FutureTask (SunMicrosystem 2005).
A zatem tworząc zadanie możemy podać w konstruktorze Callable. Jak widać,
to Callable zostanie użyte przez kompozycję w wewnętrznej klasie
Sync.
Możemy też podać zwykłe Runnable (z dodatkowym argumentem ustalającym
jego wynik). Jak widać, w klasie Executors mamy statyczne metody
"zamieniające" Runnable na Callable (w FutureTask brakuje konstruktora
w postaci: FutureTask(Runnable), który mógłby
skorzystać ze statycznej metody klasy Executors
Callable<Object> callable(Runnable), która po prosto
przyjmuje, że wynikiem call z wynikowego Callable jest null).
No i teraz możemy zrobić tak:
Runnable r = new Runnable() {
public void run() {
// ...
}
};
Callable<String> c = new Callable<String>() {
public String call() throws Exception {
String result;
// ...
return result;
}
};
FutureTask<Boolean> task1 = new FutureTask<Boolean>(r, true);
FutureTask<Object> task2 = new FutureTask<?>(Executors.callable(r));
FutureTask<String> task3 = new FutureTask<String>(c);
Executor exec = Executors.new....();
exec.execute(task1);
exec.execute(task1);
exec.execute(task2);
Co więcej, wcale nie trzeba tworzyć samodzielnie obiektów FutureTask.
Interfejs ExecutorService zawiera trzy metody submit(...), które tworzą - i zwracają - dla nas Future.
|
submit(Callable<T> task)
Submits a value-returning task for execution and returns a Future
representing the pending results of the task. |
Future<?> |
submit(Runnable task)
Submits a Runnable task for execution and returns a Future
representing that task. |
|
submit(Runnable task,
T result)
Submits a Runnable task for execution and returns a Future
representing that task that will upon completion return
the given result |
Czyli w poprzednim przykładzie moglibyśmy napisać:
ExecutorService exec = Executors.new....();
Future<Boolean> future1 = exec.submit(task1, true);
Future<Object> future2 = exec.submit(task1);
Future<String> future3 = exec.submit(task2);
Konkretna implementacja submit w klasie AbstractExecutorService
(którą dziedziczy ThreadPoolExecutor zwracany przez
Executors.new....() wygląda mniej więcej tak:
/*
* @author Doug Lea
*/
public abstract class AbstractExecutorService implements ExecutorService {
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
FutureTask<Object> ftask = new FutureTask<Object>(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
FutureTask<T> ftask = new FutureTask<T>(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
FutureTask<T> ftask = new FutureTask<T>(task);
execute(ftask);
return ftask;
}
// ...
}
Fragment na podstawie żródeł klasy FutureTask (SunMicrosystem 2005).
Rozważmy przykład:
import javax.swing.*;
import java.awt.event.*;
import java.util.List;
import java.util.concurrent.*;
import java.lang.reflect.*;
public class Exec1 extends JFrame implements ActionListener {
int k = 0;
int n = 15;
JTextArea ta = new JTextArea(40,20);
Exec1() {
add(new JScrollPane(ta));
JPanel p = new JPanel();
JButton b = new JButton("Start");
b.addActionListener(this);
p.add(b);
b = new JButton("Stop current");
b.setActionCommand("Stop");
b.addActionListener(this);
p.add(b);
b = new JButton("Curent result");
b.setActionCommand("Result");
b.addActionListener(this);
p.add(b);
b = new JButton("Shutdown");
b.addActionListener(this);
p.add(b);
b = new JButton("ShutdownNow");
b.addActionListener(this);
p.add(b);
add(p, "South");
setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
pack();
setVisible(true);
}
public void actionPerformed(ActionEvent e) {
String cmd = e.getActionCommand();
try {
Method m = this.getClass().getDeclaredMethod("task"+cmd);
m.invoke(this);
} catch(Exception exc) { exc.printStackTrace(); }
}
class SumTask implements Callable<Integer> {
private int taskNum,
limit;
public SumTask(int taskNum, int limit) {
this.taskNum = taskNum;
this.limit = limit;
}
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= limit; i++) {
if (Thread.currentThread().isInterrupted()) return null;
sum+=i;
ta.append("Task " + taskNum + " part result = " + sum + '\n');
Thread.sleep(1000);
}
return sum;
}
};
Future<Integer> task;
//ExecutorService exec = Executors.newSingleThreadExecutor();
ExecutorService exec = Executors.newFixedThreadPool(3);
public void taskStart() {
try {
task = exec.submit(new SumTask(++k, 15));
} catch(RejectedExecutionException exc) {
ta.append("Execution rejected\n");
return;
}
ta.append("Task " + k + " submitted\n");
}
public void taskResult() {
String msg = "";
if (task.isCancelled()) msg = "Task cancelled.";
else if (task.isDone()) {
try {
msg = "Ready. Result = " + task.get();
} catch(Exception exc) {
msg = exc.getMessage();
}
}
else msg = "Task is running or waiting for execution";
JOptionPane.showMessageDialog(null, msg);
}
public void taskStop() {
task.cancel(true);
}
public void taskShutdown() {
exec.shutdown();
ta.append("Executor shutdown\n");
}
public void taskShutdownNow() {
List<Runnable> awaiting = exec.shutdownNow();
ta.append("Eeecutor shutdown now - awaiting tasks:\n");
for (Runnable r : awaiting) {
ta.append(r.getClass().getName()+'\n');
}
}
public static void main(String[] args) {
new Exec1();
}
}
Uwagi:
- warto zaobserwować, że w call() nie musimy obsługiwać wyjątku InterruptedException przy sleep (throws Exception)
- jednak pisząc kody zadań nie możemy pozbyć się myślenia w kategoriach wątków (isInterrupted(), sleep)
- shutdownNow zwraca listę zadań
oczekujących na wykonanie w momencie zamknięcia serwisu - to są
ogólnie Runnable, ale ponieważ zostały przekazane do wykonania jako FutureTasks - prawdziwą klasą jest FutureTask.
6. Obsługa wyników zadań za pomocą odwołań zwrotnych
Bezpośrednia konstrukcja (za pomocą konstruktorów klasy) zadań
jako FutureTask ma pewną zaletę wobec submit(Callable) czy
submit(Runnable): w łatwy sposób możemy dostarczać odwołań zwrotnych (callback) reagujących na zakończenie zadania.
Klasa FutureTask dostarcza bowiem chronionej metody done(), która jest wywoływana po zakończeniu zadania.
Można zrobić nawet tak:
Callable<Integer> callable = ...;
ExecutorService exec = Executors.new...();
exec.execute( new FutureTask<Integer>(callable) {
public void done() {
// kod będzie wywołany, gdy zadanie się zakończy lub zostanie anulowane
}
});
Przykład:
import javax.swing.*;
import java.awt.*;
import java.awt.event.*;
import java.util.concurrent.*;
class FutureTaskCallback<V> extends FutureTask<V> {
public FutureTaskCallback(Callable<V> callable) {
super(callable);
}
public void done() {
String result = "Wynik: ";
if (isCancelled()) result += "Cancelled.";
else try {
result += get();
} catch(Exception exc) {
result += exc.toString();
}
JOptionPane.showMessageDialog(null, result);
}
}
public class SimpleCallback extends JFrame {
JTextField input = new JTextField(40),
progress = new JTextField(40);
String toReverse;
Callable<String> reverseTask = new Callable<String>() {
public String call() throws Exception {
Thread t = Thread.currentThread();
if (toReverse == null || toReverse.trim().equals(""))
throw new IllegalArgumentException("Set string to reverse");
if (t.isInterrupted()) return null;
char[] org = toReverse.toCharArray();
StringBuffer out = new StringBuffer();
if (t.isInterrupted()) return null;
for (int i = org.length-1; i>=0; i--) {
Thread.sleep(500);
out.append(org[i]);
if (t.isInterrupted()) return null;
progress.setText(out.toString());
if (t.isInterrupted()) return null;
}
return out.toString();
}
};
ExecutorService exec = Executors.newSingleThreadExecutor();
FutureTaskCallback<String> ft;
public SimpleCallback() {
Font f = new Font("Dialog", Font.PLAIN, 16);
input.setFont(f);
progress.setFont(f);
JPanel p = new JPanel();
JButton b = new JButton("Start");
b.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent e) {
ft = new FutureTaskCallback<String>(reverseTask);
toReverse = input.getText();
exec.execute(ft);
}
});
p.add(b);
b = new JButton("Stop");
b.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent e) {
if (ft != null) ft.cancel(true);
}
});
p.add(b);
Container cp = getContentPane();
cp.setLayout(new BoxLayout(cp, BoxLayout.Y_AXIS));
add(input);
add(p);
add(progress);
setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
pack();
setVisible(true);
}
public static void main(String[] args) {
new SimpleCallback();
}
}
Zobacz demo programu.
Rozważmy nieco bardziej uniwersalny i zaawansowany przykład
dostarczenia asynchronicznej obsługi wyników zadań.
Umówimy się, że metoda obsługi (dowolna metoda dowolnej klasy)
otrzymuje jako argumenty - wynik zadania oraz ew. wyjątek. Jeśli
wyjątek nie powstał (nie było ani CancellationException, ani
ExecutionException) - to ten drugi argument ma wartość null. Jeśli zaś powstał - to piewrszy argument ma wartość null.
Klasa Ftask dziedziczy FutureTask i przedefiniowuje metodę done w której wywołuje metodę obsługi.
import java.lang.reflect.Method;
import java.util.concurrent.*;
public class Ftask<V> extends FutureTask<V> {
private Method handlerMethod;
private Object handlerObject;
public Ftask(Callable<V> callable, Object handler, String mname) throws Exception {
super(callable);
handlerObject = handler;
handlerMethod = handler.getClass().getDeclaredMethod(mname, Object.class, Exception.class);
}
public void done() {
V result = null;
try {
result = (V) this.get();
} catch(Exception exc) {
try {
handlerMethod.invoke(handlerObject, null, exc);
} catch(Exception ex) {
ex.printStackTrace();
}
return;
}
try {
handlerMethod.invoke(handlerObject, result, null);
} catch(Exception exc) {
exc.printStackTrace();
}
}
Wprowadzimy też klasę abstrakcyjnego zadania, która - dla
konkretnych instancjacji poprzez dziedziczenie - wymaga tylko
implementacji metody call():
public abstract class AbstractTask<V> implements Callable<V> {
private String name;
private FutureTask<V> task;
public AbstractTask(String name, Object resultHandler, String handlerMethodName) throws Exception {
this.name = name;
task = new Ftask<V>(this, resultHandler, handlerMethodName);
}
public FutureTask<V> getTask() { return task; }
public String getName() { return name; }
}
Teraz możemy ich użyć np. tak:
class ResultHandler<V> {
int x = 400, y = 50;
public void handleResult(V result, Exception exc) {
String msg;
if (exc != null) msg = exc.toString();
else msg = "Wynik = " + result;
JFrame f = new JFrame("Task results");
JLabel lab = new JLabel(" " + msg);
f.add(lab);
f.setBounds(x, y, 300, 100);
f.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
x+=50;
y+=50;
f.setVisible(true);
}
}
class SumTaskA extends AbstractTask<Integer> {
public SumTaskA(String taskName, ResultHandler h, String handlerMethod )
throws Exception
{
super(taskName, h, handlerMethod);
}
public Integer call() throws Exception {
Future<Integer> task = this.getTask();
int sum = 0;
if (task.isCancelled()) return null;
for (int i = 1; i <= 10; i++) {
if (task.isCancelled()) break;
sum+=i;
append(getName() + " part result = " + sum + '\n');
Thread.sleep(1000);
}
return sum;
}
};
ExecutorService exec = Executors.newFixedThreadPool(3);
ResultHandler<Integer> handler = new ResultHandler<Integer>();
SumTaskA task = new SumTaskA("Task 1", handler, "handleResults")
exec.execute(task.getTask());
Zwrócmy uwagę, że wprowadzając AbstractTask (jako implementującą
Callable i zawierającą Future przez kompozycję) uzyskaliśmy możliwość
dostarczania takich implementacji metody call(), w której
rozumowanie w kategoriach wątków ograniczone jest do minimum,
np. zamiast pobierać bieżący wątek i sprawdzać czy jest Interrupted -
możemy zastosować metodę isCancelled().
Uwaga:
FutureTask może znajdować się w czterech stanach: inicjalnym
(0), RUNNING (1) , RAN (2), CANCELLED(4). Po zakończeniu znajduje się w
stanie RAN lub CANCELLED i nie może być ponownie uruchomione przez tego
samego Wykonawcę.
Przykład:
import java.util.List;
import java.util.concurrent.*;
public class RepeatedTaskTest {
FutureTask<Object> task = new FutureTask<Object>( new Callable<Object>() {
public Object call() {
for (int i= 1; i <= 3; i++) {
System.out.println(i);
}
return null;
}
}) {
public void done() {
System.out.println("Done.");
}
};
public RepeatedTaskTest() {
ExecutorService exec = Executors.newSingleThreadExecutor();
System.out.println("Starting first task");
exec.execute(task);
try {
System.out.println("Starting second task");
exec.execute(task);
Thread.sleep(5000);
} catch(Exception exc) {
exc.printStackTrace();
}
System.out.println("Executor shutdown now");
List<Runnable> awaiting = exec.shutdownNow();
for (Runnable r : awaiting) {
System.out.println(r.getClass().getName()+'\n');
}
try {
exec.awaitTermination(5, TimeUnit.SECONDS);
} catch(InterruptedException exc) { exc.printStackTrace(); }
}
public static void main(String[] args) {
new RepeatedTaskTest();
}
Program wyprowadzi:
Starting first task
Starting second task
1
2
3
Done.
Executor shutdown now
Drugie zadanie w ogóle się nie wykonało i nie powstał żaden wyjątek!
Logika jest taka - jest tylko jedno zadanie. Ono się wykonało.
Wykonawca nie wykonuje już wykonanych zadań. Wynik jest gotowy - cały
czas możemy po niego sięgać. Nie ma żadnego wyjątku, bo wszystko jest w
porządku - zadanie zostało wykonane.
7. Wykonawcy a pule wątków
Przyjrzyjmy się teraz bliżej Wykonawcom. Ich klasy dostarczają dość rozbudowanych możliwości.
Interfejs ExecutorService zawiera kilku dodatkowych - nie znanych nam
jeszcze - metod, pod hasłem "przekazywania do wykonania kolekcji
zadań i uzyskiwania dostępu do ich wyników":
|
invokeAll(Collection<Callable<T>> tasks)
Executes the given tasks, returning a list of Futures holding
their status and results when all complete. |
|
invokeAll(Collection<Callable<T>> tasks,
long timeout,
TimeUnit unit)
Executes the given tasks, returning a list of Futures holding
their status and results
when all complete or the timeout expires, whichever happens first. |
|
invokeAny(Collection<Callable<T>> tasks)
Executes the given tasks, returning the result
of one that has completed successfully (i.e., without throwing
an exception), if any do. Pozostałe zadania są anulowane. |
|
invokeAny(Collection<Callable<T>> tasks,
long timeout,
TimeUnit unit)
Executes the given tasks, returning the result
of one that has completed successfully (i.e., without throwing
an exception), if any do before the given timeout elapses. Pozostałę zadania są anulowane. |
Zwrócmy uwagę, że te odwołania są blokujące - czekają na zakończenie zadań (normalne - lub przez wyjątek).
Przykład:
import java.util.concurrent.*;
import java.util.*;
class Eval implements Callable<Integer> {
Integer num;
public Eval(int n) {
num = n;
}
public Integer call() throws Exception {
Thread.sleep(1000);
return num;
}
}
public class InvokeTest {
public static int sum(ExecutorService exec, List<Callable<Integer>> tasks) throws Exception {
long start = System.currentTimeMillis();
System.out.println("Start");
List<Future<Integer>> results = exec.invokeAll(tasks); // InterruptedException
long elapsed = System.currentTimeMillis() - start;
System.out.println("End after " + elapsed/1000 + " sec.");
int sum = 0;
for (Future<Integer> r : results) sum += r.get();
return sum;
}
public static void main(String[] args) {
List<Callable<Integer>> taskList = new ArrayList<Callable<Integer>>();
ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i=1; i <=5; i++) {
Callable<Integer> task = new Eval(i);
taskList.add(task);
}
try {
int result = sum(exec, taskList);
System.out.println("Wynik: " + result);
} catch(Exception exc) { exc.printStackTrace(); }
}
}
Ale czym naprawdę są Wykonawcy?
Metody fabryczne klasy Executors tworzą dla nas wykonawcaów jako obiekty klasy ThreadPoolExecutor.
Klasa zajmuje się dynamicznym tworzeniem wątków (które
będą przydzielane do wykonania zadań) i prowadzeniem ich puli -
zsynchronizowanej, blokującej kolejki wątków.
Pula wątków charakteryzuje się następującymi parametrami:
-
poolCoreSize,
-
poolMaximumSize,
-
keepAliveTime,
-
threadFactory,
-
workQueue
które można ustalać w konstruktorze, pobierać za pomocą metod
get.. i (niektóre) zmieniać w trakcie działania programu za
pomocą metod set...
Znaczenie tych parametrów jest następujące.
Niech n oznacza aktualną liczbę wątków w puli (niektóre
mogą być nieaktywne) i niech zlecane jest nowe zadanie do wykonania
(exec()).
- gdy n < poolCoreSize, to jest tworzony nowy wątek, nawet wtedy, gdy w puli są nieaktywne wątki (a zatem zadanie,
- gdy poolCoreSize < n < maximumPoolSize, to nowy wątek jest
tworzony tylko wtedy, gdy kolejka zadań jest pełna, jeśli nie jest
wypełniona, to zadanie jest dodawane do kolejki, a wątek nie jest
tworzony (bo być może będzie można ponownie użyć innego),
- wątki od 1 do poolCoreSize są normalnie tworzone tylko "na
życzenie" (gdy przychodzi zadanie), można jednak utworzyć te
wątki metodą prestartCoreThread lub prestartAllCoreThreads,
- keepAliveTime określa okres czasu utrzymywania w puli
nieaktywnych wątków: nieaktywne wątki są usuwane z puli, ale
tylko wtedy, gdy n > corePoolSize (w przeciwnym razie nieaktywne nie
są usuwane),
- nowe wątki tworzone są metodami fabrycznymi ThreadFactory; normalnie DefaultThreadFactory, ale można podać własną,
- workQuee jest używane jako kolejka zadań do wykonania (czekających!) i musi implementować interfejs BlockingQueue<Runnable>; standardowo jest to klasa LinkedBlockingQueue<Runnable>.
Co zwracają fabryczne metody klasy Executors?
Widać to wyraźnie na wyciągu z kodu źródłowego:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new DelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new DelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
Źródło: kod JDK, Sun Microsystem
Przykład:
import java.awt.*;
import java.awt.event.*;
import java.util.concurrent.*;
import javax.swing.*;
import java.lang.reflect.*;
class Code implements Callable<Integer> {
String name;
int num;
public Code(String n, int sleep) {
name = n;
num = sleep;
}
public Integer call() throws Exception {
Thread.sleep(num*1000);
return num;
}
public String getName() { return name; }
}
class Task extends FutureTask<Integer> {
Code code;
public Task(Code c) {
super(c);
code = c;
}
public String getName() {
return code.getName();
}
}
public class ThreadPoolMethods extends JFrame implements ActionListener {
ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
public ThreadPoolMethods() {
setLayout(new FlowLayout());
JButton b = new JButton("Start");
b.addActionListener(this);
add(b);
b = new JButton("Show");
b.addActionListener(this);
add(b);
pack();
setVisible(true);
}
public void actionPerformed(ActionEvent e) {
String cmd = e.getActionCommand();
if (cmd.equals("Start")) {
int sleep = 5, step = 3;
for (int k=1; k <= 10; k++) {
sleep += step;
exec.execute(new Task(new Code("Task " + k, sleep)));
}
System.out.println("All tasks submitted");
}
else {
try {
printStat();
} catch(Exception exc) { exc.printStackTrace(); }
BlockingQueue<Runnable> que = exec.getQueue();
System.out.println("Que size " + que.size());
for (Runnable r : que) {
Task t = (Task) r;
System.out.println(t.getName());
}
}
}
void printStat() throws Exception {
Method[] mets = ThreadPoolExecutor.class.getDeclaredMethods();
for (Method m : mets) {
String name = m.getName();
if (name.startsWith("get")) {
Object res = null;
try {
res = m.invoke(exec);
} catch(Exception exc) {
System.out.println("Unable to call " + name);
}
System.out.println(name + " = " + res);
}
}
}
public static void main(String[] args) {
new ThreadPoolMethods();
}
}
Wydruk:
All tasks submitted
getQueue = [exec3.Task@ee22f7, exec3.Task@39ab89, exec3.Task@2cb49d,
exec3.Task@105d88a, exec3.Task@cb6009, exec3.Task@e28b9,
exec3.Task@193a66f, exec3.Task@93d6bc]
getActiveCount = 2
getCompletedTaskCount = 0
getCorePoolSize = 2
Unable to call getKeepAliveTime
getKeepAliveTime = null
getLargestPoolSize = 2
getMaximumPoolSize = 2
getPoolSize = 2
getRejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor$AbortPolicy@be0e27
Unable to call getTask
getTask = null
getTaskCount = 10
getThreadFactory = java.util.concurrent.Executors$DefaultThreadFactory@193385d
Que size 8
Task 3
Task 4
Task 5
Task 6
Task 7
Task 8
Task 9
Task 10
i po kilku sekundach
getQueue = [exec3.Task@2cb49d, exec3.Task@105d88a, exec3.Task@cb6009, exec3.Task@e28b9, exec3.Task@193a66f, exec3.Task@93d6bc]
getActiveCount = 2
getCompletedTaskCount = 2
getCorePoolSize = 2
Unable to call getKeepAliveTime
getKeepAliveTime = null
getLargestPoolSize = 2
getMaximumPoolSize = 2
getPoolSize = 2
getRejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor$AbortPolicy@be0e27
Unable to call getTask
getTask = null
getTaskCount = 10
getThreadFactory = java.util.concurrent.Executors$DefaultThreadFactory@193385d
Que size 6
Task 5
Task 6
Task 7
Task 8
Task 9
Task 10
W pakiecie java.util.concurrent dostępne są też klasy
wykonawców, zapewniających wykonanie zadań w sposób
rytmiczny bądź z podanym opóźnieniem.
Proszę zobaczyć dokumentację.
8. Podsumowanie
W podsumowaniu - rysunek, pokazujący powiązania pomiędzy klasami i interfejsami.
I niektóre wnioski:
- Callable nie jest Runnable
- nie należy o nim myśleć jako o mającym związek z Runnable, jest
raczej jego zamiennikiem - w tym sensie, że to takie "Runnable co ma
wynik",
- zadanie (task) może być Callable lub Runnable (ale w przypadku Callable to jest kompozycja!),
- Future (w szczególności FutureTask, które jest Runnable!) możemy: odpytywać o stan wykonania, zabijać lub też pobierać wynik,
- wykonawcom można podać Callable, ale i Runnable (i w szczególności FutureTask, co daje nam w każdym przypadku kolekcję w pełni zarządzanych zadań!),
- jeszcze raz: FutureTask jest Runnable, a w związku z tym może być argumentem metody submit Execut-orów,
- FutureTask po wykonaniu (lub przerwaniu) woła metodę done(). Dobre miejsce na handlery wyników!
- ThreadPool
ExecutorService po otrzymaniu zadań do wykonania przez submit (nie ważne czy
Callable czy Runnable) zwraca FutureTask. Bez specjalnego
tworzenia (obiektów typu Future) może tu być i cancel i
odpytywanie o koniec zadania!
9. Zadania
Zadanie 1.
Napisać program, w którym uruchamiane zadania pokazywane są
na liście. Zadania z listy możemy odpytywac o ich stan, anulować,
pokazywac ich wyniki, gdy są gotowe itp.
Zadanie 2.
Stworzyć mechanizm obsługi wyników zadań, działający na
zasadzie słuchaczy. Przetestować na przykładach.
Zadanie 3.
Przetestowac działanie metod scheduledExecutor....
Zadanie 4.
Napisać w sposób właściwy serwer wielowątkowy TCP/IP.
Obsługę zleceń zrealizowac w postaci FutureTask. Zadbać o
możliwość przerywania obsługi w każdym momencie.
Zadanie 5.
Stworzyć klasę ponawialnych zadań (takich, które mogą być ponownie wykonane przez Wykonawców).
To jest łatwe, gdy zadanie jest opakowane przez FutureTask i jest
Runnable, a co jeśli jest ono Callable i chcemy odczytywac jego wyniki?