8  CompletableFuture Framework

8.1 Monte-Carlo-Berechnung von \(\pi\)

Wie \(\pi\) hier berechnet wird, ist nicht prüfungsrelevant.

8.1.1 Exkurs: Monte-Carlo-Algorithmen

Auch dieser Exkurs ist nicht prüfungsrelevant. Monte-Carlo-Algorithmen sind randomisierte Algorithmen, die durch fortgesetzte Wahl von Zufallszahlen versuchen, ein Ergebnis anzunähern. Es gibt bei ihnen aber keine Garantie für die Korrektheit des Ergebnisses. Das genau unterscheidet Monte-Carlo-Algorithmen von Las-Vegas-Algorithmen. Letztere kommen immer garantiert zum korrekten Ergebnis, verwenden die Zufallskomponente aber dazu, den Weg mglw. abzukürzen.

8.1.2 Monte-Carlo-Berechnung von \(\pi\)

Ein bekannter Monte-Carlo-Algorithmus zur Annäherung von \(\pi\) basiert darauf, dass zufällig Punkte (x y) im Koordinatenbereich von (0, 0) bis (1, 1) gezogen werden. Für jeden gezogenen Punkt wird vermerkt, ob er innerhalb eines Einheitsviertelkreises (Radius=1) liegt. Durch Anwendung des Satzes des Pythagoras, kann dies geprüft werden.

Von zufällig gewählten Punkten mit Koordinaten ([0..1], [0..1]) kann man leicht feststellen, ob Sie genau auf, innerhalb oder außerhalb des Einheitsviertelkreis liegen:

eigene Darstellung

eigene Darstellung

Die Koordinaten eines zufällig gewählten Punkts sind \((x, y)\) mit

\[0 \le x \le 1\] und \[0 \le y \le 1\]

Der Punkt liegt aufgrund des Satzes von Pythagoras genau auf dem Kreisbogen, wenn Folgendes gilt (in der obigen Abbildung links): \[\sqrt{x^2 + y ^2} = 1\]

Der Punkt liegt aufgrund des Satzes von Pythagoras innerhalb des Kreises, wenn Folgendes gilt (in der obigen Abbildung in der Mitte): \[\sqrt{x^2 + y ^2} < 1\]

Der Punkt liegt aufgrund des Satzes von Pythagoras außerhalb des Kreises, wenn Folgendes gilt (in der obigen Abbildung rechts): \[\sqrt{x^2 + y ^2} > 1\]

Werden sehr viele Punkte gezogen, die sich innerhalb der Koordinaten (0, 0) und (1, 1) gleichmäßig verteilen, sollte sich für das Verhältnis der Anzahl der Punkte innerhalb des Kreises (einschließlich auf dem Kreisbogen) zu allen innerhalb des Quadrats das Verhältnis der Fläche des Viertelkreises (Radius 1) zur Fläche des Quadrats (Kantenlänge 1) ergeben.

\[\frac{\mbox{Anzahl der Punkte innerhalb und auf dem Kreis}}{\mbox{Anzahl aller Punkte}} = \frac{\frac{\pi r^2}{4}}{r^2}\]

Da \(r=1\) kann \(\pi\) angenähert werden als pi = 4.0 * in / (in + out), wobei in die Anzahl der Punkte innerhalb des Kreises ist (sqrt(x*x + y*y) <= 1) und out die restlichen, die außerhalb des Kreises liegen.

Falls die zufällig gezogenen Punkte gleichmäßig über die Fläche verteilt sind, nähert sich pi der Konstante \(\pi\)1 an. Es liegt in der Natur der Sache von Monte-Carlo-Algorithmen, dass dies nicht in jedem Fall zu einem korrekten Ergebnis führt.

Je mehr Punkte gezogen werden, desto eher sollte im Mittel der Wert von \(\pi\) angenähert werden. Es ist also sinnvoll, so viele Punkte wie möglich zu ziehen.

8.2 Überblick und Grundlagen von “CompletableFuture”

8.2.1 Zweck und grundlegende Struktur von “CompletableFuture”

  • Pipelining: funktionale Verkettung mehrerer Future/Callable
    • Output eines Callable wird Input für nächstes Callable
    • verwendet Regeln, um zu entscheiden, wo nächstes Callable ausgeführt wird:
      • im vorigen Thread
      • im Aufrufer-Thread oder
      • einem neuen Thread
  • Verwendung des commonPool
  • asynchrone Methodenaufrufe
    • Standard für verteilte Architekturen
    • reaktive Programmierung / reaktive Architektur \(\to\) “hängt” nicht, (vertikal) skalierbar
  • CompletableFuture wirkt wie ein Versprechen, dass zukünftig ein Ergebnis vorliegt (“Promise”)
  • Future: read-only Container für zukünftiges Ergebnis
  • CompletionStage: triggert weitere Verarbeitung

8.2.1.1 Exkurs: mit CompletableFuture vergleichbare Konzepte

(nicht prüfungsrelevant)

  • JavaScript: Promise (.then(onFulfillment) .catch(onRejection))
  • Clojure: promise/deliver/deref
  • Guava (populäre Java-Library von Google): SettableFuture (Interface ListenableFuture)
  • async/await in C#, Dart, JavaScript, Scala, Rust

8.2.2 CompletionStage (Push-API)

  • bietet eine Reihe von Dreier-Paaren:
    • “nicht-asynchrone” Ausführung
    • asynchrone Ausführung in commonPool
    • asynchrone Ausführung mit eigenem Executor statt commonPool
  • Ergebnis ist immer eine neue CompletionStage

Consumer, Function, BiFunction sind Functional Interfaces (“single abstract method Interfaces”), die es seit Java 8 im Zuge der Einführung von Lambda-Ausdrücken im Package java.util.function gibt:

@FunctionalInterface
public interface Consumer<T> {
    public void accept(T t);
}

@FunctionalInterface
public interface Function<T,R>
    public R apply(T t);
}

@FunctionalInterface
public interface BiFunction<T,U,R> {
    public R apply(T t, U u);
}

8.2.3 Überblick über das API von CompletableFuture

Das Application Programming Interface (API), also die Benutzungsschnittstelle von CompletableFuture, kann neben dem bereits vorgestellten Push-API in die folgenden weiteren Bereiche gegliedert werden:

Hettel und Tran (2016, 240) (modifiziert: Annotationen und supplyAsync statt applyAsync in Start-API)

Hettel und Tran (2016, 240) (modifiziert: Annotationen und supplyAsync statt applyAsync in Start-API)

8.2.3.1 Pull/Poll-API

Einige der im Vergleich zu Future bei Pull/Poll aus dem Bereich der Steuerung der asynchronen Verarbeitung hinzugekommene Methoden sind:

  • T join(): Liefert das Ergebnis der asynchronen Verarbeitung, wirft aber im Gegensatz zu get() im Fehlerfall eine unchecked Exception: Sowohl get() als auch join() blockieren, bis das Ergebnis vorliegt. Beide werfen im Abbruchfall (Aufruf von cancel()) eine CancellationException. Im Fehlerfall wirft …
    • get() eine (checked) ExecutionException, die also auf jeden Fall mit einem try/catch behandelt oder mit throws deklariert werden muss (“if this future completed exceptionally”).
    • join() eine (unchecked) CompletionException, die also nicht unbedingt mit try/catch behandelt werden muss (“if this future completed exceptionally or a completion computation threw an exception”).
  • T getNow(T valueIfAbsent): Fragt nach dem Ergebnis. Ist (noch) keines vorhanden, wird valueIfAbsent zurückgegeben. Auch diese Methode wirft im Fehlerfall ebenfalls CancellationException oder CompletionException.
  • boolean isCompletedExceptionally(): Liefert true, falls während der Verarbeitung eine Ausnahme aufgetreten ist.

8.2.4 internes API

Mit den Methoden complete und completeExecptionally des internen API von CompletableFuture kann man “low level” ein CompletableFuture befüllen. Im folgenden Beispiel wird in der Methodde calculateAsync zuerst ein “leeres” CompletableFutre-Objekt für den Typ-Parameter Integer erzeugt.

Danach wird es asynchron befüllt, indem ein Task (vom Typ Runnable) an den commonPool “submitted” wird. In einem Thread des commonPool wird asynchron zum Aufrifer-Thread (hier der main-Thread) ein Ergebnis berechnet (im Beispiel ist das einfach 42) und dem bereits erzeugten CompletableFuture als Ergebnis mitgeteilt. Dies könnte an dem CompletableFuture eine Kette weiterer Verarbeitungsschritte triggern.

static CompletableFuture<Integer> calculateAsync() {
    var result = new CompletableFuture<Integer>();
    ForkJoinPool.commonPool().submit(() -> {
        try {
            var res = /* aufwändige Berechnung, Ergebnis z.B. */ 42;
            result.complete(res);
        } catch (Exception ex) {
            result.completeExceptionally(ex);
        }
    });
    return result;
}

public static void main(String... args) {
    var cf = calculateAsync();
    try {
        System.out.println(cf.get());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        System.err.print(e);
    }
}

Sollte es aber eine Exception bei der Berechnung von res geben, kann sie dadurch “für später aufgehoben” werden, indem sie mit completeExceptionally dem CompletableFuture anstatt des Ergebnisse mitgeteilt wird.

8.2.5 Start-API

Normalerweise wird man eine CompletableFuture-Verarbeitungskette aber nicht durch Instantiieren eines “leeren” CompletableFuture-Objekts und darauffolgende Befüllung über das interne API starten wie im vorigen Beispiel, sondern Methoden des Start-API zum Starten einer CompletableFuture-Kette nutzen. Im folgenden Beispiel wird dazu supplyAsync verwendet:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

public class SimpleCompletableFuture {
  static class Task implements Supplier<Integer> {
    @Override
    public Integer get() {
      return /* aufwändige Berechnung, Ergebnis z.B. */ 42;
    }
  }

  public static void main(String... args)
      throws InterruptedException, ExecutionException {
    var future = CompletableFuture.supplyAsync(new Task());
    System.out.println(future.get());
  }
}

CompletableFuture hat eine Reihe Klassenmethoden im “Start-API”, die jeweils bestimmte Single Abstract Method bzw. Functional Interfaces unterstützen. Im obigen Beispiel wird supplyAsync zum Start einer Verarbeitungskette benutzt. Diese Klassenmethode erwatret ein Objekt, das das Supplier<T>-Interface implementiert. Das Resultat ist dann ein CompletableFuture<T>, an dem weitere (asynchrone) Verabreitungsschritte verkettet werden können. Liegt ein Ergebnis vom Aufruf der Single Abstract Method get des Supplier-Objekts vor, kann dies in der Verarbeitungskette weitere Methoden triggern, die das Ergebnis von get, das asynchron im commonPool berechnet wurde, als Input nehmen.

8.2.6 Start-API

In der Praxis würde man das aber eher als Lambda-Ausdruck notieren (hier mit {}, damit es näher am Code des vorigen Beispiels ist – stattdessen ganz kurz: () -> 42).

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SimpleCompletableFuture {
  public static void main(String... args)
      throws InterruptedException, ExecutionException {
    var future = CompletableFuture.supplyAsync(() -> {
      return /* aufwändige Berechnung, Ergebnis z.B. */ 42;
    });
    System.out.println(future.get());
  }
}

Mit supplyAsync wird ein Callable asynchron berechnet. Sobald das Ergebnis im CompltableFuture vorliegt, kann eine über das Push-API vereinbarte Folgefunktion getriggert werden, die das Ergebnis des Suppliers (Ergebnis des Methodenaufrufs get des Supplier-Objekts) als Eingangsparameter erhält.

Auch wenn es wegen der bisherigen Betonung des Push-API nicht sinnvoll erscheinen mag: Man kann ein CompletableFuture auch mit einem Runnable statt eines Callable starten. Die Methode dafür heißt runAsync. Die Verwendung von Runnable statt Callable bedeutet, dass es kein Ergebnis gibt. Lediglich die Information, dass das Ende der run-Methode erreicht ist, und ggf. “für später aufgehobene” Exceptions können im CompletableFuture repräsentiert werden. Statt die Berechnung der nächsten CompletionStage zu triggern gibt es aber noch andere Verwendungsmöglichkeiten, die später vorgestellt werden.

Der Typ-Parameter für das CompletableFuture ist im Fall von runAsync Void (in Anlehnung an das Schlüsselword void, das anzeigt, dass eine Methode keinen Rückgabewert hat). Bei supplyAsync entspricht der Typ-Parameter des CompletableFuture dem Typ-Parameter des Callable, also dem Ergebnis-Typ von get.

Im Gegensatz zu den Dreierpaaren des CompletionStage-Interface (Push-API) gibt es bei deisen beiden Methoden des Start-API nur jeweils zwei Varianten. Die Start-Berechnung des CompletableFuture kann nämlich entweder im commonPool oder einem anderen Threadpool (Executor-Interface) erfolgen. Die Signaturen der vier sich ergebene Methoden sind:

CompletableFuture<U> supplyAsync(Supplier<U> supplier)
CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

CompletableFuture<Void> runAsync(Runnable runnable)
CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

8.3 Push-API von CompletableFuture: lineare Ketten und Verzweigungen

8.3.1 asynchrone Verarbeitungskette

Die Namen der Methoden des Push-API’s beginnen in der Regel mit then. Mit ihnen wird dem CompletableFuture-Objekt, an dem sie aufgerufen werden, jeweils ein Trigger hinzugefügt. Wenn die Bedingung erfüllt ist (in der Regel ist dies, dass das Ergebnis fertig berechnet ist), wird eine weitere Methode asynchron zu dem Thread, der den Trigger durch den ``.then”-Aufruf installiert hatte, ausgeführt.

Bei vielen der then-Methoden ist das Ergebnis der vorigen CompletionStage der Input für die getriggerte Methode. Es gibt aber auch den Fall, dass bei thenRun ein Runnable gestartet wird, dessen run-Methode keinen Input-Parameter hat.

CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> f)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
  • thenApplyAsync: f ist eine Funktion, die einen Parameter vom Typ T (oder “allgemeiner”) erwartet und ein Ergebnis vom Typ U (oder “spezifischer”) hat.
    • Function ist ein “Functional Interface” aus dem Package java.util.function2
  • thenConsumeAsync: action ist eine Funktion, die einen Parameter vom Typ ``T (oder “allgemeiner”) erwartet und kein Ergebnis hat.
    • Consumer ist ein “Functional Interface” aus dem Package java.util.function3

Für jedes thenApplyAsync und theAcceptAsync wird ggf. ein eigener Thread aus dem Threadpool benutzt.

Hettel und Tran (2016, 246)

Hettel und Tran (2016, 246)

Der Sinn von CompletableFutures und dessen Push-API ist, dass man auf elegante Weise lange Ketten asynchroner Aufrufe in einem “fluent”-Stil programmieren kann:

var cf = CompletableFuture
    .supplyAsync(()-> fn1())
    .thenApplyAsync((in) -> in.fn2())
    .thenApplyAsync((in) -> in.fn3())
    .thenAcceptAsync((in) -> in.fn4());

Das ist die asynchrone Variante der folgenden sequenziellen Verkettung:

fn1().fn2().fn3().fn4();

Bzw. wenn das API funktional und nicht objekt-orientiert gestaltet wäre:

var cf = CompletableFuture
    .supplyAsync(()-> fn1())
    .thenApplyAsync((in) -> fn2(in))
    .thenApplyAsync((in) -> fn3(in))
    .thenAcceptAsync((in) -> in4(in));

fn4(fn3(fn2(fn1())));

8.3.2 asynchrone Verarbeitungskette

Da Java eine statisch getypte Sprache ist und mit den Generics ein umfassendes Typ-System zur Verfügung steht, liegt es in der Natur der Sache, dass die asynchronen Verarbeitungsketten, die mit dem CompletableFuture-Framework und seinem Push-API aufgebaut werden können, typisiert sind.

In der folgenden Grafik wird die Typisierung an einem Beispiel demonstriert (der Typ CompletableFuture wird durch CF abgekürzt):

Hettel und Tran (2016, 252 (modifiziert))

Hettel und Tran (2016, 252 (modifiziert))
  • Task1 soll ein Ergebnis vom Typ T liefern.
  • Mit thenApplyAsync wird eine Function (z.B. in Form eines Lambda-Ausdrucks) übergeben, deren Single Abstract Method einen Eingangsparameter vom Typ T hat und als Ergebnis den Typ U liefert: Das ist Task2.
    • Das Ergebnis des Aufrufs von thenApplyAsync ist ein CompletableFuture<U>.
  • Wenn das Ergebnis dieser Funktion vorliegt, soll das die Berechnung von Task3 triggern. Beim Methodenaufruf von thenApplyAsync wurde dementsprechend wieder ein Objekt vom Typ Function übergeben. Diesmal ist die Signatur der Single Abstract Method dieses Objekts aber so, dass ein Eingangsparameter vom Typ U benötigt wird (dem Typ des Ergebnisses der vorigen CompletionStage) und das Resultat ist vom Typ V.
    • Das Ergebnis des Aufrufs von theApplyAsync ist ein CompletableFuture<V>.

8.3.3 Beispiel für eine asynchrone Verarbeitungskette

Das Prinzip der asynchronen linearen Verkettung mit dem Push-API soll hier an einem Beispiel deutlich gemacht werden. Dazu nehmen wir an, es gäbe ein Interface namens Service, in dem mindestens die folgenden drei Methoden mit diesen Signaturen definiert wären. Außerdem soll es die drei Typen User, Profile und AccessRight geben, die in diesen Signaturen benutzt werden.

public class Service {
    static User getUser(int userId) { ... }
    static Profile getProfile(User user) { ... }
    static AccessRight getAccessRight(Profile profile) { ... }
}

Dann könnte beispielsweise die folgende asynchrone lineare Verarbeitungskette (“Pipeline”) mit dem Push-API im “fluent”-Stil programmiert werden:

CompletableFuture<Void> cf = CompletableFuture
    .supplyAsync(() -> Service.getUser(42))
    .thenApplyAsync((user) -> Service.getProfile(user))
    .thenApplyAsync((profile) -> Service.getAccessRight(profile))
    .thenAcceptAsync((access) -> System.out.println(access));

cf.join(); // wartet auf das Ende der Berechnung von cf,
           // hier also auf die erfolgte Ausgabe

Diese Methodenaufrufe asynchron zu machen, ist speziell in verteilten Systemen sinnvoll, da hier die Netzwerkkommunikation immer zu Verzögerungen führen kann. Passiert die Netzwerkkommunikation asynchron, also im Hintergrund, wird der sonstige Ablauf des Programms (insb. die Bedienung der grafischen Benutzungsschnittstelle) nicht blockiert. Das Programm bleibt auch bei Netzwerkaussetzern “reaktiv”.

8.3.4 Beispiel für eine asynchrone Verarbeitungskette

public class Service {
    public static User getUser(int userId) { ... }
    public static Profile getProfile(User user) { ... }
    public static AccessRight getAccessRight(Profile profile) { ... }
}

Im Folgenden werden für das obige Beispiel die Typen von Input und Output der Lambda-Ausdrücke deutlich gemacht. Achtung: Das ist keine korrekte Java-Notation, sondern ist nur zur Verdeutlichung gedacht.

CompletableFuture<Void> cf = CompletableFuture
    .supplyAsync(() -> T)
    .thenApplyAsync((T t) -> U)
    .thenApplyAsync((U u) -> V)
    .thenAcceptAsync((V v) -> Void); // void (kleingeschrieben)
      // ist in Java kein Typ, sondern ein Schlüsselwort in Signaturen
cf.join(); 

8.3.5 “nicht-asynchrone” Verarbeitungsschritte

Die asynchronen Methoden des Push-API’s haben immer den Namen then*Async (statt * z.B. Apply, Accept, Run, Combine, Compose). Es gibt sie dank Überladung jeweils in zwei Versionen:

  • Bei der einen wird ein Threadpool in Form eines Executor übergeben, in dem der Task ausgeführt wird, wenn die vorige CompletionStage zu einem Ergebnis gekommen ist.
  • Bei der anderen Version wird immer der commonPool verwendet, um die Tasks abzuarbeiten.

Zu jeder dieser asynchronen Methodenpaare gibt es noch eine “nicht-asynchrone” Variante. Sie unterscheidet sich von ihren asynchronen Verwandten durch die Benennung: Dem Methodennamen fehlt immer das Async am Ende:

Methode Task-Typ Resultat
thenRun Runnable: () -> void CompletableFuture<Void>
thenAccept Consumer: (T) -> void CompletableFuture<Void>
thenApply Function: (T) -> U CompletableFuture<U>

Die “nicht-asynchrone” Variante heißt nicht “synchron”, da das Verhalten (bzw. der Thread, in dem der dazugehörende Task ausgeführt wird) etwas vielfältiger ist:

  • Falls der “vorige” CompletableFuture-Task schon fertig ist, wird der nächste Task im Aufrufer-Thread ausgeführt.
  • Falls der “vorige” CompletableFuture-Task noch nicht fertig ist, wird der nächste Task anschließend im Thread des vorigen Tasks ausgeführt.

Letztlich kann man sich zur Entwicklungszeit nicht darauf verlassen, in welchem Thread bzw. in welchem Threadpool ein “nicht-asynchron” übergebener Task ausgeführt wird:

“Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.”4

8.3.6 Split-Pattern

An einer CompletionStage können mit dem Push-API beliebig viele folgende CompletionStage-Objekte und ihre Tasks getriggert werden. Das heißt nicht, dass die damit verbundenen Tasks auch augenblicklich beim Eintreten des Ereignisses begonnen werden, sondern nur, dass sie augenblicklich im jeweils vorgesehenen Threadpool (z.B. dem commonPool) “submitted” werden.

Die folgende Situation ist ein Beispiel dafür:

var task1 = CompletableFuture.supplyAsync(() -> "4711");
var task2 = task1.thenApplyAsync((in) -> in.length());
var task3 = task1.thenApplyAsync((in) -> in.equals("42"));
  • T: String
    • '4711'
  • U: Integer
    • 4
  • V: Boolean
    • false

Hettel und Tran (2016, 252)

Hettel und Tran (2016, 252)

Hier wird im CompletableFuture<String> task1 der Task den String '4711' zu “berechnen” repräsentiert. An dieses CompletableFuture werden zwei weitere CompletionStages gehangen:

  • Der Lambda-Ausdruck (in) -> in.length() wird hier zu einem Objekt vom Typ Function<String, Integer>: Der Eingangsparameter in die Methode ist vom Typ String und das Ergebnis ist vom Typ Integer (es findet eine automatische Umwandlung vom eigentlichen Resultattyp int zum Wrapper-Typ Integer statt).
  • Der Lambda-Ausdruck (in) -> in.equals("42") wird hier zu einem Objekt vom Typ Function<String, Boolean>: Der Eingangsparameter in die Methode ist vom Typ String und das Ergebnis ist vom Typ Boolean (es findet eine automatische Umwandlung vom eigentlichen Resultattyp boolean zum Wrapper-Typ Boolean statt).

Sobald das Resultat von task1 vorliegt (Typ String) werden die beiden Tasks der Folge-CompletionStage-Instanzen “submitted”. Sie können parallel berechnet werden.

8.3.7 Vereinen von Verarbeitungsketten

Wenn zwei Tasks den Input für einen dritten Task liefern oder es eine Kontroll-logische Abhängigkeit zwischen ihnen gibt, können Methoden zum Vereinen von Verabreitungsketten, die das Push-API anbietet, verwendet werden.

Sie werden an einem CompletionStage-Objekt aufgerufen und haben ein zweites CompletionStage-Objekt als ersten Parameter. Die Verarbeitungsketten dieser beiden CompletionStage-Objekte werden dadurch vereinigt (“Task1” und “Task2”). Der zweite Parameter (BiFunction, BiConsumer, Runnable, Function oder Consumer) ist der Task, der zum Vereinen verwendet wird (“Task3”).

Hettel und Tran (2016, 252)

Hettel und Tran (2016, 252)

Methoden zur Vereinigung von Abläufen. “CF” steht für CompletableFuture und “CS” für CompletionStage

Man kann mit diesen Methoden des Push-API jeweils zwei Verarbeitungsketten zu einem CompletableFuture vereinen. Bei den Methoden thenCombine, thenAcceptBoth und runAfterBoth müssen die Resultate beider vorangehenden CompletionStages vorliegen. Bei den Methoden mit Either im Namen, braucht nur eine der beiden vorangehenden CompletionStages bereit sein. Die andere Kette wird dann verworfen.

Diese Methoden gibt es immer in zwei Varianten: Je nachdem, ob der Task zum Vereinen asynchron oder nicht-asynchron ausgeführt werden soll, gibt es die beiden Varianten mit und ohne Async am Ende des Methodennamens.

8.3.8 Zusammenführen durch thenCombine (Verrechnen)

Bei thenCombine werden zwei vorangehende CompletionStages kombiniert, indem deren Resultate mit einer BiFunction<T, U, V> “zusammengerechnet” werden. BiFunction ist ein Functional Interface aus dem Package java.util.function. Die Single Abstract Method dieses Interfaces hat zwei Parameter: Der erste ist vom Typ T, der zweite vom Typ U (Typ-Parameter entsprechend der Spezifikation BiFunction<T, U, V>). Der Rückgabewert der Methode ist dann vom Typ V.

Je nachdem, ob der Task zum Kombinieren asynchron oder nicht-asynchron ausgeführt werden soll, gibt es die beiden Varianten thenCombine und thenCombineAsync.

Man kann also leicht einen Lambda-Ausdruck formulieren, der die Ergebnisse der beiden vorangegangenen Tasks in einem dritten Task zusammenführt. Die folgende Situation ist ein Beispiel dafür:

var task1 = CompletableFuture.supplyAsync(() -> 47);
var task2 = CompletableFuture.supplyAsync(() -> "11");
var task3 = task1.thenCombineAsync(task2, (n, s)->String.valueOf(n)+s);
  • T: Integer
    • 47
  • U: String
    • '11'
  • V: String
    • '4711'

Hettel und Tran (2016, 253)

Hettel und Tran (2016, 253)

Hier wird im CompletableFuture<Integer> task1 der Task das Integer 47 zu “berechnen” repräsentiert. An diesem CompletableFuture wird thenCombine aufgerufen. Diese Methode hat als ersten Parameter das zweite CompletableFuture für Task2. Diese beiden CompletionStages werden durch Task3 asynchron kombiniert. Hier wird dazu der Lambda-Ausdruck (n, s) -> String.valueOf(n) + s genutzt, der aus dem ersten Parameter (ein Integer) eine String-Repräsentation macht. Der resultierende String wird mit dem zweiten Parameter (wieder ein String) konkateniert.

task3 kann erst asynchron begonnen werden, wenn die beiden Ergebnisse der vorangehenden CompletionStages vorliegen, da sie ja kombiniert werden sollen.

8.3.9 Zusammenführen durch applyToEither (“ODER”)

Bei applyToEither bzw. applyToEitherAsync werden zwei Verarbeitungsketten über die entsprechenden CompletionStages zusammengeführt, indem das Resultat, das zuerst vorliegt, mit einer Function<T, U> asynchron oder nicht-asynchron weiterverarbeitet wird. Der Rückgabewert der Methode ist dann vom Typ U. Je nachdem, ob der Task zum Weiterverarbeiten asynchron oder nicht-asynchron ausgeführt werden soll, gibt es die beiden Varianten applyToEitherAsync und applyToEither.

Man kann also leicht einen Lambda-Ausdruck formulieren, der das Ergebnis des Tasks, der zuerst fertig ist, in einem dritten Task weiterverarbeitet. Die folgende Situation ist ein Beispiel dafür:

var task1 = CompletableFuture.supplyAsync(() -> 47);
var task2 = CompletableFuture.supplyAsync(() -> 11);
var task3 = task1.applyToEitherAsync(task2, (n) -> n > 20);
  • T: Integer
    • 47
  • U: Integer
    • 11
  • V: Boolean
    • true oder false

Hettel und Tran (2016, 255)

Hettel und Tran (2016, 255)

Hier wird im CompletableFuture<Integer> task1 der Task, das Integer 47 zu “berechnen”, repräsentiert. An diesem CompletableFuture wird applyToEitherAsync aufgerufen. Diese Methode hat als ersten Parameter das zweite CompletableFuture für Task2. Diese beiden CompletionStages werden durch Task3 asynchron vereint. Das Ergebnis ist enweder true oder false – je nachdem, welcher vorangegangene Task zuerst berechnet ist.

task3 kann erst asynchron begonnen werden, wenn mindestens ein Ergebnis der vorangehenden CompletionStages vorliegt.

8.3.10 Barrieren und Zusammenführen

Im Start-API des CompletableFuture-Frameworks gibt es noch einige Methoden, die mit “Barrieren” arbeiten. allOf und anyOf haben beide eine variable Parameterliste. Sie erwarten eine beliebige Anzahl von Tasks in der Form von CompletableFuture-Objekten:

  • allOf benötigt irgendein CompletableFuture<?>, also z.B. mit runAsync gestartete Runnable-Tasks. Das Ergebnis von allOf ist nämlich ein CompletableFuture<Void>, d.h. die Ergebnisse der Tasks können nicht weiterverwendet werden.
  • anyOf benötigt CompletableFuture<? extends T>, also z.B. mit supplyAsync gestartete Callable<? extends T>-Tasks. Das Ergebnis von anyOf ist dann ein CompletableFuture<T>.

Hettel und Tran (2016, 257) (modifiziert)

Hettel und Tran (2016, 257) (modifiziert)

Die Semantik von der Start-API-Methoden allOf und anyOf, die sie zu “Barrieren” macht, ist, dass das CompletableFuture, das Ergebnis von allOf ist, seine nächste CompletionStage erst startet (oder bei getoder join entblockiert), wenn alle Tasks der übergebenen CompletableFuture-Objekte fertig berechnet sind. Ergebnisse der Tasks könnten nicht weiterverarbeitet werden. Allerdings funktioniert allOf mit allen CompletableFuture<?>-Objekten als Parameter.

anyOf hingegen nimmt das Ergebnis des CompletableFuture, welches zuerst fertig ist, als Input der nächsten CompletionStage. Daher kann als Inhaltstyp des resultierenden CompletableFuture irgendein Obertyp der beteiligten Task-Resultate verwendet werden.

8.3.11 Barrieren und Zusammenführen

Es folgen zwei Beispiele für die Anwendung dieser “Barrieren”, bei denen jeweils drei Tasks übergeben werden:

CompletableFuture.allOf(     // alle müssen beendet werden
    CompletableFuture.runAsync( () -> { /*...*/ } ),
    CompletableFuture.runAsync( () -> { /*...*/ } ),
    CompletableFuture.runAsync( () -> { /*...*/ } )
).thenAccept((Void) -> System.out.println("done") );


CompletableFuture.anyOf(    // das frühest fertige
    CompletableFuture.supplyAsync( () -> { /*...*/ } ),
    CompletableFuture.supplyAsync( () -> { /*...*/ } ),
    CompletableFuture.supplyAsync( () -> { /*...*/ } )
).thenAccept((first) -> System.out.println(first));                                    

Man beachte: Je nach Parallelität des commonPool, bzw. der Executor-Threadpools der übergebenen CompletableFuture-Objekte, werden nicht alle Tasks tatsächlich gleichzeitig und parallel abgearbeitet.

Das folgende Programmfragment liefert 1 als result demzufolge auch nur auf einem System mit einer Parallelität des commonPool von mindestens 10.

static void sleep(int sek) {
  try {
    Thread.sleep(sek * 1000);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
}
var result = CompletableFuture
  .anyOf(CompletableFuture.supplyAsync(() -> {
      sleep(10);
      return 10;
  }), CompletableFuture.supplyAsync(() -> {
      sleep(9);
      return 9;
  }), CompletableFuture.supplyAsync(() -> {
      sleep(8);
      return 8;
  }), CompletableFuture.supplyAsync(() -> {
      sleep(7);
      return 7;
  }), CompletableFuture.supplyAsync(() -> {
      sleep(6);
      return 6;
  }), CompletableFuture.supplyAsync(() -> {
      sleep(5);
      return 5;
  }), CompletableFuture.supplyAsync(() -> {
      sleep(4);
      return 4;
  }), CompletableFuture.supplyAsync(() -> {
      sleep(3);
      return 3;
  }), CompletableFuture.supplyAsync(() -> {
      sleep(2);
      return 2;
  }), CompletableFuture.supplyAsync(() -> {
      sleep(1);
      return 1;
  })).join();

Außerdem werden bei anyOf die Tasks, die nicht als erstes fertig berechnet wurden, nicht abgebrochen, obwohl sie nicht mehr relevant sind und ihr Ergebnis nicht mehr benötigt wird. Der Grund ist die Architektur des CompletableFuture-Frameworks. CompletableFuture-Objekte sind nur ein Wrapper zu Threads, die in einem Executor wie dem commonPool ausgeführt werden. Eine Steuerung dieser Threads erfolgt nicht aus dem CompletableFuture-Framework. Selbst eine Methode wie cancel() aus dem Pull/Poll-API-Teil steuert nicht den Thread, sondern wirkt nur als Trigger für folgende CompletionStages (und zwar mit einem vergleichbaren Effekt wie completeExecptionally(...)), wie im folgenden Beispiel verdeutlicht wird (sleep wie oben):

 /*
  * Nachweis, dass nicht mehr gebrauchte Tasks bei anyOf weiterlaufen
  */

var cf1 = CompletableFuture.runAsync(() -> {
    sleep(3);
});
var cf2 = CompletableFuture.runAsync(() -> {
    while (true) {
        sleep(1);
        System.out.println(Thread.currentThread().getName() + ": 2");
    }
});
var cf3 = CompletableFuture.runAsync(() -> {
    while (!Thread.interrupted()) {
        sleep(1);
        System.out.println(Thread.currentThread().getName() + ": 3");
    }
    System.out.println("cf3 end");
});

 /*
  * Darauf warten, dass cf1 nach 3 Sek. fertig ist. cf2 und cf3 laufen
  * weiter, obwohl sie nicht mehr gebraucht werden...
  * 
  * Auch mit .cancel() kann man sie nicht beenden (triggert nur
  * Exception-Kette, falls gepusht).
  */
CompletableFuture.anyOf(cf1, cf2, cf3).join();
System.out.println("cf1 ist fertig");

 /*
  * abwarten, ob sonst noch etwas passiert...
  */
sleep(5);

Unter der Annahme, dass die Parallelität des commonPool mindestens 3 ist, passiert hier das Folgende: cf1, cf2 und cf3 werden an den commonPool submitted. cf1 wartet 3 Sekunden und ist dann fertig. cf2 und cf3 laufen weiter und geben jede Sekunde 2 bzw. 3 auf der Konsole aus. Obwohl cf1 fertig ist, laufen cf2 und cf3 weiter, wie man an den Konsolenausgaben in den verbleibenden 5 Sekunden des Aufrufer-Threads sehen kann. Auch wenn man mit cancel() versucht cf2 oder cf3 abzubrechen hat das trotzdem keinen Effekt auf die Threads, die diese Tasks abarbeiten, außer dass eine CancellationException die nächste CompletionStage triggert.

8.3.12 Fehlerbehandlung und Abbruch

Tasks können nicht nur getriggert werden, wenn ein Ergebnis für die vorangehende CompletionStage vorliegt, sondern auch wenn eine Exception geworfen wird. Die Ausnahme wird nicht sofort realisiert, sondern “für später aufgehoben”.

Wird für die CompletionStage, die die Ausnahme wirft, nicht direkt ein Exception-Handler vereinbart, wird das Throwable durch die Kette durchgereicht. Spätestens am Ende bei get oder join wird die Exception in Form einer (checked) ExecutionException (bei get) oder einer (unchecked) CompletionException (bei join) realsiert (s. Pull/Poll-API). Auch irgendwo vorher kann an einer CompletionStage mit handle (nicht-asynchron) oder handleAsync (asynchron) auf eine Fehlersituation reagiert werden.

var cf = CompletableFuture 
    .supplyAsync(() -> 42)
    .thenApplyAsync(r -> r / 0) 
    .thenApplyAsync(r -> r * r)
    .thenApplyAsync(r -> r > 0)
    .handle((r, th) -> {
        if (r != null) {
            System.out.println("Resultat: " + r);
            return r;
        } else {
            System.err.println("error: " + th);
            return false;
        }
    });
System.out.println(cf.join());

Hettel und Tran (2016, 258 (modifiziert))

Hettel und Tran (2016, 258 (modifiziert))

Durch die Division durch Null wird eine Exception provoziert. Die folgenden CompletionStages werden übersprungen und es wird direkt nicht-asynchron die BiFunction<T, Throwable, U> getriggert.

Diese Funktion bekommt zwei Parameter als Input. Es ist aber nur entweder der eine oder der andere gesetzt. Das ist entweder der Typ T des regulären Resultats der vorangehenden CompletionStage<T> (im gezeigten Beispiel Integer) oder ein Throwable, das eine in irgendeiner vorangegangenen CompletionStage aufgetretenen Exception repräsentiert. Das Ergebnis dieser Funktion kann von einem anderen Typ U (im gezeigten Beispiel Boolean) sein.

8.3.13 Fehlerbehandlung und Abbruch

Wenn als Ergebnis kein Rückgabewert benötigt wird, gibt es eine Alternative mit einem BiConsumer statt einer BiFunction. Die Methode, um solch einen Handler zu installieren, heißt whenComplete (nicht-asynchron) bzw. whenCompleteAsync (asynchron).

var cf = CompletableFuture 
    .supplyAsync(() -> 42)
    .thenApplyAsync(r -> r / 0) 
    .thenApplyAsync(r -> r * r)
    .thenApplyAsync(r -> r > 0)
    .whenComplete((r, th) -> {
        if (r == null) {
            System.err.println("error: " + th);
        }
    });
cf.join();

Hettel und Tran (2016, 258 (modifiziert))

Hettel und Tran (2016, 258 (modifiziert))

Durch die Division durch Null wird wieder eine Exception provoziert. Die folgenden CompletionStages werden übersprungen und es wird direkt nicht-asynchron der BiConsumer<T, Throwable> getriggert.

Diese Funktion bekommt zwei Parameter als Input. Es ist aber nur entweder der eine oder der andere gesetzt. Das ist entweder der Typ T des regulären Resultats der vorangehenden CompletionStage<T> (im gezeigten Beispiel Integer) oder ein Throwable, das eine in irgendeiner vorangegangenen CompletionStage aufgetretenen Exception repräsentiert. Der BiConsumer liefert kein Ergebnis zurück. Dementsprechend führt cf.join() zu einer “zusätzlichen” (unchecked) CompletionException.