<

8. Współbieżna Java: zadania i wykonawcy


Głównym celem tego (i następnego) wykładu jest przedstawienie nowych środków programowania współbieżnego, dostępnych w pakiecie java.util.concurrent. Rozpoczynamy od przypomnienia podstawowych pojęć, po czym zajmiemy się podstawowymi nowymi koncepcjami "konkurencyjnej Javy".  Tutaj uwagę skupiamy na zadaniach i wykonawcach.


1. Programowanie współbieżne - przypomnienie podstawowych pojęć


Wątek - to sekwencja działań, która może wykonywać się równolegle  z innymi sekwencjami działań w kontekście danego procesu (programu).

W systemach jednoprocesorowych wrażenie równoległości działania wątków osiągane jest przez mechanizm przydzielania czasu procesora poszczególnym wykonującym się wątkom. Każdy wątek uzyskuje dostęp do procesora na krótki czas (kwant czasu), po czym "oddaje procesor" innemu wątkowi. Zmiany są tak szybkie, że powstaje wrażenie równoległości działania.

Najbardziej popularnym mechanizmem "zamiany wątków u procesora" jest tzw. wywłaszczanie (pre-emptive multistasking) - o dostępie wątków do procesora decyduje systemowy zarządca wątków: przydziela on wątkowi kwant czasu procesora, po upłynięciu którego odsuwa wątek od procesora i przydziela kwant czasu procesora innemu wątkowi.

Podstawowa różnica pomiędzy procesami i wątkami polega na tym, że różne wątki w ramach jednego procesu mają dostęp do całego kontekstu tego procesu (m.in. przydzielonych mu zasobów).
Wobec tego zamiana wątków jednego procesu "przy procesorze" jest wykonywana szybciej niż zamiana procesów (wątków różnych procesów).
Z punktu widzenia programisty wspólny dostęp wszystkich wątków jednego procesu do kontekstu tego procesu ma zarówno zalety jak i wady.
Zaletą jest możliwość łatwego dostępu do wspólnych danych programu. Wadą - brak ochrony danych programu przed równoległymi zmianami, dokonywanymi przez różne wątki, co może prowadzić do niespójności danych, a czego unikanie wiąże się z koniecznością synchronizacji działania wątków.

W Javie uruchamianiem wątków i zarządzaniem nimi zajmuje się klasa Thread.

Do wersji 1.5 obowiązywały następujące zasady:

Aby uruchomić wątek należy stworzyć obiekt klasy Thread i  użyć metody start() wobec tego obiektu.

Ale kod, wykonujący się jako wątek (sekwencja działań, wykonująca się równolegle z innymi działaniami programu) określany jest przez obiekt klasy implementującej interfejs Runnable.
Interfejs ten zawiera deklarację metody run(), która przy implementacji musi być zdefiniowana.
Właśnie w metodzie run() zapisujemy kod, który będzie wykonywany jako wątek (równolegle z innymi wątkami programu).

   Metoda run() określa co ma robić wątek.


Klasa Thread implementuje interfejs Runnable (podając "pustą" metodę run).

Stąd pierwszy sposób tworzenia i uruchamiania wątku.

Pierwszy sposób tworzenia i uruchamiania wątku (Java 1.4)
  1. Zdefiniować własną klasę dziedziczącą Thread (np. class Timer extends Thread)
  2. Przedefiniować odziedziczoną metodą run(), podając w niej działania, które ma wykonywać wątek
  3. Stworzyć obiekt naszej klasy (np. Timer timer = new Timer(...);
  4. Wysłać mu komunikat start() (np. timer.start()) 

Jak wiemy, kod wykonywany przez wątek podajemy w metodzie run(). A metoda run() może być zdefiniowana w dowolnej klasie implementującej interfejs Runnable.
Klasa Thread dostarcza zaś konstruktora, którego argument jest  typu Runnable.
Konstruktor ten  tworzy wątek, który będzie wykonywał kod zapisany w metodzie run() w klasie obiektu, do którego referencję przekazano wspomnianemu wyżej konstruktorowi.

Stąd drugi sposób tworzenia i uruchamiania wątków.

Drugi sposób tworzenia i uruchamiania wątku (Java 1.4)
  1. Zdefiniować klasę implementującą interfejs Runnable (np. class X implements Runnable).
  2. Dostarczyć w niej definicji metody run (co ma robić wątek).
  3. Utworzyć obiekt tej klasy (np.  X x = new X(); )
  4.  Utworzyć obiekt klasy Thread, przekazując w konstruktorze referencję do obiektu utworzonego w p.3 (np.Thread thread = new Thread(x);).
  5. Wywołać na rzecz nowoutworzonego obiektu klasy Thread  metodę start ( thread.start();)

Kończenie pracy wątku

Wątek kończy pracę w sposób naturalny wtedy, gdy zakończy się jego metoda run().


Jeśli chcemy programowo zakończyć pracę wątku, to  należy zapewnić w metodzie run() sprawdzenie warunków zakończenia (ustalanych programowo) i jeśli są spełnione - spowodować wyjście z run() albo przez "dobiegnięcie do końca", albo przez return.
Warunki zakończenia mogą być formułowane w postaci wartości jakiejś zmiennej, które są ustalane przez inne fragmenty kodu programu (wykonywane w innym wątku).

Do kończenia pracy wątku możliwe jest także użycie metody interrupt() ale wymaga to odpowiedniego przygotowania kodu w metodzie run(). Jest to właściwy sposób postępowania, jeśli używamy nowych koncepcji (serwisów wykonawców) z pakietu java.util.concurrent.  O tym mowa będzie w dalszej części wykładu.


2. Pakiet java.util.concurrent z lotu ptaka

Java zawsze była przygotowana na współbieżność. Od początku  jej istnienia, w samym języku i jego pakietach, zawarto odpowiednie konstrukcje (synchronize, synchronized,  wait, notify, notifyAll).
Java umożliwiała  pisanie współbieżnych programów, ale nie zawsze w sposób skalowalny.
Czyli wszystko działało dobrze, dopóki o zasoby nie konkurowało zbyt wiele wątków. Przy dużej konkurencji ("high-contention") tylko genialnie napisane programy sprawdzały się i pod względem miarodajności i pod względem efektywności.
Genialne pisanie programu wymaga i dużo wysiłku i zwykle nie wychodzi.
Pakiet java.util.concurrent (opracowany głównie przez Douga Lea i obecny w Javie od wersji 1.5) zmienia tę sytuację. Teraz dość prosto można napisać programy nastawione na konkurencyjność (a więc nie tylko współbieżność, ale i  skalowalną współbieżność). A dodatkowo mamy do dyspozycji nieprzebrane mnóstwo narzędzi do tworzenia całkiem własnych, przygotowanych na  specyficzne sytuacje, rozwiązań problemów współbieżności,

Pakiet java.util.concurrent:


3. Separacja zadań od sposobu ich wykonania


Od początków Javy  programiści mieli wiele kłopotów ze zrozumieniem pojęcia wątku (a szczególnie obiektów klasy Thread). Nawet zaawansowanym programistom wydawało się, że użyteczną pracę wykonuje klasa Thread. Tymczasem  kod wątku zapisywany jest w metodzie run(), a klasa Thread tak naprawdę nic nie robi. To kod metody run() wykonuje się "w wątku" (czyli współbieżnie), choć często mówimy nieco mylnie (potocznie): "wątek się wykonuje".  Tymczasem wcale nie jesteśmy zainteresowani wątkami (obiektami klasy Thread) tylko zadaniami (zapisanymi w metodzie run()), które "poprzez wątki" się wykonują. Chcielibyśmy rozumować raczej w kategoriach zadań do wykonania, a nie technicznych szczegółów sposobu ich wykonania.

I w Javie 1.5 stało się to możliwe.  Zadania "do wykonania" mogą być odseparowane od wątków (od zarządzanie nimi, umartwiania się nimi, śledzenia ich - co w dotychczasowej Javie wcale nie było łatwe, o ile w ogóle możliwe).

Dotychczas było tak:

    Runnable r = ...;
    Thread t = new Thread(r);
    t.start();


to znaczy - trzeba było samemu uruchomić wątek i dalej martwić się o jego los.

Gdy miał wyprodukować jakieś wyniki  trzeba było o nie zadbać  (dość żmudnie  oprogramować ich przejęcie).

Gdy było dużo wątków - pojawiał się duży problem. Serwery odmawiały posłuszeństwa.
Na przykład, przy poniższym kodzie:

import java.net.*;
import java.io.*;

public class SomeServer  {

  private ServerSocket ss = null;
  private volatile boolean serverRunning = true;

  public SomeServer(ServerSocket ss) {
    this.ss = ss;
    System.out.println("Server started");
    serviceConnections();
  }


  private void serviceConnections() {
    while (serverRunning) {
      try {
        final Socket conn = ss.accept();
        System.out.println("Connection established");

        Runnable serviceCode = new Runnable() {
           public void run() {
              serviceRequests(conn);
           }
        }

        new Thread(serviceCode).start();

      } catch (Exception exc) {
          exc.printStackTrace();
      }
    }
    try { ss.close(); } catch (Exception exc) { //... }
  }

//...

}
gdy zglosi się (do serwisu) naraz dużo klientów wpadniemy w kłopoty. (Dlaczego?)

A co się stanie, gdy nasza metoda run() zgłosi wyjątek?
Interfejs Runnable nie przewidział tego w deklaracji run(). Pozostaje tylko RuntimeException.
Sposoby jego obsługi  w Javie "przedkonkurencyjnej" były i uciążliwe i niezbyt niezawodne.

Na te wszystkie  problemy pakiet java.util.concurrent ma gotowe odpowiedzi.

Chcesz łatwo tworzyć pule wątków, i zarządzać nimi bez  trudnego programowania? Użyj odpowiednich Serwisów Wykonawców (ExecutorService)
Chcesz myśleć  w kategoriach zadań (Task), nie wątków? Daj Wykonawcom zadania do wykonania, oni zdecydują jak najlepiej podzielić je między wątki, ale ogólna strategia podziału i uruchamiania jest pod Twoją kontrolą (mamy wybór różnych strategii)
Chcesz mieć łatwo dostępne wyniki współbieżnych zadań? Użyj interfejsu Callable, zaufaj Wykonawcom i odbieraj wyniki w postaci FutureTask - obiektu pozwalającego na asynchroniczne testy (wyniki już są? jeszcze nie ma?), reagowanie na wyjątki, odczytywanie wyników zadań i podłączanie callbacków.

Pierwszym krokiem ku poprawie sytuacji jest odseparowanie zadań do wykonania od  mechanizmów tworzenia i uruchamiania wątków.
Pakiet java.util.concurrent definiuje interfejs Executor:

public interface Executor {
  void execute(Runnable);
}
Jego implementacje  (nie my) winny zajmowac się tworzeniem i uruchamianiem wątków.
Sposób, polityka tworzenia i uruchamiania wątków spoczywa na Wykonawcach (klasach implementujących interfejs Executor).

A zatem napiszemy raczej tak (choć jeszcze nie do końca poprawnie):

  Executor executor = ....;

  private void serviceConnections() {
    while (serverRunning) {
      try {
        final Socket conn = ss.accept();
        System.out.println("Connection established");

        Runnable serviceCode = new Runnable() {
           public void run() {
              serviceRequests(conn);
           }
        }

        executor.execute(serviceCode);

      } catch (Exception exc) {
          exc.printStackTrace();
      }
    }
    try { ss.close(); } catch (Exception exc) { //... }
  }
W szczególności konkretny Wykonawca może prowadzić pulę wątków, zapewniając naszemu serwerowi odpowiednią efektywność. 

Tworzenie wątków jest kosztowne czasowo. Pule wątków pozwalają na ponowne użycie wolnych wątków, a także na ew. limitowanie maksymalnej liczby wątków w puli.


My rozumujemy w kategoriach zadania do wykonania (określanegu tu przez kod Runnable), tworzeniem i uruchamianiem wątków zajmuja się Wykonawcy.

W Javie mamy do dyspozycji kilka rodzajów gotowych Wykonawców, fabrykowanych przez odpowiednie metody klasy Executors m.in.:
Możemy więc napisać coś takiego:
import java.util.concurrent.*;


class Task implements Runnable {

  private String name;

  public Task(String name) {
    this.name = name;
  }

  public void run() {
    for (int i=1; i <= 4; i++) {
      System.out.println(name + " " + i);
      Thread.yield();
    }
  }
}


public class Wykonawca {

  public static void main(String[] args) {
    Executor exec = Executors.newFixedThreadPool(2);
    for (int i=1; i<=4; i++) {
      exec.execute(new Task("Task " + i));
    }
  }
}
Program wypisze następujące wyniki:

Task 1 1
Task 2 1
Task 1 2
Task 2 2
Task 1 3
Task 2 3
Task 1 4
Task 2 4
Task 3 1
Task 4 1
Task 3 2
Task 4 2
Task 3 3
Task 4 3
Task 3 4
Task 4 4


Zwróćmy uwagę, że pula wątków jest ograniczona do dwóch. Zatem najpierw wspólbieżnie działają dwa pierwsze zadania, a po nich - trzecie i czwarte.

Gdy nasze zadania zakończą się, Wykonawca nadal "działa" i jest gotowy do przyjmowania nowych zadań.

Zamknięcie Wykonawcy oznacza, iż nie będzie on  już przyjmował nowych zadań do wykonania; jednak przekazane mu wcześniej i jeszcze nie zakończone - będzie wykonywał.


Usługę zamknięcia dostarcza interfejs ExecutorService, który jest rozszerzeniem interfejsu Executor.

Metody fabryczne klasy Executors zwracają Wykonawców implementujących ExecutorService


Zobaczmy.

  public static void main(String[] args) {
    ExecutorService exec = Executors.newFixedThreadPool(2);
    for (int i=1; i<=4; i++) {
      exec.execute(new Task("Task " + i));
    }
    Thread.yield();
    exec.shutdown();

    try {
      exec.execute(new Task("Task after shutdown"));
    } catch (RejectedExecutionException  exc) {
        exc.printStackTrace();
    }
    try {
      exec.awaitTermination(5, TimeUnit.SECONDS);
    } catch(InterruptedException exc) { exc.printStackTrace(); }
    System.out.println("Terminated: " + exec.isTerminated());

  }

.......
Task 3 4
Task 4 4
Terminated: true
java.util.concurrent.RejectedExecutionException
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1477)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:384)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:856)
    at exec0.Wykonawca.main(Wykonawca.java:34)


Ponieważ metoda shutdown() zamyka ExecutorService, zadanie "Task after shutdown" nie zostanie uruchomione (powstanie wyjątek RejectedExecutionException; ten wyjątek może powstawać również wtedy, gdy ExecutorService z innych powodów niż zamknięcie odmawia wykonania zadania).
TimeUnit pozwala na lepszą i wygodniejszą granulację czasu - wszędzie tam gdzie wchodzą w grę opóźnienia i oczekiwania.. Są dostępne stałe, określające wybór jednostek:

Na końcu programu - za pomocą metody awaitTermination(...) wstrzymujemy bieżący wątek dopóki Wykonawca nie zakończy wszystkich zadań (albo dopóki nie minie 5 sekund lub też nie wystąpi przerwanie bieżącego wątku za pomocą metody interrupt). Warto stosować metodę awaitTermination(),  kiedy chcemy mieć pewność, że Wykonawaca naprawdę zakończył działanie i wyczyścił wszystkie swoje zajęte zasoby (np. bez tego nasz głowny wątek może sie skończyć wcześniej niż Wykonawcy i aplikacja nie zakończy działania).

ExecutorService dostarcza także metody shutdownNow(), która ma za zadanie zakończyć działanie wszystkich aktualnie wykonujących się zadań (wątków) i zamknąć Wykonawcę.


Spróbujmy jej użyć w naszym kodzie.
Niech pętla w run() wykonuje się w nieskończoność np. for (byte i=1; i<=128; i++)
W metodzie main - w miejsce shutdown() wstawmy kod:
    try {
      Thread.sleep(1000);
    } catch(Exception exc) {}

    exec.shutdownNow();
Co się stanie? Nic. Dwa zadania (wątki) będą działać w nieskończoność (i nie zostaną zatrzymane).
Dlaczego?

4. Kończenie zadań przez interrupt()

W poprzednim przykładzie zadania dzialały w nieskończoność gdyż metoda shutDownNow() kończy działające zadania poprzez użycie metody interrupt() wobec odpowiednich wątków.

Metoda interrupt() ustala jedynie status wątku jako przerwany, a zakończenie pracy wątku odbywa się zawsze przez zakończenie jego kodu.


Trzeba zatem zmodyfikować kod metody run() np tak.
  public void run() {
    for (byte i=1; i <= 128 ; i++) {
      if (Thread.currentThread().isInterrupted()) return;
      System.out.println(name + " " + i);
      Thread.yield();
    }
  }
Teraz wątki zostaną zatrzymane i zadania zakończone.

Pamiętać należy, że w sytuacjach gdy wątek jest uśpiony lub zablokowany z możliwością przerwania blokady (sleep,  wait i jego odpowiedniki w java.util.concurrent, przerywalne synchronizatory (interruptible locks), przerywalne operacje we-wy (interruptible channels)) kończenie zadań - czyli wywołanie interrupt() - powoduje zgłoszenie wyjątku InterruptedException i w obsłudze tego wyjątku należy zakończyć wykonanie kodu wątku.


Możliwość wykonywania kodów jako zadań obsługiwanych przez Wykonawców powoduje, że poczynając od Javy 1.5 kończenie pracy wątków musimy zapewniać poprzez sprawdzanie stanu INTERRUPTED oraz obsługę wyjątku InterruptedException


Przykład.

class Interruptible  {

  Lock lock = new ReentrantLock();

  Runnable task1 = new Runnable() {
     public void run() {
       System.out.println("Task 1 begins");
       try {
         lock.tryLock(1000, TimeUnit.SECONDS);  // próba zamknięcia rygla (czeka na wolny rygiel lub 1000 sekund)
         System.out.println("Task 1 entered");
       } catch(InterruptedException exc) {
           System.out.println("Task 1 interrupted");
       }
       System.out.println("Task 1 stopped");
     }
  };

  Runnable task2 = new Runnable() {
    public void run() {
      System.out.println("Task 2 begins");
      for (int i=1; i <= 600; i++) {
        if (Thread.currentThread().isInterrupted()) break;
        // jakieś obliczenie
        if (Thread.currentThread().isInterrupted()) break;  // chcemy przerwać możliwie najszybciej
        try {                                               // sleep() jest przerywane pzrez interrupt()!
          Thread.sleep(1000);
        } catch (InterruptedException exc) { break; }
      }
      System.out.println("Task 2 stopped");
    }
  };


  Runnable task3 = new Runnable() {
    Scanner scan = new Scanner(  // musimy miec InterruptibleChannel, aby móc przerwać czekanie na wejściu
                    new FileInputStream(FileDescriptor.in).getChannel(), "Cp852");
    public void run() {
      System.out.println("Task 3 begins");
      System.out.print(">>");
      while (scan.hasNextLine()) {
        try {
          String s = scan.nextLine();
          System.out.print('\n'+s + "\n>>");
        } catch (Exception exc) {
            // Uwaga: scanner nie zgłasza wyjątków, ale przerywa dzialanie
            exc.printStackTrace();
            break;
        }
      }
      System.out.println("Task 3 stopped - " + scan.ioException());  // jaki wyjątek go przerwał?
    }
  };

  Interruptible() {
    ExecutorService exec = Executors.newCachedThreadPool();

    exec.execute(new Runnable() {        // wątek zamyka rygiel
                    public void run() {
                      lock.lock();
                    }
                 }
     );
    exec.execute(task1);
    exec.execute(task2);
    exec.execute(task3);
    JOptionPane.showMessageDialog(null, "Press Ok to stop all tasks");
    exec.shutdownNow();
  }

}
Gdy uruchomimy program (z metody main(), może w innej klasie) przez:

    new Interruptible();

otrzymamy następujący wynik:


Task 1 begins
Task 2 begins
Task 3 begins
>>ala ma kota
^Z

ala ma kota
>>
............................ Teraz wciskamy Ok w dialogu
Task 1 interrupted
Task 1 stopped
Task 2 stopped
Task 3 stopped - java.nio.channels.ClosedByInterruptException



5. Odbiór wyników zadań. Zadania "z prawdziwego zdarzenia".


Wróćmy teraz do innego, poruszonego wcześniej, zagadnienia:
w jaki sposób zapewnić, aby zadania mogły zwracać wyniki, ew. sygnalizować (różne) wyjątki i jak odbierać te wyniki od nich?

Odpowiedzą na pierwszą część pytania jest interfejs Callable.
Ma on następującą postać.

public interface Callable<V> {
  V call() throws Exception;
}

W stosunku do interfejsu Runnable są tu dwa - ważne - udogodnienia.
Zarówno Callable, jak i Runnable określają kod do  (ewentualnie współbieżnego) wykonania. To są jak gdyby dwie różne wersje definiowania zadania do wykonania. 

Można (wstępnie) powiedzieć tak - mamy dwa rodzaje zadań.
Takie które zwracają wyniki i takie które nie zwracają wyników.
To że kod zadania zwraca wynik jest realizowane przez implementację Callable. Ale samo Callable (które zwraca wynik) nie wystarczy, aby można było ten wynik łatwo uzyskać. Zadanie jest uruchamiane przez Wykonawcę i nie zawsze wiemy kiedy zostanie uruchomione. Wykonuje się asynchronicznie, zatem nie wiemy kiedy wynik będzie gotowy.

Dlatego wprowadzono interfejs Future<V>, który - jak napisano w dokumentacji - reprezentuje wynik asynchronicznych obliczeń i zawiera następujące metody.

boolean cancel(boolean mayInterruptIfRunning)
          Próbuje anulować wykonanie zadania (argument mówi o tym, czy można przerwać wykonujące się zadanie). Nie wykonujące się (jeszcze) zadania są usuwane z listy zadań Wykonawcy.
 V get()
          Pobiera wynik zadania, jeśli zadanie się nie zakończyło - czeka (blokuje). Oczekiwanie może być przerwane przez CancellationException (zadanie anulowane), ExecutionException (kod zadania zgłosił wyjątek), InterruptedException (metoda interrupt() wobec wątku, w którym wykonywane jest zadanie).
Wyjątki Execution i Interrupoted muszą być obsługiwane.
 V get(long timeout, TimeUnit unit)
          Pobiera wynik zadania, jeśli zadanie się nie zakończyło - czeka (blokuje), ale nie dłużej niż podany czas. Wyjątki jw. + TimeoutException
 boolean isCancelled()
          Czy anulowane?
 boolean isDone()
          Czy zakończone (w dowolny sposób, również przez anulowanie)?

Tak naprawdę Future opisuje nie tylko metody dotyczące wyników zadania, ale również wykonania zadania. Metoda cancel() - dotyczy przecież wykonania zadaniu.

Zadania, które zwracają wyniki muszą więc mieć nie tylko kod  podany jako Callable, ale również muszą być Future.


Ale to znowu nie wystarczy!
Zadania są uruchamiane przez Wykonawców w zarządzanych przez nich wątkach.
Zatem zawsze zadania muszą być Runnable.
Zwróćmy uwagę: w metodzie execute() Wykonawcy można podać tylko Runnable. Dlaczego zatem Callable też możemy traktować jako kod zadania do wykonania? A skoro możemy, to w jaki sposób ten kod  jest uruchamiany przez metodę execute()?

Wróćmy na chwilę do naszych poprzednich przykładów (gdy "zadaniami" były zwykłe Runnable).
Zauważmy, to nie były "pełnokrwiste" zadania.

"Prawdziwe" zadanie musi spełniać następujące warunki:
Połączenie czystego Runnable z execute(...) Wykonawców spełnia tylko  dwa pierwsze warunki.
Odpytywanie o wyniki zadania (a także możliwość anulowania-przerywania zadań) wymaga, by zadanie było Future.
Po to, by móc obsługiwać wyjątki wykonania (nawet dla  kodów, które nie zwracają wyników) - kod zadania musi być Callable.

Zadanie jest obiektem implementującym interfejsy Runnable i Future, a jego kod musi być zawarty w metodzie call() klasy implementującej interfejs Callable


Co nie oznacza, że ze zwykłego Runnable nie możemy stworzyć zadania!

W Javie implementacja pojęcia zadania zrealizowana jest jako klasa FutureTask.

/*
* @since 1.5
* @author Doug Lea
*/
public class FutureTask<V> implements Future<V>, Runnable {
    // .....
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }
    // .....
    private final class Sync extends AbstractQueuedSynchronizer {
        // .....
        private final Callable<V> callable;
        // .....
        Sync(Callable<V> callable) {
            this.callable = callable;
        }
        // .....
    }
}
Fragment na podstawie żródeł klasy FutureTask (SunMicrosystem 2005).

A zatem tworząc zadanie możemy podać w konstruktorze Callable.  Jak widać, to Callable zostanie użyte przez kompozycję w wewnętrznej klasie Sync.
Możemy też podać zwykłe Runnable (z dodatkowym argumentem ustalającym jego wynik). Jak widać, w klasie Executors mamy statyczne metody "zamieniające" Runnable na Callable (w FutureTask brakuje konstruktora w postaci:  FutureTask(Runnable), który mógłby skorzystać ze statycznej metody klasy Executors  Callable<Object> callable(Runnable), która po prosto przyjmuje, że wynikiem call z wynikowego Callable jest null).

No i teraz możemy zrobić tak:
Runnable r = new Runnable() {
  public void run() {
    // ...
  }
};

Callable<String> c = new Callable<String>() {
   public String call() throws Exception {
     String result;
     // ...
     return result;
   }
};


FutureTask<Boolean> task1 = new FutureTask<Boolean>(r, true);
FutureTask<Object> task2 = new FutureTask<?>(Executors.callable(r));
FutureTask<String> task3 = new FutureTask<String>(c);

Executor exec = Executors.new....();  

exec.execute(task1);
exec.execute(task1);
exec.execute(task2);

Co więcej, wcale nie trzeba tworzyć samodzielnie obiektów FutureTask.
Interfejs ExecutorService zawiera trzy metody submit(...), które tworzą - i zwracają - dla nas Future.

<T> Future<T>
submit(Callable<T> task)
          Submits a value-returning task for execution and returns a Future representing the pending results of the task.
 Future<?> submit(Runnable task)
          Submits a Runnable task for execution and returns a Future representing that task.
<T> Future<T>
submit(Runnable task, T result)
          Submits a Runnable task for execution and returns a Future representing that task that will upon completion return the given result

Czyli w poprzednim przykładzie moglibyśmy napisać:

ExecutorService exec = Executors.new....();  

Future<Boolean> future1 = exec.submit(task1, true);
Future<Object> future2 = exec.submit(task1);
Future<String> future3 = exec.submit(task2);
Konkretna implementacja submit w klasie AbstractExecutorService (którą dziedziczy ThreadPoolExecutor zwracany przez Executors.new....() wygląda mniej więcej tak:

/*
 * @author Doug Lea
 */
public abstract class AbstractExecutorService implements ExecutorService {

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        FutureTask<Object> ftask = new FutureTask<Object>(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        FutureTask<T> ftask = new FutureTask<T>(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        FutureTask<T> ftask = new FutureTask<T>(task);
        execute(ftask);
        return ftask;
    }

// ...
}
Fragment na podstawie żródeł klasy FutureTask (SunMicrosystem 2005).

Rozważmy przykład:

import javax.swing.*;
import java.awt.event.*;
import java.util.List;
import java.util.concurrent.*;
import java.lang.reflect.*;

public class Exec1 extends JFrame implements ActionListener {

  int k = 0;
  int n = 15;
  JTextArea ta = new JTextArea(40,20);

  Exec1() {
    add(new JScrollPane(ta));
    JPanel p = new JPanel();
    JButton b = new JButton("Start");
    b.addActionListener(this);
    p.add(b);
    b = new JButton("Stop current");
    b.setActionCommand("Stop");
    b.addActionListener(this);
    p.add(b);
    b = new JButton("Curent result");
    b.setActionCommand("Result");
    b.addActionListener(this);
    p.add(b);
    b = new JButton("Shutdown");
    b.addActionListener(this);
    p.add(b);
    b = new JButton("ShutdownNow");
    b.addActionListener(this);
    p.add(b);
    add(p, "South");
    setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
    pack();
    setVisible(true);
  }

  public void actionPerformed(ActionEvent e)  {
    String cmd = e.getActionCommand();
    try {
      Method m = this.getClass().getDeclaredMethod("task"+cmd);
      m.invoke(this);
    } catch(Exception exc) { exc.printStackTrace(); }
  }


  class SumTask implements Callable<Integer> {

    private int taskNum,
                limit;

    public SumTask(int taskNum, int limit) {
      this.taskNum = taskNum;
      this.limit = limit;
    }

    public Integer call() throws Exception {
      int sum = 0;
      for (int i = 1; i <= limit; i++) {
        if (Thread.currentThread().isInterrupted()) return null;
        sum+=i;
        ta.append("Task " + taskNum + " part result = " + sum + '\n');
        Thread.sleep(1000);
      }
      return sum;
    }
  };

  Future<Integer> task;

  //ExecutorService exec = Executors.newSingleThreadExecutor();
  ExecutorService exec = Executors.newFixedThreadPool(3);

  public void taskStart() {
    try {
      task = exec.submit(new SumTask(++k, 15));
    } catch(RejectedExecutionException exc) {
        ta.append("Execution rejected\n");
        return;
    }
    ta.append("Task " + k + " submitted\n");
  }

  public void taskResult() {
    String msg = "";
    if (task.isCancelled()) msg = "Task cancelled.";
    else if (task.isDone()) {
      try {
        msg = "Ready. Result = " + task.get();
      } catch(Exception exc) {
          msg = exc.getMessage();
      }
    }
    else msg = "Task is running or waiting for execution";
    JOptionPane.showMessageDialog(null, msg);
  }

  public void taskStop() {
    task.cancel(true);
  }

  public void taskShutdown() {
    exec.shutdown();
    ta.append("Executor shutdown\n");
  }

  public void taskShutdownNow() {
    List<Runnable> awaiting = exec.shutdownNow();
    ta.append("Eeecutor shutdown now - awaiting tasks:\n");
    for (Runnable r : awaiting) {
      ta.append(r.getClass().getName()+'\n');
    }

 }


  public static void main(String[] args) {
     new Exec1();
  }

}

Uwagi:
  1. warto zaobserwować, że w call() nie musimy obsługiwać wyjątku InterruptedException przy sleep (throws Exception)
  2. jednak pisząc kody zadań nie możemy pozbyć się  myślenia w kategoriach wątków (isInterrupted(), sleep)
  3. shutdownNow zwraca listę zadań oczekujących na wykonanie w momencie zamknięcia serwisu - to są ogólnie Runnable, ale ponieważ zostały przekazane do wykonania jako FutureTasks - prawdziwą klasą jest FutureTask.


6. Obsługa wyników zadań za pomocą odwołań zwrotnych

Bezpośrednia konstrukcja (za pomocą konstruktorów klasy) zadań jako FutureTask ma pewną zaletę wobec submit(Callable) czy submit(Runnable):  w łatwy sposób możemy dostarczać odwołań  zwrotnych (callback) reagujących na zakończenie zadania.

Klasa FutureTask dostarcza bowiem chronionej metody done(), która jest wywoływana po zakończeniu zadania.

Można zrobić nawet tak:

Callable<Integer> callable = ...;
ExecutorService exec = Executors.new...();
exec.execute( new FutureTask<Integer>(callable) {
    public void done() {
      // kod będzie wywołany, gdy zadanie się zakończy lub zostanie anulowane
    }
});
Przykład:

import javax.swing.*;
import java.awt.*;
import java.awt.event.*;
import java.util.concurrent.*;


class  FutureTaskCallback<V> extends  FutureTask<V> {

  public FutureTaskCallback(Callable<V> callable) {
    super(callable);
  }

  public void done() {
    String result = "Wynik: ";
    if (isCancelled()) result += "Cancelled.";
    else try {
      result += get();
    } catch(Exception exc) {
        result += exc.toString();
    }
    JOptionPane.showMessageDialog(null, result);
  }

}


public class SimpleCallback extends JFrame {

  JTextField input = new JTextField(40),
             progress = new JTextField(40);

  String toReverse;

  Callable<String> reverseTask = new Callable<String>() {
    public String call() throws Exception {
      Thread t = Thread.currentThread();
      if (toReverse == null || toReverse.trim().equals(""))
        throw new IllegalArgumentException("Set string to reverse");
      if (t.isInterrupted()) return null;
      char[] org = toReverse.toCharArray();
      StringBuffer out = new StringBuffer();
      if (t.isInterrupted()) return null;
      for (int i = org.length-1; i>=0; i--) {
        Thread.sleep(500);
        out.append(org[i]);
        if (t.isInterrupted()) return null;
        progress.setText(out.toString());
        if (t.isInterrupted()) return null;
      }
      return out.toString();
    }
  };

  ExecutorService exec = Executors.newSingleThreadExecutor();
  FutureTaskCallback<String> ft;


  public SimpleCallback() {
    Font f = new Font("Dialog", Font.PLAIN, 16);
    input.setFont(f);
    progress.setFont(f);
    JPanel p = new JPanel();
    JButton b = new JButton("Start");
    b.addActionListener(new ActionListener() {
      public void actionPerformed(ActionEvent e) {
        ft = new FutureTaskCallback<String>(reverseTask);
        toReverse = input.getText();
        exec.execute(ft);
      }
    });
    p.add(b);
    b = new JButton("Stop");
    b.addActionListener(new ActionListener() {
      public void actionPerformed(ActionEvent e) {
        if (ft != null) ft.cancel(true);
      }
    });
    p.add(b);
    Container cp = getContentPane();
    cp.setLayout(new BoxLayout(cp, BoxLayout.Y_AXIS));
    add(input);
    add(p);
    add(progress);
    setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
    pack();
    setVisible(true);
  }





  public static void main(String[] args) {
     new SimpleCallback();
  }

}

Zobacz demo programu.

Rozważmy nieco bardziej uniwersalny i zaawansowany przykład dostarczenia asynchronicznej obsługi wyników zadań. Umówimy się, że metoda obsługi (dowolna metoda dowolnej klasy) otrzymuje jako argumenty - wynik zadania oraz ew. wyjątek. Jeśli wyjątek nie powstał (nie było ani CancellationException, ani ExecutionException) - to ten drugi argument ma wartość null. Jeśli zaś powstał - to piewrszy argument ma wartość null.

Klasa Ftask dziedziczy FutureTask i przedefiniowuje metodę done w której wywołuje metodę obsługi.

import java.lang.reflect.Method;
import java.util.concurrent.*;

public class Ftask<V> extends FutureTask<V> {

  private Method handlerMethod;
  private Object handlerObject;

  public Ftask(Callable<V> callable, Object handler, String mname) throws Exception {
    super(callable);
    handlerObject = handler;
    handlerMethod = handler.getClass().getDeclaredMethod(mname, Object.class, Exception.class);
  }

  public void done() {
    V result = null;
    try {
      result = (V) this.get();
    } catch(Exception exc) {
      try {
        handlerMethod.invoke(handlerObject, null, exc);
      } catch(Exception ex) {
          ex.printStackTrace();
      }
      return;
    }
    try {
      handlerMethod.invoke(handlerObject, result, null);
    } catch(Exception exc) {
      exc.printStackTrace();
  }
}

Wprowadzimy też klasę abstrakcyjnego zadania, która - dla konkretnych instancjacji poprzez dziedziczenie - wymaga tylko implementacji metody call():

public abstract class AbstractTask<V> implements Callable<V> {

  private String   name;
  private FutureTask<V> task;

  public AbstractTask(String name, Object resultHandler, String handlerMethodName) throws Exception {
    this.name = name;
    task = new Ftask<V>(this, resultHandler, handlerMethodName);
  }

  public FutureTask<V> getTask() { return task; }
  public String getName() { return name; }

}

Teraz możemy ich użyć np. tak:
class ResultHandler<V> {

  int x = 400, y = 50;
  public void handleResult(V result, Exception exc) {
    String msg;
    if (exc != null) msg = exc.toString();
    else msg = "Wynik = " + result;
    JFrame f = new JFrame("Task results");
    JLabel lab = new JLabel("        " + msg);
    f.add(lab);
    f.setBounds(x, y, 300, 100);
    f.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
    x+=50;
    y+=50;
    f.setVisible(true);

  }
}
  class SumTaskA extends AbstractTask<Integer> {

    public SumTaskA(String taskName, ResultHandler h, String handlerMethod )
                 throws Exception
    {
      super(taskName, h, handlerMethod);
    }

    public Integer call() throws Exception {
      Future<Integer> task = this.getTask();
      int sum = 0;
      if (task.isCancelled()) return null;
      for (int i = 1; i <= 10; i++) {
        if (task.isCancelled()) break;
        sum+=i;
        append(getName() + " part result = " + sum + '\n');
        Thread.sleep(1000);
      }
      return sum;
    }
  };


  ExecutorService exec = Executors.newFixedThreadPool(3);
  ResultHandler<Integer> handler = new ResultHandler<Integer>();
  SumTaskA task = new SumTaskA("Task 1", handler, "handleResults")
  exec.execute(task.getTask());

Zwrócmy uwagę, że wprowadzając AbstractTask (jako implementującą Callable i zawierającą Future przez kompozycję) uzyskaliśmy możliwość dostarczania takich implementacji metody call(), w której rozumowanie w kategoriach wątków ograniczone jest do minimum, np. zamiast pobierać bieżący wątek i sprawdzać czy jest Interrupted - możemy zastosować metodę isCancelled().


Uwaga: FutureTask może znajdować się w czterech stanach: inicjalnym (0), RUNNING (1) , RAN (2), CANCELLED(4). Po zakończeniu znajduje się w stanie RAN lub CANCELLED i nie może być ponownie uruchomione przez tego samego Wykonawcę.


Przykład:
import java.util.List;
import java.util.concurrent.*;

public class RepeatedTaskTest {

  FutureTask<Object> task = new FutureTask<Object>( new Callable<Object>() {
                                      public Object call() {
                                        for (int i= 1; i <= 3; i++) {
                                          System.out.println(i);

                                        }
                                        return null;
                                      }
                                   }) {
           public void done() {
             System.out.println("Done.");
           }
  };

  public RepeatedTaskTest() {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    System.out.println("Starting first task");
    exec.execute(task);
    try {
      System.out.println("Starting second task");
      exec.execute(task);
      Thread.sleep(5000);
    } catch(Exception exc) {
      exc.printStackTrace();
    }
    System.out.println("Executor shutdown now");
    List<Runnable> awaiting = exec.shutdownNow();
    for (Runnable r : awaiting) {
      System.out.println(r.getClass().getName()+'\n');
    }
    try {
      exec.awaitTermination(5, TimeUnit.SECONDS);
    } catch(InterruptedException exc) { exc.printStackTrace(); }

  }


  public static void main(String[] args) {
    new RepeatedTaskTest();
  }  
Program wyprowadzi:

Starting first task
Starting second task
1
2
3
Done.
Executor shutdown now


Drugie zadanie w ogóle się nie wykonało i nie powstał żaden wyjątek!

Logika jest taka - jest tylko jedno zadanie. Ono się wykonało. Wykonawca nie wykonuje już wykonanych zadań. Wynik jest gotowy - cały czas możemy po niego sięgać. Nie ma żadnego wyjątku, bo wszystko jest w porządku - zadanie zostało wykonane.

7. Wykonawcy a pule wątków


Przyjrzyjmy się teraz bliżej Wykonawcom. Ich klasy dostarczają dość rozbudowanych możliwości.
Interfejs ExecutorService zawiera kilku dodatkowych - nie znanych nam jeszcze - metod,  pod  hasłem "przekazywania do wykonania kolekcji zadań i uzyskiwania dostępu do ich wyników":

<T> List<Future<T>>
invokeAll(Collection<Callable<T>> tasks)
          Executes the given tasks, returning a list of Futures holding their status and results when all complete.
<T> List<Future<T>>
invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
          Executes the given tasks, returning a list of Futures holding their status and results when all complete or the timeout expires, whichever happens first.
<T> T
invokeAny(Collection<Callable<T>> tasks)
          Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do. Pozostałe zadania są anulowane.
<T> T
invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
          Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do before the given timeout elapses. Pozostałę zadania są anulowane.

Zwrócmy uwagę, że te odwołania są blokujące - czekają na zakończenie zadań (normalne - lub przez wyjątek).

Przykład:

import java.util.concurrent.*;
import java.util.*;

class Eval implements Callable<Integer> {

  Integer num;

  public Eval(int n) {
    num = n;
  }

  public Integer call() throws Exception {
    Thread.sleep(1000);
    return num;
  }

}

public class InvokeTest {

  public static int sum(ExecutorService exec, List<Callable<Integer>> tasks) throws Exception {
    long start = System.currentTimeMillis();
    System.out.println("Start");
    List<Future<Integer>> results =  exec.invokeAll(tasks);  // InterruptedException
    long elapsed = System.currentTimeMillis() - start;
    System.out.println("End after " + elapsed/1000 + " sec.");
    int sum = 0;
    for (Future<Integer> r : results) sum += r.get();
    return sum;
  }

  public static void main(String[] args) {
    List<Callable<Integer>> taskList = new ArrayList<Callable<Integer>>();
    ExecutorService exec = Executors.newFixedThreadPool(10);
    for (int i=1; i <=5; i++) {
      Callable<Integer> task = new Eval(i);
      taskList.add(task);
    }
    try {
      int result = sum(exec, taskList);
      System.out.println("Wynik: " + result);
    } catch(Exception exc) { exc.printStackTrace(); }
  }

}
 
Ale czym naprawdę są Wykonawcy?
Metody fabryczne klasy Executors tworzą dla nas wykonawcaów jako obiekty klasy ThreadPoolExecutor.
Klasa zajmuje się dynamicznym tworzeniem wątków (które będą przydzielane do wykonania zadań) i prowadzeniem ich puli - zsynchronizowanej, blokującej kolejki wątków.

Pula wątków charakteryzuje się następującymi parametrami:

które można ustalać w konstruktorze, pobierać za pomocą metod get.. i (niektóre) zmieniać w trakcie działania programu za pomocą metod set...

Znaczenie tych parametrów jest następujące.

Niech n oznacza aktualną liczbę wątków w puli (niektóre mogą być nieaktywne) i niech zlecane jest nowe zadanie do wykonania (exec()).
Co zwracają fabryczne metody klasy Executors?

Widać to wyraźnie na wyciągu z kodu źródłowego:
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new DelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new DelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
Źródło: kod JDK, Sun Microsystem

Przykład:

import java.awt.*;
import java.awt.event.*;
import java.util.concurrent.*;
import javax.swing.*;
import java.lang.reflect.*;


class Code implements Callable<Integer> {

  String name;
  int num;

  public Code(String n, int sleep) {
     name = n;
     num = sleep;
  }

  public Integer call() throws Exception {
    Thread.sleep(num*1000);
    return num;
  }

  public String getName() { return name; }
}


class Task extends FutureTask<Integer> {
  Code code;

  public Task(Code c) {
    super(c);
    code = c;
  }

  public String getName() {
    return code.getName();
  }
}



public class ThreadPoolMethods extends JFrame implements ActionListener {

  ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

  public ThreadPoolMethods() {
    setLayout(new FlowLayout());
    JButton b = new JButton("Start");
    b.addActionListener(this);
    add(b);
    b = new JButton("Show");
    b.addActionListener(this);
    add(b);
    pack();
    setVisible(true);
  }

  public void actionPerformed(ActionEvent e) {
    String cmd = e.getActionCommand();
    if (cmd.equals("Start")) {
      int sleep = 5, step = 3;
      for (int k=1; k <= 10; k++) {
        sleep += step;
        exec.execute(new Task(new Code("Task " + k, sleep)));
      }
      System.out.println("All tasks submitted");
    }
    else {
      try {
        printStat();
      } catch(Exception exc) { exc.printStackTrace(); }
      BlockingQueue<Runnable> que = exec.getQueue();
      System.out.println("Que size " + que.size());
      for (Runnable r : que) {
        Task t = (Task) r;
        System.out.println(t.getName());
      }
    }
  }

  void printStat() throws Exception {
    Method[] mets = ThreadPoolExecutor.class.getDeclaredMethods();
    for (Method m : mets) {
      String name = m.getName();
      if (name.startsWith("get")) {
        Object res = null;
        try {
          res = m.invoke(exec);
        } catch(Exception exc) {
          System.out.println("Unable to call " + name);
        }
        System.out.println(name + " = " + res);
      }
    }
  }

  public static void main(String[] args) {
    new ThreadPoolMethods();
  }
}

Wydruk:

All tasks submitted
getQueue = [exec3.Task@ee22f7, exec3.Task@39ab89, exec3.Task@2cb49d, exec3.Task@105d88a, exec3.Task@cb6009, exec3.Task@e28b9, exec3.Task@193a66f, exec3.Task@93d6bc]
getActiveCount = 2
getCompletedTaskCount = 0
getCorePoolSize = 2
Unable to call getKeepAliveTime
getKeepAliveTime = null
getLargestPoolSize = 2
getMaximumPoolSize = 2
getPoolSize = 2
getRejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor$AbortPolicy@be0e27
Unable to call getTask
getTask = null
getTaskCount = 10
getThreadFactory = java.util.concurrent.Executors$DefaultThreadFactory@193385d
Que size 8
Task 3
Task 4
Task 5
Task 6
Task 7
Task 8
Task 9
Task 10

 i po kilku sekundach

getQueue = [exec3.Task@2cb49d, exec3.Task@105d88a, exec3.Task@cb6009, exec3.Task@e28b9, exec3.Task@193a66f, exec3.Task@93d6bc]
getActiveCount = 2
getCompletedTaskCount = 2
getCorePoolSize = 2
Unable to call getKeepAliveTime
getKeepAliveTime = null
getLargestPoolSize = 2
getMaximumPoolSize = 2
getPoolSize = 2
getRejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor$AbortPolicy@be0e27
Unable to call getTask
getTask = null
getTaskCount = 10
getThreadFactory = java.util.concurrent.Executors$DefaultThreadFactory@193385d
Que size 6
Task 5
Task 6
Task 7
Task 8
Task 9
Task 10



W pakiecie java.util.concurrent dostępne są też klasy wykonawców, zapewniających wykonanie zadań w sposób rytmiczny  bądź  z podanym opóźnieniem.
Proszę zobaczyć dokumentację.


8. Podsumowanie

W podsumowaniu - rysunek, pokazujący powiązania pomiędzy klasami i interfejsami.

r


I niektóre wnioski:


9. Zadania

Zadanie 1.
Napisać program, w którym uruchamiane zadania pokazywane są na liście. Zadania z listy możemy odpytywac o ich stan, anulować, pokazywac ich wyniki, gdy są gotowe itp.

Zadanie 2.
Stworzyć mechanizm obsługi wyników zadań, działający na zasadzie słuchaczy. Przetestować na przykładach.

Zadanie 3.
Przetestowac działanie metod scheduledExecutor....

Zadanie 4.
Napisać w sposób właściwy serwer wielowątkowy TCP/IP. Obsługę zleceń zrealizowac w postaci FutureTask. Zadbać o możliwość przerywania obsługi w każdym momencie.

Zadanie 5.
Stworzyć klasę ponawialnych zadań (takich, które mogą być ponownie wykonane przez Wykonawców).
To jest łatwe, gdy zadanie jest opakowane przez FutureTask i jest Runnable, a co jeśli jest ono Callable i chcemy odczytywac jego wyniki?