8 CompletableFuture Framework
8.1 Monte-Carlo-Berechnung von \(\pi\)
Wie \(\pi\) hier berechnet wird, ist nicht prüfungsrelevant.
8.1.1 Exkurs: Monte-Carlo-Algorithmen
Auch dieser Exkurs ist nicht prüfungsrelevant. Monte-Carlo-Algorithmen sind randomisierte Algorithmen, die durch fortgesetzte Wahl von Zufallszahlen versuchen, ein Ergebnis anzunähern. Es gibt bei ihnen aber keine Garantie für die Korrektheit des Ergebnisses. Das genau unterscheidet Monte-Carlo-Algorithmen von Las-Vegas-Algorithmen. Letztere kommen immer garantiert zum korrekten Ergebnis, verwenden die Zufallskomponente aber dazu, den Weg mglw. abzukürzen.
8.1.2 Monte-Carlo-Berechnung von \(\pi\)
Ein bekannter Monte-Carlo-Algorithmus zur Annäherung von \(\pi\) basiert darauf, dass zufällig Punkte (x y) im Koordinatenbereich von (0, 0) bis (1, 1) gezogen werden. Für jeden gezogenen Punkt wird vermerkt, ob er innerhalb eines Einheitsviertelkreises (Radius=1) liegt. Durch Anwendung des Satzes des Pythagoras, kann dies geprüft werden.
Von zufällig gewählten Punkten mit Koordinaten ([0..1], [0..1]) kann man leicht feststellen, ob Sie genau auf, innerhalb oder außerhalb des Einheitsviertelkreis liegen:
Die Koordinaten eines zufällig gewählten Punkts sind \((x, y)\) mit
\[0 \le x \le 1\] und \[0 \le y \le 1\]
Der Punkt liegt aufgrund des Satzes von Pythagoras genau auf dem Kreisbogen, wenn Folgendes gilt (in der obigen Abbildung links): \[\sqrt{x^2 + y ^2} = 1\]
Der Punkt liegt aufgrund des Satzes von Pythagoras innerhalb des Kreises, wenn Folgendes gilt (in der obigen Abbildung in der Mitte): \[\sqrt{x^2 + y ^2} < 1\]
Der Punkt liegt aufgrund des Satzes von Pythagoras außerhalb des Kreises, wenn Folgendes gilt (in der obigen Abbildung rechts): \[\sqrt{x^2 + y ^2} > 1\]
Werden sehr viele Punkte gezogen, die sich innerhalb der Koordinaten (0, 0) und (1, 1) gleichmäßig verteilen, sollte sich für das Verhältnis der Anzahl der Punkte innerhalb des Kreises (einschließlich auf dem Kreisbogen) zu allen innerhalb des Quadrats das Verhältnis der Fläche des Viertelkreises (Radius 1) zur Fläche des Quadrats (Kantenlänge 1) ergeben.
\[\frac{\mbox{Anzahl der Punkte innerhalb und auf dem Kreis}}{\mbox{Anzahl aller Punkte}} = \frac{\frac{\pi r^2}{4}}{r^2}\]
Da \(r=1\) kann \(\pi\) angenähert werden als pi = 4.0 * in / (in + out), wobei in die Anzahl der Punkte innerhalb des Kreises ist (sqrt(x*x + y*y) <= 1) und out die restlichen, die außerhalb des Kreises liegen.
Falls die zufällig gezogenen Punkte gleichmäßig über die Fläche verteilt sind, nähert sich pi der Konstante \(\pi\)1 an. Es liegt in der Natur der Sache von Monte-Carlo-Algorithmen, dass dies nicht in jedem Fall zu einem korrekten Ergebnis führt.
Je mehr Punkte gezogen werden, desto eher sollte im Mittel der Wert von \(\pi\) angenähert werden. Es ist also sinnvoll, so viele Punkte wie möglich zu ziehen.
8.2 Überblick und Grundlagen von “CompletableFuture”
8.2.1 Zweck und grundlegende Struktur von “CompletableFuture”
- Pipelining: funktionale Verkettung mehrerer
Future/Callable- Output eines
Callablewird Input für nächstesCallable - verwendet Regeln, um zu entscheiden, wo nächstes
Callableausgeführt wird:- im vorigen Thread
- im Aufrufer-Thread oder
- einem neuen Thread
- Output eines
- Verwendung des
commonPool - asynchrone Methodenaufrufe
- Standard für verteilte Architekturen
- reaktive Programmierung / reaktive Architektur \(\to\) “hängt” nicht, (vertikal) skalierbar
CompletableFuturewirkt wie ein Versprechen, dass zukünftig ein Ergebnis vorliegt (“Promise”)
Future: read-only Container für zukünftiges ErgebnisCompletionStage: triggert weitere Verarbeitung
8.2.1.1 Exkurs: mit CompletableFuture vergleichbare Konzepte
(nicht prüfungsrelevant)
- JavaScript: Promise (
.then(onFulfillment).catch(onRejection)) - Clojure:
promise/deliver/deref - Guava (populäre Java-Library von Google):
SettableFuture(InterfaceListenableFuture) async/awaitin C#, Dart, JavaScript, Scala, Rust
8.2.2 CompletionStage (Push-API)
- bietet eine Reihe von Dreier-Paaren:
- “nicht-asynchrone” Ausführung
- asynchrone Ausführung in
commonPool - asynchrone Ausführung mit eigenem
ExecutorstattcommonPool
- Ergebnis ist immer eine neue
CompletionStage
Consumer, Function, BiFunction sind Functional Interfaces (“single abstract method Interfaces”), die es seit Java 8 im Zuge der Einführung von Lambda-Ausdrücken im Package java.util.function gibt:
8.2.3 Überblick über das API von CompletableFuture
Das Application Programming Interface (API), also die Benutzungsschnittstelle von CompletableFuture, kann neben dem bereits vorgestellten Push-API in die folgenden weiteren Bereiche gegliedert werden:
8.2.3.1 Pull/Poll-API
Einige der im Vergleich zu Future bei Pull/Poll aus dem Bereich der Steuerung der asynchronen Verarbeitung hinzugekommene Methoden sind:
T join(): Liefert das Ergebnis der asynchronen Verarbeitung, wirft aber im Gegensatz zuget()im Fehlerfall eine unchecked Exception: Sowohlget()als auchjoin()blockieren, bis das Ergebnis vorliegt. Beide werfen im Abbruchfall (Aufruf voncancel()) eineCancellationException. Im Fehlerfall wirft …get()eine (checked)ExecutionException, die also auf jeden Fall mit einemtry/catchbehandelt oder mitthrowsdeklariert werden muss (“if this future completed exceptionally”).join()eine (unchecked)CompletionException, die also nicht unbedingt mittry/catchbehandelt werden muss (“if this future completed exceptionally or a completion computation threw an exception”).
T getNow(T valueIfAbsent): Fragt nach dem Ergebnis. Ist (noch) keines vorhanden, wirdvalueIfAbsentzurückgegeben. Auch diese Methode wirft im Fehlerfall ebenfallsCancellationExceptionoderCompletionException.boolean isCompletedExceptionally(): Lieferttrue, falls während der Verarbeitung eine Ausnahme aufgetreten ist.
8.2.4 internes API
Mit den Methoden complete und completeExecptionally des internen API von CompletableFuture kann man “low level” ein CompletableFuture befüllen. Im folgenden Beispiel wird in der Methodde calculateAsync zuerst ein “leeres” CompletableFutre-Objekt für den Typ-Parameter Integer erzeugt.
Danach wird es asynchron befüllt, indem ein Task (vom Typ Runnable) an den commonPool “submitted” wird. In einem Thread des commonPool wird asynchron zum Aufrifer-Thread (hier der main-Thread) ein Ergebnis berechnet (im Beispiel ist das einfach 42) und dem bereits erzeugten CompletableFuture als Ergebnis mitgeteilt. Dies könnte an dem CompletableFuture eine Kette weiterer Verarbeitungsschritte triggern.
static CompletableFuture<Integer> calculateAsync() {
var result = new CompletableFuture<Integer>();
ForkJoinPool.commonPool().submit(() -> {
try {
var res = /* aufwändige Berechnung, Ergebnis z.B. */ 42;
result.complete(res);
} catch (Exception ex) {
result.completeExceptionally(ex);
}
});
return result;
}
public static void main(String... args) {
var cf = calculateAsync();
try {
System.out.println(cf.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.err.print(e);
}
}Sollte es aber eine Exception bei der Berechnung von res geben, kann sie dadurch “für später aufgehoben” werden, indem sie mit completeExceptionally dem CompletableFuture anstatt des Ergebnisse mitgeteilt wird.
8.2.5 Start-API
Normalerweise wird man eine CompletableFuture-Verarbeitungskette aber nicht durch Instantiieren eines “leeren” CompletableFuture-Objekts und darauffolgende Befüllung über das interne API starten wie im vorigen Beispiel, sondern Methoden des Start-API zum Starten einer CompletableFuture-Kette nutzen. Im folgenden Beispiel wird dazu supplyAsync verwendet:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
public class SimpleCompletableFuture {
static class Task implements Supplier<Integer> {
@Override
public Integer get() {
return /* aufwändige Berechnung, Ergebnis z.B. */ 42;
}
}
public static void main(String... args)
throws InterruptedException, ExecutionException {
var future = CompletableFuture.supplyAsync(new Task());
System.out.println(future.get());
}
}CompletableFuture hat eine Reihe Klassenmethoden im “Start-API”, die jeweils bestimmte Single Abstract Method bzw. Functional Interfaces unterstützen. Im obigen Beispiel wird supplyAsync zum Start einer Verarbeitungskette benutzt. Diese Klassenmethode erwatret ein Objekt, das das Supplier<T>-Interface implementiert. Das Resultat ist dann ein CompletableFuture<T>, an dem weitere (asynchrone) Verabreitungsschritte verkettet werden können. Liegt ein Ergebnis vom Aufruf der Single Abstract Method get des Supplier-Objekts vor, kann dies in der Verarbeitungskette weitere Methoden triggern, die das Ergebnis von get, das asynchron im commonPool berechnet wurde, als Input nehmen.
8.2.6 Start-API
In der Praxis würde man das aber eher als Lambda-Ausdruck notieren (hier mit { … }, damit es näher am Code des vorigen Beispiels ist – stattdessen ganz kurz: () -> 42).
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class SimpleCompletableFuture {
public static void main(String... args)
throws InterruptedException, ExecutionException {
var future = CompletableFuture.supplyAsync(() -> {
return /* aufwändige Berechnung, Ergebnis z.B. */ 42;
});
System.out.println(future.get());
}
}Mit supplyAsync wird ein Callable asynchron berechnet. Sobald das Ergebnis im CompltableFuture vorliegt, kann eine über das Push-API vereinbarte Folgefunktion getriggert werden, die das Ergebnis des Suppliers (Ergebnis des Methodenaufrufs get des Supplier-Objekts) als Eingangsparameter erhält.
Auch wenn es wegen der bisherigen Betonung des Push-API nicht sinnvoll erscheinen mag: Man kann ein CompletableFuture auch mit einem Runnable statt eines Callable starten. Die Methode dafür heißt runAsync. Die Verwendung von Runnable statt Callable bedeutet, dass es kein Ergebnis gibt. Lediglich die Information, dass das Ende der run-Methode erreicht ist, und ggf. “für später aufgehobene” Exceptions können im CompletableFuture repräsentiert werden. Statt die Berechnung der nächsten CompletionStage zu triggern gibt es aber noch andere Verwendungsmöglichkeiten, die später vorgestellt werden.
Der Typ-Parameter für das CompletableFuture ist im Fall von runAsync Void (in Anlehnung an das Schlüsselword void, das anzeigt, dass eine Methode keinen Rückgabewert hat). Bei supplyAsync entspricht der Typ-Parameter des CompletableFuture dem Typ-Parameter des Callable, also dem Ergebnis-Typ von get.
Im Gegensatz zu den Dreierpaaren des CompletionStage-Interface (Push-API) gibt es bei deisen beiden Methoden des Start-API nur jeweils zwei Varianten. Die Start-Berechnung des CompletableFuture kann nämlich entweder im commonPool oder einem anderen Threadpool (Executor-Interface) erfolgen. Die Signaturen der vier sich ergebene Methoden sind:
8.3 Push-API von CompletableFuture: lineare Ketten und Verzweigungen
8.3.1 asynchrone Verarbeitungskette
Die Namen der Methoden des Push-API’s beginnen in der Regel mit then. Mit ihnen wird dem CompletableFuture-Objekt, an dem sie aufgerufen werden, jeweils ein Trigger hinzugefügt. Wenn die Bedingung erfüllt ist (in der Regel ist dies, dass das Ergebnis fertig berechnet ist), wird eine weitere Methode asynchron zu dem Thread, der den Trigger durch den ``.then”-Aufruf installiert hatte, ausgeführt.
Bei vielen der then-Methoden ist das Ergebnis der vorigen CompletionStage der Input für die getriggerte Methode. Es gibt aber auch den Fall, dass bei thenRun ein Runnable gestartet wird, dessen run-Methode keinen Input-Parameter hat.
thenApplyAsync:fist eine Funktion, die einen Parameter vom TypT(oder “allgemeiner”) erwartet und ein Ergebnis vom TypU(oder “spezifischer”) hat.Functionist ein “Functional Interface” aus dem Packagejava.util.function2
thenConsumeAsync:actionist eine Funktion, die einen Parameter vom Typ ``T (oder “allgemeiner”) erwartet und kein Ergebnis hat.Consumerist ein “Functional Interface” aus dem Packagejava.util.function3
Für jedes thenApplyAsync und theAcceptAsync wird ggf. ein eigener Thread aus dem Threadpool benutzt.
Der Sinn von CompletableFutures und dessen Push-API ist, dass man auf elegante Weise lange Ketten asynchroner Aufrufe in einem “fluent”-Stil programmieren kann:
Das ist die asynchrone Variante der folgenden sequenziellen Verkettung:
Bzw. wenn das API funktional und nicht objekt-orientiert gestaltet wäre:
8.3.2 asynchrone Verarbeitungskette
Da Java eine statisch getypte Sprache ist und mit den Generics ein umfassendes Typ-System zur Verfügung steht, liegt es in der Natur der Sache, dass die asynchronen Verarbeitungsketten, die mit dem CompletableFuture-Framework und seinem Push-API aufgebaut werden können, typisiert sind.
In der folgenden Grafik wird die Typisierung an einem Beispiel demonstriert (der Typ CompletableFuture wird durch CF abgekürzt):
Task1soll ein Ergebnis vom TypTliefern.- Mit
thenApplyAsyncwird eineFunction(z.B. in Form eines Lambda-Ausdrucks) übergeben, deren Single Abstract Method einen Eingangsparameter vom TypThat und als Ergebnis den TypUliefert: Das istTask2.- Das Ergebnis des Aufrufs von
thenApplyAsyncist einCompletableFuture<U>.
- Das Ergebnis des Aufrufs von
- Wenn das Ergebnis dieser Funktion vorliegt, soll das die Berechnung von
Task3triggern. Beim Methodenaufruf vonthenApplyAsyncwurde dementsprechend wieder ein Objekt vom TypFunctionübergeben. Diesmal ist die Signatur der Single Abstract Method dieses Objekts aber so, dass ein Eingangsparameter vom TypUbenötigt wird (dem Typ des Ergebnisses der vorigenCompletionStage) und das Resultat ist vom TypV.- Das Ergebnis des Aufrufs von
theApplyAsyncist einCompletableFuture<V>.
- Das Ergebnis des Aufrufs von
8.3.3 Beispiel für eine asynchrone Verarbeitungskette
Das Prinzip der asynchronen linearen Verkettung mit dem Push-API soll hier an einem Beispiel deutlich gemacht werden. Dazu nehmen wir an, es gäbe ein Interface namens Service, in dem mindestens die folgenden drei Methoden mit diesen Signaturen definiert wären. Außerdem soll es die drei Typen User, Profile und AccessRight geben, die in diesen Signaturen benutzt werden.
Dann könnte beispielsweise die folgende asynchrone lineare Verarbeitungskette (“Pipeline”) mit dem Push-API im “fluent”-Stil programmiert werden:
CompletableFuture<Void> cf = CompletableFuture
.supplyAsync(() -> Service.getUser(42))
.thenApplyAsync((user) -> Service.getProfile(user))
.thenApplyAsync((profile) -> Service.getAccessRight(profile))
.thenAcceptAsync((access) -> System.out.println(access));
cf.join(); // wartet auf das Ende der Berechnung von cf,
// hier also auf die erfolgte AusgabeDiese Methodenaufrufe asynchron zu machen, ist speziell in verteilten Systemen sinnvoll, da hier die Netzwerkkommunikation immer zu Verzögerungen führen kann. Passiert die Netzwerkkommunikation asynchron, also im Hintergrund, wird der sonstige Ablauf des Programms (insb. die Bedienung der grafischen Benutzungsschnittstelle) nicht blockiert. Das Programm bleibt auch bei Netzwerkaussetzern “reaktiv”.
8.3.4 Beispiel für eine asynchrone Verarbeitungskette
Im Folgenden werden für das obige Beispiel die Typen von Input und Output der Lambda-Ausdrücke deutlich gemacht. Achtung: Das ist keine korrekte Java-Notation, sondern ist nur zur Verdeutlichung gedacht.
8.3.5 “nicht-asynchrone” Verarbeitungsschritte
Die asynchronen Methoden des Push-API’s haben immer den Namen then*Async (statt * z.B. Apply, Accept, Run, Combine, Compose). Es gibt sie dank Überladung jeweils in zwei Versionen:
- Bei der einen wird ein Threadpool in Form eines
Executorübergeben, in dem der Task ausgeführt wird, wenn die vorigeCompletionStagezu einem Ergebnis gekommen ist. - Bei der anderen Version wird immer der
commonPoolverwendet, um die Tasks abzuarbeiten.
Zu jeder dieser asynchronen Methodenpaare gibt es noch eine “nicht-asynchrone” Variante. Sie unterscheidet sich von ihren asynchronen Verwandten durch die Benennung: Dem Methodennamen fehlt immer das Async am Ende:
| Methode | Task-Typ | Resultat |
|---|---|---|
thenRun |
Runnable: () -> void |
CompletableFuture<Void> |
thenAccept |
Consumer: (T) -> void |
CompletableFuture<Void> |
thenApply |
Function: (T) -> U |
CompletableFuture<U> |
Die “nicht-asynchrone” Variante heißt nicht “synchron”, da das Verhalten (bzw. der Thread, in dem der dazugehörende Task ausgeführt wird) etwas vielfältiger ist:
- Falls der “vorige”
CompletableFuture-Task schon fertig ist, wird der nächste Task im Aufrufer-Thread ausgeführt. - Falls der “vorige”
CompletableFuture-Task noch nicht fertig ist, wird der nächste Task anschließend im Thread des vorigen Tasks ausgeführt.
Letztlich kann man sich zur Entwicklungszeit nicht darauf verlassen, in welchem Thread bzw. in welchem Threadpool ein “nicht-asynchron” übergebener Task ausgeführt wird:
“Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.”4
8.3.6 Split-Pattern
An einer CompletionStage können mit dem Push-API beliebig viele folgende CompletionStage-Objekte und ihre Tasks getriggert werden. Das heißt nicht, dass die damit verbundenen Tasks auch augenblicklich beim Eintreten des Ereignisses begonnen werden, sondern nur, dass sie augenblicklich im jeweils vorgesehenen Threadpool (z.B. dem commonPool) “submitted” werden.
Die folgende Situation ist ein Beispiel dafür:
T:String'4711'
U:Integer4
V:Booleanfalse
Hier wird im CompletableFuture<String> task1 der Task den String '4711' zu “berechnen” repräsentiert. An dieses CompletableFuture werden zwei weitere CompletionStages gehangen:
- Der Lambda-Ausdruck
(in) -> in.length()wird hier zu einem Objekt vom TypFunction<String, Integer>: Der Eingangsparameter in die Methode ist vom TypStringund das Ergebnis ist vom TypInteger(es findet eine automatische Umwandlung vom eigentlichen Resultattypintzum Wrapper-TypIntegerstatt). - Der Lambda-Ausdruck
(in) -> in.equals("42")wird hier zu einem Objekt vom TypFunction<String, Boolean>: Der Eingangsparameter in die Methode ist vom TypStringund das Ergebnis ist vom TypBoolean(es findet eine automatische Umwandlung vom eigentlichen Resultattypbooleanzum Wrapper-TypBooleanstatt).
Sobald das Resultat von task1 vorliegt (Typ String) werden die beiden Tasks der Folge-CompletionStage-Instanzen “submitted”. Sie können parallel berechnet werden.
8.3.7 Vereinen von Verarbeitungsketten
Wenn zwei Tasks den Input für einen dritten Task liefern oder es eine Kontroll-logische Abhängigkeit zwischen ihnen gibt, können Methoden zum Vereinen von Verabreitungsketten, die das Push-API anbietet, verwendet werden.
Sie werden an einem CompletionStage-Objekt aufgerufen und haben ein zweites CompletionStage-Objekt als ersten Parameter. Die Verarbeitungsketten dieser beiden CompletionStage-Objekte werden dadurch vereinigt (“Task1” und “Task2”). Der zweite Parameter (BiFunction, BiConsumer, Runnable, Function oder Consumer) ist der Task, der zum Vereinen verwendet wird (“Task3”).
Methoden zur Vereinigung von Abläufen. “CF” steht für CompletableFuture und “CS” für CompletionStage
Man kann mit diesen Methoden des Push-API jeweils zwei Verarbeitungsketten zu einem CompletableFuture vereinen. Bei den Methoden thenCombine, thenAcceptBoth und runAfterBoth müssen die Resultate beider vorangehenden CompletionStages vorliegen. Bei den Methoden mit Either im Namen, braucht nur eine der beiden vorangehenden CompletionStages bereit sein. Die andere Kette wird dann verworfen.
Diese Methoden gibt es immer in zwei Varianten: Je nachdem, ob der Task zum Vereinen asynchron oder nicht-asynchron ausgeführt werden soll, gibt es die beiden Varianten mit und ohne Async am Ende des Methodennamens.
8.3.8 Zusammenführen durch thenCombine (Verrechnen)
Bei thenCombine werden zwei vorangehende CompletionStages kombiniert, indem deren Resultate mit einer BiFunction<T, U, V> “zusammengerechnet” werden. BiFunction ist ein Functional Interface aus dem Package java.util.function. Die Single Abstract Method dieses Interfaces hat zwei Parameter: Der erste ist vom Typ T, der zweite vom Typ U (Typ-Parameter entsprechend der Spezifikation BiFunction<T, U, V>). Der Rückgabewert der Methode ist dann vom Typ V.
Je nachdem, ob der Task zum Kombinieren asynchron oder nicht-asynchron ausgeführt werden soll, gibt es die beiden Varianten thenCombine und thenCombineAsync.
Man kann also leicht einen Lambda-Ausdruck formulieren, der die Ergebnisse der beiden vorangegangenen Tasks in einem dritten Task zusammenführt. Die folgende Situation ist ein Beispiel dafür:
T:Integer47
U:String'11'
V:String'4711'
Hier wird im CompletableFuture<Integer> task1 der Task das Integer 47 zu “berechnen” repräsentiert. An diesem CompletableFuture wird thenCombine aufgerufen. Diese Methode hat als ersten Parameter das zweite CompletableFuture für Task2. Diese beiden CompletionStages werden durch Task3 asynchron kombiniert. Hier wird dazu der Lambda-Ausdruck (n, s) -> String.valueOf(n) + s genutzt, der aus dem ersten Parameter (ein Integer) eine String-Repräsentation macht. Der resultierende String wird mit dem zweiten Parameter (wieder ein String) konkateniert.
task3 kann erst asynchron begonnen werden, wenn die beiden Ergebnisse der vorangehenden CompletionStages vorliegen, da sie ja kombiniert werden sollen.
8.3.9 Zusammenführen durch applyToEither (“ODER”)
Bei applyToEither bzw. applyToEitherAsync werden zwei Verarbeitungsketten über die entsprechenden CompletionStages zusammengeführt, indem das Resultat, das zuerst vorliegt, mit einer Function<T, U> asynchron oder nicht-asynchron weiterverarbeitet wird. Der Rückgabewert der Methode ist dann vom Typ U. Je nachdem, ob der Task zum Weiterverarbeiten asynchron oder nicht-asynchron ausgeführt werden soll, gibt es die beiden Varianten applyToEitherAsync und applyToEither.
Man kann also leicht einen Lambda-Ausdruck formulieren, der das Ergebnis des Tasks, der zuerst fertig ist, in einem dritten Task weiterverarbeitet. Die folgende Situation ist ein Beispiel dafür:
T:Integer47
U:Integer11
V:Booleantrueoderfalse
Hier wird im CompletableFuture<Integer> task1 der Task, das Integer 47 zu “berechnen”, repräsentiert. An diesem CompletableFuture wird applyToEitherAsync aufgerufen. Diese Methode hat als ersten Parameter das zweite CompletableFuture für Task2. Diese beiden CompletionStages werden durch Task3 asynchron vereint. Das Ergebnis ist enweder true oder false – je nachdem, welcher vorangegangene Task zuerst berechnet ist.
task3 kann erst asynchron begonnen werden, wenn mindestens ein Ergebnis der vorangehenden CompletionStages vorliegt.
8.3.10 Barrieren und Zusammenführen
Im Start-API des CompletableFuture-Frameworks gibt es noch einige Methoden, die mit “Barrieren” arbeiten. allOf und anyOf haben beide eine variable Parameterliste. Sie erwarten eine beliebige Anzahl von Tasks in der Form von CompletableFuture-Objekten:
allOfbenötigt irgendeinCompletableFuture<?>, also z.B. mitrunAsyncgestarteteRunnable-Tasks. Das Ergebnis vonallOfist nämlich einCompletableFuture<Void>, d.h. die Ergebnisse der Tasks können nicht weiterverwendet werden.anyOfbenötigtCompletableFuture<? extends T>, also z.B. mitsupplyAsyncgestarteteCallable<? extends T>-Tasks. Das Ergebnis vonanyOfist dann einCompletableFuture<T>.
Die Semantik von der Start-API-Methoden allOf und anyOf, die sie zu “Barrieren” macht, ist, dass das CompletableFuture, das Ergebnis von allOf ist, seine nächste CompletionStage erst startet (oder bei getoder join entblockiert), wenn alle Tasks der übergebenen CompletableFuture-Objekte fertig berechnet sind. Ergebnisse der Tasks könnten nicht weiterverarbeitet werden. Allerdings funktioniert allOf mit allen CompletableFuture<?>-Objekten als Parameter.
anyOf hingegen nimmt das Ergebnis des CompletableFuture, welches zuerst fertig ist, als Input der nächsten CompletionStage. Daher kann als Inhaltstyp des resultierenden CompletableFuture irgendein Obertyp der beteiligten Task-Resultate verwendet werden.
8.3.11 Barrieren und Zusammenführen
Es folgen zwei Beispiele für die Anwendung dieser “Barrieren”, bei denen jeweils drei Tasks übergeben werden:
CompletableFuture.allOf( // alle müssen beendet werden
CompletableFuture.runAsync( () -> { /*...*/ } ),
CompletableFuture.runAsync( () -> { /*...*/ } ),
CompletableFuture.runAsync( () -> { /*...*/ } )
).thenAccept((Void) -> System.out.println("done") );
CompletableFuture.anyOf( // das frühest fertige
CompletableFuture.supplyAsync( () -> { /*...*/ } ),
CompletableFuture.supplyAsync( () -> { /*...*/ } ),
CompletableFuture.supplyAsync( () -> { /*...*/ } )
).thenAccept((first) -> System.out.println(first)); Man beachte: Je nach Parallelität des commonPool, bzw. der Executor-Threadpools der übergebenen CompletableFuture-Objekte, werden nicht alle Tasks tatsächlich gleichzeitig und parallel abgearbeitet.
Das folgende Programmfragment liefert 1 als result demzufolge auch nur auf einem System mit einer Parallelität des commonPool von mindestens 10.
static void sleep(int sek) {
try {
Thread.sleep(sek * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
var result = CompletableFuture
.anyOf(CompletableFuture.supplyAsync(() -> {
sleep(10);
return 10;
}), CompletableFuture.supplyAsync(() -> {
sleep(9);
return 9;
}), CompletableFuture.supplyAsync(() -> {
sleep(8);
return 8;
}), CompletableFuture.supplyAsync(() -> {
sleep(7);
return 7;
}), CompletableFuture.supplyAsync(() -> {
sleep(6);
return 6;
}), CompletableFuture.supplyAsync(() -> {
sleep(5);
return 5;
}), CompletableFuture.supplyAsync(() -> {
sleep(4);
return 4;
}), CompletableFuture.supplyAsync(() -> {
sleep(3);
return 3;
}), CompletableFuture.supplyAsync(() -> {
sleep(2);
return 2;
}), CompletableFuture.supplyAsync(() -> {
sleep(1);
return 1;
})).join();Außerdem werden bei anyOf die Tasks, die nicht als erstes fertig berechnet wurden, nicht abgebrochen, obwohl sie nicht mehr relevant sind und ihr Ergebnis nicht mehr benötigt wird. Der Grund ist die Architektur des CompletableFuture-Frameworks. CompletableFuture-Objekte sind nur ein Wrapper zu Threads, die in einem Executor wie dem commonPool ausgeführt werden. Eine Steuerung dieser Threads erfolgt nicht aus dem CompletableFuture-Framework. Selbst eine Methode wie cancel() aus dem Pull/Poll-API-Teil steuert nicht den Thread, sondern wirkt nur als Trigger für folgende CompletionStages (und zwar mit einem vergleichbaren Effekt wie completeExecptionally(...)), wie im folgenden Beispiel verdeutlicht wird (sleep wie oben):
/*
* Nachweis, dass nicht mehr gebrauchte Tasks bei anyOf weiterlaufen
*/
var cf1 = CompletableFuture.runAsync(() -> {
sleep(3);
});
var cf2 = CompletableFuture.runAsync(() -> {
while (true) {
sleep(1);
System.out.println(Thread.currentThread().getName() + ": 2");
}
});
var cf3 = CompletableFuture.runAsync(() -> {
while (!Thread.interrupted()) {
sleep(1);
System.out.println(Thread.currentThread().getName() + ": 3");
}
System.out.println("cf3 end");
});
/*
* Darauf warten, dass cf1 nach 3 Sek. fertig ist. cf2 und cf3 laufen
* weiter, obwohl sie nicht mehr gebraucht werden...
*
* Auch mit .cancel() kann man sie nicht beenden (triggert nur
* Exception-Kette, falls gepusht).
*/
CompletableFuture.anyOf(cf1, cf2, cf3).join();
System.out.println("cf1 ist fertig");
/*
* abwarten, ob sonst noch etwas passiert...
*/
sleep(5);Unter der Annahme, dass die Parallelität des commonPool mindestens 3 ist, passiert hier das Folgende: cf1, cf2 und cf3 werden an den commonPool submitted. cf1 wartet 3 Sekunden und ist dann fertig. cf2 und cf3 laufen weiter und geben jede Sekunde 2 bzw. 3 auf der Konsole aus. Obwohl cf1 fertig ist, laufen cf2 und cf3 weiter, wie man an den Konsolenausgaben in den verbleibenden 5 Sekunden des Aufrufer-Threads sehen kann. Auch wenn man mit cancel() versucht cf2 oder cf3 abzubrechen hat das trotzdem keinen Effekt auf die Threads, die diese Tasks abarbeiten, außer dass eine CancellationException die nächste CompletionStage triggert.
8.3.12 Fehlerbehandlung und Abbruch
Tasks können nicht nur getriggert werden, wenn ein Ergebnis für die vorangehende CompletionStage vorliegt, sondern auch wenn eine Exception geworfen wird. Die Ausnahme wird nicht sofort realisiert, sondern “für später aufgehoben”.
Wird für die CompletionStage, die die Ausnahme wirft, nicht direkt ein Exception-Handler vereinbart, wird das Throwable durch die Kette durchgereicht. Spätestens am Ende bei get oder join wird die Exception in Form einer (checked) ExecutionException (bei get) oder einer (unchecked) CompletionException (bei join) realsiert (s. Pull/Poll-API). Auch irgendwo vorher kann an einer CompletionStage mit handle (nicht-asynchron) oder handleAsync (asynchron) auf eine Fehlersituation reagiert werden.
var cf = CompletableFuture
.supplyAsync(() -> 42)
.thenApplyAsync(r -> r / 0)
.thenApplyAsync(r -> r * r)
.thenApplyAsync(r -> r > 0)
.handle((r, th) -> {
if (r != null) {
System.out.println("Resultat: " + r);
return r;
} else {
System.err.println("error: " + th);
return false;
}
});
System.out.println(cf.join());Durch die Division durch Null wird eine Exception provoziert. Die folgenden CompletionStages werden übersprungen und es wird direkt nicht-asynchron die BiFunction<T, Throwable, U> getriggert.
Diese Funktion bekommt zwei Parameter als Input. Es ist aber nur entweder der eine oder der andere gesetzt. Das ist entweder der Typ T des regulären Resultats der vorangehenden CompletionStage<T> (im gezeigten Beispiel Integer) oder ein Throwable, das eine in irgendeiner vorangegangenen CompletionStage aufgetretenen Exception repräsentiert. Das Ergebnis dieser Funktion kann von einem anderen Typ U (im gezeigten Beispiel Boolean) sein.
8.3.13 Fehlerbehandlung und Abbruch
Wenn als Ergebnis kein Rückgabewert benötigt wird, gibt es eine Alternative mit einem BiConsumer statt einer BiFunction. Die Methode, um solch einen Handler zu installieren, heißt whenComplete (nicht-asynchron) bzw. whenCompleteAsync (asynchron).
Durch die Division durch Null wird wieder eine Exception provoziert. Die folgenden CompletionStages werden übersprungen und es wird direkt nicht-asynchron der BiConsumer<T, Throwable> getriggert.
Diese Funktion bekommt zwei Parameter als Input. Es ist aber nur entweder der eine oder der andere gesetzt. Das ist entweder der Typ T des regulären Resultats der vorangehenden CompletionStage<T> (im gezeigten Beispiel Integer) oder ein Throwable, das eine in irgendeiner vorangegangenen CompletionStage aufgetretenen Exception repräsentiert. Der BiConsumer liefert kein Ergebnis zurück. Dementsprechend führt cf.join() zu einer “zusätzlichen” (unchecked) CompletionException.










