Na wstępie można było by sobie zadać pytanie, po co wymyślono egzekutory. Przecież możemy odpalić wątki, pozarządzać, poczekać na wykonanie pracy, itd. Ale nie ma co „wymyślać koła na nowo”, a najlepszy kod to ten którego nie trzeba pisać. Należy również wspomnieć o tym, że tworzenie wątków w ramach puli jest mniej, zasobożerne niż tworzenie kilku lub kilkunastu (a niekiedy nawet kilkaset), pojedynczych instancji wątków i pilnowanie ich działania.
Egzekutory pozwalają nam na logiczne odseparowanie zadań od wątków, nie musimy wtedy zarządzać wątkami, uruchamiać nowe. Tym zajmują się pule wątków, które wraz z egzekutorami pozwalają na różne strategie uruchamiania i zarządzania wątkami. Myślimy wtedy bardziej nie o wątkach jako osobnych zadaniach, nie zajmując się ich zarządzaniem.
Podstawową implementacją puli wątków jest klasa ThreadPoolExecutor, która pozwala na określenie:
- jak mają być synchronizowane i wykonywane zadania,
- ile ma być odpalonych wątków,
- ile ma czekać.
- co należy zrobić gdy kolejka puli wątków się zapełni, możemy wtedy:
- odrzucić nowe rzeczy do wykonania ThreadPoolExecutor.AbortPolicy
- zignorować nowe zadania – ThreadPoolExecutor.DiscardPolicy
- zakończyć najstarsze zadania – hreadPoolExecutor.DiscardOldestPolicy
- wywołać wątek wywołujący aby on zdecydował co zrobić – ThreadPoolExecutor.CallerRunsPolicy
Do tego mamy również klasę Executors, która zawiera statyczne metody zwracające ExecutorService z gotowymi strategiami uruchamiania wątków, i tam mamy:
- newSingleThreadExecutor – zwraca egzekutora, który uruchamia jeden wątek
- newFixedThreadPool – pula wątków która ma określony maksymalny stały rozmiar. Nowe wątki są tworzone aż do uzyskania maksymalnego rozmiaru. Po jego osągnięciu nowe są tworzone tylko wtedy, gdy jakiś wątek zakończy swoją pracę.
- newCachedThreadPool – pula wątków która dynamicznie dostosowuje swój rozmiar do obciążenia. Wykonawca będzie usuwał wątki kiedy zmaleje obciążenie, a dodawał nowe gdy wzrośnie
- newSingleThreadScheduledExecutor – zwraca egzekutora, który odpala jeden wątek uruchamiany z opóźnieniem lub uruchamiany cykliczne, co jakiś czas
- newScheduledThreadPool – zwraca egzekutora, który ma ustaloną liczbę wątków, uruchamianych z opóźnieniem lub co jakiś czas
Warto trochę powiedzieć na temat interfejsu Future
Przykład wykorzystania puli wątków:
import java.util.concurrent.*; class SuperWorker implements Runnable{ private int counter = 0; private int countTo = 0; private String name; public SuperWorker(int countTo, String name){ this.countTo = countTo; this.name = name; } @Override public void run(){ System.out.println("Start wątku: " + name); System.out.flush(); try{ while (counter < countTo){ counter++; System.out.println("Wątek " + name + " liczy " + counter); System.out.flush(); Thread.sleep(100); } } catch (InterruptedException e){ e.printStackTrace(); } System.out.println("Koniec wątku: " + name); System.out.flush(); } } public class Thread5ThreadPool{ public static void main(String[] args){ //------------------------------------------------- // Przykład uruchamiania pojedyńczego wątku //------------------------------------------------- System.out.println("--- Start pojedynczego egzekutora -------------"); System.out.flush(); final ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(); newSingleThreadExecutor.execute(new SuperWorker(10, "SingleThread")); // // Zakończenie pracy wątku // newSingleThreadExecutor.shutdown(); try{ newSingleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e){ e.printStackTrace(); } newSingleThreadExecutor.shutdown(); System.out.println("--- Koniec pojedynczego egzekutora -------------"); System.out.flush(); //------------------------------------------------- // Przykład uruchamiania pojedyńczego // opóżnionego wątku //------------------------------------------------- System.out.println("--- Start pojedynczego opóżnionego egzekutora -------------"); System.out.flush(); final ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(); singleThreadScheduledExecutor.schedule(new SuperWorker(10, "SingleScheduledThread"), 1, TimeUnit.SECONDS); singleThreadScheduledExecutor.shutdown(); try{ singleThreadScheduledExecutor.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e){ e.printStackTrace(); } singleThreadScheduledExecutor.shutdown(); System.out.println("--- Koniec pojedynczego opóżnionego egzekutora -------------"); System.out.flush(); //------------------------------------------------- // Przykład uruchamiania puli wątków, // gdzie uruchamiamy więcej wątków niż wielkość puli // Widać ze wątki powyżej // 5 startują wtedy gdy "stare" zakończą swoją pracę //------------------------------------------------- System.out.println("--- Start egzekutora o stałej liczbie wątków -------------"); System.out.flush(); final ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++){ newFixedThreadPool.execute(new SuperWorker(5, "newFixedThreadPool_" + i)); } newFixedThreadPool.shutdown(); try{ newFixedThreadPool.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e){ e.printStackTrace(); } newFixedThreadPool.shutdown(); System.out.println("--- Koniec egzekutora o stałej liczbie wątków -------------"); System.out.flush(); //------------------------------------------------- // Przykład uruchamiania puli wątków, // indywidualnie skonfigurowanej //------------------------------------------------- System.out.println("--- Start własnej pili wątków -------------"); System.out.flush(); final ExecutorService executorService= new ThreadPoolExecutor( 2, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++){ executorService.execute(new SuperWorker(5, "newFixedThreadPool_" + i)); } executorService.shutdown(); try{ executorService.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e){ e.printStackTrace(); } executorService.shutdown(); System.out.println("--- Koniec własnej pili wątków -------------"); System.out.flush(); } } |
Przykład pobrania rezultatów pracy wątku przy pomocy interfejsu Callable i Future:
import java.util.concurrent.*; /** * Klasa przetwarzająca dane i zwracająca je do głównego wątku * poprzez wywoałnie metody get() */ class SuperFutureWorker implements Callable<Integer>{ private int counter = 0; private int countTo = 0; private int result = 1; private String name; public SuperFutureWorker(int countTo, String name){ this.countTo = countTo; this.name = name; } @Override public Integer call(){ System.out.println("Start wątku: " + name); System.out.flush(); try{ while (counter < countTo){ counter++; this.result = this.result + this.result * counter; System.out.println("Wątek " + name + " liczy " + counter); System.out.flush(); Thread.sleep(100); } } catch (InterruptedException e){ e.printStackTrace(); } System.out.println("Koniec wątku: " + name + " zwracam " + result); System.out.flush(); return result; } } public class Thread6Future{ public static void main(String[] args){ //------------------------------------------------- // Przykład uruchamiania pojedyńczego wątku //------------------------------------------------- System.out.println("--- Start pojedynczego egzekutora -------------"); System.out.flush(); final ExecutorService executorService = Executors.newSingleThreadExecutor(); final Future<Integer> integerFuture = executorService.submit(new SuperFutureWorker(5, "Worker1")); executorService.shutdown(); try{ final Integer integer = integerFuture.get(); System.out.println("Wynik wątku: " + integer); } catch (InterruptedException | ExecutionException e){ e.printStackTrace(); } System.out.println("--- Koniec pojedynczego egzekutora -------------"); } } |
Inne artykuły z tej serii:
- Wielowątkowość cz1. – wstęp, theread, synchronizacja
- Wielowątkowość cz2. – komunikacja międzywątkowa, synchronizacja poprzez kolejkę
- Wielowątkowość cz3. – egzekutory, pule wątków, Future i Callable
- Wielowątkowość cz4. – framerwork Fork/Join
- Wielowątkowość cz5. – klasy wspomagające Semaphore, CountDownLatch, CyclicBarrier itd.
Możliwość komentowania jest wyłączona.