Wielowątkowość w programowaniu to nie jest łatwy temat, źle użyta jest przyczyną bardzo wielu kłopotów.

Ten temat będzie podzielony na parę części:

Przykład program napisany jednowątkowo działa wyśmienicie, po zmianach i podzieleniu go na kilka wątków przestaje działać i nie wiadomo dlaczego. Okazuje się ze wątki zaczynają walczyć o dostęp do zasobów i dochodzi do zakleszczenia, tzw. deadlock’a. Nie mówiąc o tym że aplikacje wielowątkowe dużo ciężej się testuje i debuguje.

Dlatego przy pisaniu aplikacji wielowątkowych warto stosować kilka praktyk:

  • operacje na danych powinny być jak najbardziej atomowe
  • dobrze jest stosować obiekty immutable (obiekty niezmienialne), które z założenia są thereadsafe (bezpieczne dla wątków). Ponieważ po inicjacji obiektu jego stan nie zmienia się nie musimy się martwić o synchronizacje, deathlock’i. Ich największa wada: duża liczba obiektów jest kosztowna dla garbage collector’a oraz kosztowny jest ich tworzenie. Obiekty immutable dobrze jest oznaczać jako finalne, wtedy z automatu są bezpieczne dla wątków.
  • synchronizacja – jeżeli jest możliwość to jej unikajmy, bo albo stosujmy tylko na atomowych metodach. Jeżeli będziemy synchronizować za dużo metod, to defakto będziemy mieli aplikację jednowątkową działającą na kilku wątkach
  • jeżeli planujemy przeprowadzanie operacji na większej ilości danych, to można zrobić tak. Każdy proces dostaję swoją porcję danych, ale ich składaniem wyników do jednej struktury zajmuje się osobny wątek. Do tego celu nadaje się framework Fork/Join

Tworzenie wątków w Javie:

Generalnie możemy stworzyć wątki na 3 sposoby:

  • implementacja interfejsu Runnable
  • rozszerzenie klasy Thread
  • jako wyrażenie lambda

Synchronizacja:

W Javie można synchronizować:

  • metody,
  • fragmenty kodów
  • niekiedy jeżeli mamy po użyciu synchronizacji bardzo wyraźny spadek wydajności, można spróbować użyć wzorca: double-checked locking. Polega on na tym, że najpierw sprawdzamy warunek czy nie trzeba wykonać kodu w bloku synchronizowanym.
  • można użyć do synchronizacji klas: ReentrantLock i ReentrantReadWriteLock.

 

Przykłady

Tworzenie wątków

package pl.jclab.examples.thread;
 
public class Thread1{
 
   /**
    * Wątek roboczy rozszezenie klasy Thread
    */
   private class ExampleWorker extends Thread{
 
      public ExampleWorker(String name){
         super(name);
      }
 
      @Override
      public void run(){
         for (int i = 0; i < 4; i++){
            System.out.println("ExampleWorker run -> name: " + this.getName());
            try{
               Thread.sleep(200);
            } catch (InterruptedException e){
               e.printStackTrace();
            }
         }
      }
   }
 
   /**
    * Wątek roboczy implementacja interfejsu Runnable
    */
   private class ExampleWorker2 implements Runnable{
 
      @Override
      public void run(){
         for (int i = 0; i < 5; i++){
            System.out.println("ExampleWorker2 run -> name: " + Thread.currentThread().getName());
            try{
               Thread.sleep(500);
            } catch (InterruptedException e){
               e.printStackTrace();
            }
         }
      }
   }
 
   /**
    * Wątek roboczy jako wyrażenie lambda
    */
   Runnable exampleWorker3 = () -> {
      for (int i = 0; i < 5; i++){
         System.out.println("ExampleWorker3 run -> name: " + Thread.currentThread().getName());
         try{
            Thread.sleep(400);
         } catch (InterruptedException e){
            e.printStackTrace();
         }
      }
   };
 
   /**
    * Główna metoda uruchomieniowa
    * @param args
    */
   public static void main(String[] args){
 
      Thread1 thread1 = new Thread1();
      thread1.test();
   }
 
   /**
    * Metoda testująca
    */
   private void test(){
 
      //
      // Tworzenie i ruchamianie wątków
      //
      Thread thread = new ExampleWorker("Worker1");
      thread.start();
 
      Thread thread2 = new Thread(new ExampleWorker2(), "Worker2");
      thread2.start();
 
      Thread thread3 = new Thread(exampleWorker3, "Worker3");
      thread3.start();
   }
 
}

Synchronizacja

package pl.jclab.examples.thread;
 
public class Thread2{
 
   /**
    * Interfejs
    */
   abstract class Message{
      public abstract void message(String message);
 
      void print(String message){
         System.out.print('[');
         System.out.print(Thread.currentThread().getName());
         System.out.print(']');
         System.out.print(message);
         System.out.println();
         System.out.flush();
      }
 
   }
 
 
   /**
    * Nie synchronizowana metoda wyświetlania komunikatu
    */
   private class NonSynchronizedMessage extends Message{
      public void message(String message){
         print(message);
      }
   }
 
   /**
    * Synchronizowana metoda wyświetlenia komunikatu
    */
   private class SynchronizedMessage extends Message{
      public synchronized void message(String message){
         print(message);
      }
   }
 
   /**
    * Synchronizowana metoda wyświetlenia komunikatu
    */
   private class BlockSynchronizedMessage extends Message{
      public void message(String message){
         synchronized (this){
            print(message);
         }
      }
   }
 
   /**
    * Klasa robocza
    */
   private class Worker extends Thread{
 
      private String text;
      private Message message;
 
      public Worker(String name, String text, Message message){
         super(name);
         this.text = text;
         this.message = message;
      }
 
      @Override
      public void run(){
         message.message(text);
      }
   }
 
   /**
    * Główna metoda uruchomieniowa
    *
    * @param args
    */
   public static void main(String[] args){
 
      Thread2 thread2 = new Thread2();
      thread2.test();
   }
 
   /**
    * Metoda testująca
    */
   private void test(){
 
      System.out.println("Start not synchronized methods");
 
      Worker worker = new Worker("NonSync1", "Hello", new NonSynchronizedMessage());
      Worker worker2 = new Worker("NonSync2", "World", new NonSynchronizedMessage());
      Worker worker3 = new Worker("NonSync3", "!!!!!", new NonSynchronizedMessage());
 
      worker.start();
      worker2.start();
      worker3.start();
 
      try{
         worker.join();
         worker2.join();
         worker3.join();
         Thread.sleep(2000);
      } catch (InterruptedException e){
         e.printStackTrace();
      }
 
      System.out.println();
      System.out.println("Start synchronized methods");
 
      Worker syncWorker = new Worker("Sync1", "Hello", new SynchronizedMessage());
      Worker syncWorker2 = new Worker("Sync2", "World", new SynchronizedMessage());
      Worker syncWorker3 = new Worker("Sync3", "!!!!!", new SynchronizedMessage());
 
      syncWorker.start();
      syncWorker2.start();
      syncWorker3.start();
 
      try{
         syncWorker.join();
         syncWorker2.join();
         syncWorker3.join();
         Thread.sleep(2000);
      } catch (InterruptedException e){
         e.printStackTrace();
      }
 
      System.out.println();
      System.out.println("Start synchronized block");
 
      Worker blockSyncWorker = new Worker("BlockSync1", "Hello", new BlockSynchronizedMessage());
      Worker blockSyncWorker2 = new Worker("BlockSync2", "World", new BlockSynchronizedMessage());
      Worker blockSyncWorker3 = new Worker("BlockSync3", "!!!!!", new BlockSynchronizedMessage());
 
      blockSyncWorker.start();
      blockSyncWorker2.start();
      blockSyncWorker3.start();
 
      try{
         blockSyncWorker.join();
         blockSyncWorker2.join();
         blockSyncWorker3.join();
      } catch (InterruptedException e){
         e.printStackTrace();
      }
   }
 
}

Podwójne sprawdzanie i synchronizowanie

package pl.jclab.examples.thread;
 
/**
 * Przykładowa klasa realizująca podwójną blokadę
 * przy pomocy patenu Double Checked Locking
 */
class DoubleCheckedLocking{
 
   //
   // Instancja klasy
   // Słowo kluczowe volatile oznacza ze zawsze będzie odczytywana najświęzsza wersja
   //
   private volatile static DoubleCheckedLocking instance;
 
   private DoubleCheckedLocking(){
   }
 
   /**
    * @return
    */
   public static DoubleCheckedLocking getInstance(){
 
      if (instance == null){
         synchronized (DoubleCheckedLocking.class){
            if (instance == null){
               instance = new DoubleCheckedLocking();
            }
         }
      }
 
      return instance;
   }
 
   /**
    *
    */
   public synchronized void print(){
      System.out.println("Caused by: " + Thread.currentThread().getName()+" hash: "+ Integer.toHexString(hashCode()));
      System.out.flush();
   }
}
 
public class Thread3{
 
   /**
    * Klasa robocza
    */
   private class Worker extends Thread{
 
      public Worker(String name){
         super(name);
      }
 
      @Override
      public void run(){
         final DoubleCheckedLocking instance = DoubleCheckedLocking.getInstance();
         instance.print();
      }
   }
 
 
   /**
    * Główna metoda uruchomieniowa
    *
    * @param args
    */
   public static void main(String[] args){
      Thread3 thread3 = new Thread3();
      thread3.test();
   }
 
   /**
    * Metoda testująca
    */
   private void test(){
 
      Worker worker = new Worker("Worker1");
      Worker worker2 = new Worker("Worker2");
      Worker worker3 = new Worker("Worker3");
 
      worker.start();
      worker2.start();
      worker3.start();
 
      try{
         worker.join();
         worker2.join();
         worker3.join();
      } catch (InterruptedException e){
         e.printStackTrace();
      }
   }
}