Wielowątkowość w programowaniu to nie jest łatwy temat, źle użyta jest przyczyną bardzo wielu kłopotów.
Ten temat będzie podzielony na parę części:
- Wielowątkowość cz1. – wstęp, theread, synchronizacja
- Wielowątkowość cz2. – komunikacja międzywątkowa, synchronizacja poprzez kolejkę
- Wielowątkowość cz3. – egzekutory, pule wątków, Future i Callable
- Wielowątkowość cz4. – framerwork Fork/Join
- Wielowątkowość cz5. – klasy wspomagające Semaphore, CountDownLatch, CyclicBarrier itd.
Przykład program napisany jednowątkowo działa wyśmienicie, po zmianach i podzieleniu go na kilka wątków przestaje działać i nie wiadomo dlaczego. Okazuje się ze wątki zaczynają walczyć o dostęp do zasobów i dochodzi do zakleszczenia, tzw. deadlock’a. Nie mówiąc o tym że aplikacje wielowątkowe dużo ciężej się testuje i debuguje.
Dlatego przy pisaniu aplikacji wielowątkowych warto stosować kilka praktyk:
- operacje na danych powinny być jak najbardziej atomowe
- dobrze jest stosować obiekty immutable (obiekty niezmienialne), które z założenia są thereadsafe (bezpieczne dla wątków). Ponieważ po inicjacji obiektu jego stan nie zmienia się nie musimy się martwić o synchronizacje, deathlock’i. Ich największa wada: duża liczba obiektów jest kosztowna dla garbage collector’a oraz kosztowny jest ich tworzenie. Obiekty immutable dobrze jest oznaczać jako finalne, wtedy z automatu są bezpieczne dla wątków.
- synchronizacja – jeżeli jest możliwość to jej unikajmy, bo albo stosujmy tylko na atomowych metodach. Jeżeli będziemy synchronizować za dużo metod, to defakto będziemy mieli aplikację jednowątkową działającą na kilku wątkach
- jeżeli planujemy przeprowadzanie operacji na większej ilości danych, to można zrobić tak. Każdy proces dostaję swoją porcję danych, ale ich składaniem wyników do jednej struktury zajmuje się osobny wątek. Do tego celu nadaje się framework Fork/Join
Tworzenie wątków w Javie:
Generalnie możemy stworzyć wątki na 3 sposoby:
- implementacja interfejsu Runnable
- rozszerzenie klasy Thread
- jako wyrażenie lambda
Synchronizacja:
W Javie można synchronizować:
- metody,
- fragmenty kodów
- niekiedy jeżeli mamy po użyciu synchronizacji bardzo wyraźny spadek wydajności, można spróbować użyć wzorca: double-checked locking. Polega on na tym, że najpierw sprawdzamy warunek czy nie trzeba wykonać kodu w bloku synchronizowanym.
- można użyć do synchronizacji klas: ReentrantLock i ReentrantReadWriteLock.
Przykłady
Tworzenie wątków
package pl.jclab.examples.thread; public class Thread1{ /** * Wątek roboczy rozszezenie klasy Thread */ private class ExampleWorker extends Thread{ public ExampleWorker(String name){ super(name); } @Override public void run(){ for (int i = 0; i < 4; i++){ System.out.println("ExampleWorker run -> name: " + this.getName()); try{ Thread.sleep(200); } catch (InterruptedException e){ e.printStackTrace(); } } } } /** * Wątek roboczy implementacja interfejsu Runnable */ private class ExampleWorker2 implements Runnable{ @Override public void run(){ for (int i = 0; i < 5; i++){ System.out.println("ExampleWorker2 run -> name: " + Thread.currentThread().getName()); try{ Thread.sleep(500); } catch (InterruptedException e){ e.printStackTrace(); } } } } /** * Wątek roboczy jako wyrażenie lambda */ Runnable exampleWorker3 = () -> { for (int i = 0; i < 5; i++){ System.out.println("ExampleWorker3 run -> name: " + Thread.currentThread().getName()); try{ Thread.sleep(400); } catch (InterruptedException e){ e.printStackTrace(); } } }; /** * Główna metoda uruchomieniowa * @param args */ public static void main(String[] args){ Thread1 thread1 = new Thread1(); thread1.test(); } /** * Metoda testująca */ private void test(){ // // Tworzenie i ruchamianie wątków // Thread thread = new ExampleWorker("Worker1"); thread.start(); Thread thread2 = new Thread(new ExampleWorker2(), "Worker2"); thread2.start(); Thread thread3 = new Thread(exampleWorker3, "Worker3"); thread3.start(); } } |
Synchronizacja
package pl.jclab.examples.thread; public class Thread2{ /** * Interfejs */ abstract class Message{ public abstract void message(String message); void print(String message){ System.out.print('['); System.out.print(Thread.currentThread().getName()); System.out.print(']'); System.out.print(message); System.out.println(); System.out.flush(); } } /** * Nie synchronizowana metoda wyświetlania komunikatu */ private class NonSynchronizedMessage extends Message{ public void message(String message){ print(message); } } /** * Synchronizowana metoda wyświetlenia komunikatu */ private class SynchronizedMessage extends Message{ public synchronized void message(String message){ print(message); } } /** * Synchronizowana metoda wyświetlenia komunikatu */ private class BlockSynchronizedMessage extends Message{ public void message(String message){ synchronized (this){ print(message); } } } /** * Klasa robocza */ private class Worker extends Thread{ private String text; private Message message; public Worker(String name, String text, Message message){ super(name); this.text = text; this.message = message; } @Override public void run(){ message.message(text); } } /** * Główna metoda uruchomieniowa * * @param args */ public static void main(String[] args){ Thread2 thread2 = new Thread2(); thread2.test(); } /** * Metoda testująca */ private void test(){ System.out.println("Start not synchronized methods"); Worker worker = new Worker("NonSync1", "Hello", new NonSynchronizedMessage()); Worker worker2 = new Worker("NonSync2", "World", new NonSynchronizedMessage()); Worker worker3 = new Worker("NonSync3", "!!!!!", new NonSynchronizedMessage()); worker.start(); worker2.start(); worker3.start(); try{ worker.join(); worker2.join(); worker3.join(); Thread.sleep(2000); } catch (InterruptedException e){ e.printStackTrace(); } System.out.println(); System.out.println("Start synchronized methods"); Worker syncWorker = new Worker("Sync1", "Hello", new SynchronizedMessage()); Worker syncWorker2 = new Worker("Sync2", "World", new SynchronizedMessage()); Worker syncWorker3 = new Worker("Sync3", "!!!!!", new SynchronizedMessage()); syncWorker.start(); syncWorker2.start(); syncWorker3.start(); try{ syncWorker.join(); syncWorker2.join(); syncWorker3.join(); Thread.sleep(2000); } catch (InterruptedException e){ e.printStackTrace(); } System.out.println(); System.out.println("Start synchronized block"); Worker blockSyncWorker = new Worker("BlockSync1", "Hello", new BlockSynchronizedMessage()); Worker blockSyncWorker2 = new Worker("BlockSync2", "World", new BlockSynchronizedMessage()); Worker blockSyncWorker3 = new Worker("BlockSync3", "!!!!!", new BlockSynchronizedMessage()); blockSyncWorker.start(); blockSyncWorker2.start(); blockSyncWorker3.start(); try{ blockSyncWorker.join(); blockSyncWorker2.join(); blockSyncWorker3.join(); } catch (InterruptedException e){ e.printStackTrace(); } } } |
Podwójne sprawdzanie i synchronizowanie
package pl.jclab.examples.thread; /** * Przykładowa klasa realizująca podwójną blokadę * przy pomocy patenu Double Checked Locking */ class DoubleCheckedLocking{ // // Instancja klasy // Słowo kluczowe volatile oznacza ze zawsze będzie odczytywana najświęzsza wersja // private volatile static DoubleCheckedLocking instance; private DoubleCheckedLocking(){ } /** * @return */ public static DoubleCheckedLocking getInstance(){ if (instance == null){ synchronized (DoubleCheckedLocking.class){ if (instance == null){ instance = new DoubleCheckedLocking(); } } } return instance; } /** * */ public synchronized void print(){ System.out.println("Caused by: " + Thread.currentThread().getName()+" hash: "+ Integer.toHexString(hashCode())); System.out.flush(); } } public class Thread3{ /** * Klasa robocza */ private class Worker extends Thread{ public Worker(String name){ super(name); } @Override public void run(){ final DoubleCheckedLocking instance = DoubleCheckedLocking.getInstance(); instance.print(); } } /** * Główna metoda uruchomieniowa * * @param args */ public static void main(String[] args){ Thread3 thread3 = new Thread3(); thread3.test(); } /** * Metoda testująca */ private void test(){ Worker worker = new Worker("Worker1"); Worker worker2 = new Worker("Worker2"); Worker worker3 = new Worker("Worker3"); worker.start(); worker2.start(); worker3.start(); try{ worker.join(); worker2.join(); worker3.join(); } catch (InterruptedException e){ e.printStackTrace(); } } } |
Możliwość komentowania jest wyłączona.