8 CompletableFuture Framework
8.1 Monte-Carlo-Berechnung von \(\pi\)
Wie \(\pi\) hier berechnet wird, ist nicht prüfungsrelevant.
8.1.1 Exkurs: Monte-Carlo-Algorithmen
Auch dieser Exkurs ist nicht prüfungsrelevant. Monte-Carlo-Algorithmen sind randomisierte Algorithmen, die durch fortgesetzte Wahl von Zufallszahlen versuchen, ein Ergebnis anzunähern. Es gibt bei ihnen aber keine Garantie für die Korrektheit des Ergebnisses. Das genau unterscheidet Monte-Carlo-Algorithmen von Las-Vegas-Algorithmen. Letztere kommen immer garantiert zum korrekten Ergebnis, verwenden die Zufallskomponente aber dazu, den Weg mglw. abzukürzen.
8.1.2 Monte-Carlo-Berechnung von \(\pi\)
Ein bekannter Monte-Carlo-Algorithmus zur Annäherung von \(\pi\) basiert darauf, dass zufällig Punkte (x y)
im Koordinatenbereich von (0, 0)
bis (1, 1)
gezogen werden. Für jeden gezogenen Punkt wird vermerkt, ob er innerhalb eines Einheitsviertelkreises (Radius=1) liegt. Durch Anwendung des Satzes des Pythagoras, kann dies geprüft werden.
Von zufällig gewählten Punkten mit Koordinaten ([0..1], [0..1]) kann man leicht feststellen, ob Sie genau auf, innerhalb oder außerhalb des Einheitsviertelkreis liegen:
Die Koordinaten eines zufällig gewählten Punkts sind \((x, y)\) mit
\[0 \le x \le 1\] und \[0 \le y \le 1\]
Der Punkt liegt aufgrund des Satzes von Pythagoras genau auf dem Kreisbogen, wenn Folgendes gilt (in der obigen Abbildung links): \[\sqrt{x^2 + y ^2} = 1\]
Der Punkt liegt aufgrund des Satzes von Pythagoras innerhalb des Kreises, wenn Folgendes gilt (in der obigen Abbildung in der Mitte): \[\sqrt{x^2 + y ^2} < 1\]
Der Punkt liegt aufgrund des Satzes von Pythagoras außerhalb des Kreises, wenn Folgendes gilt (in der obigen Abbildung rechts): \[\sqrt{x^2 + y ^2} > 1\]
Werden sehr viele Punkte gezogen, die sich innerhalb der Koordinaten (0, 0) und (1, 1) gleichmäßig verteilen, sollte sich für das Verhältnis der Anzahl der Punkte innerhalb des Kreises (einschließlich auf dem Kreisbogen) zu allen innerhalb des Quadrats das Verhältnis der Fläche des Viertelkreises (Radius 1) zur Fläche des Quadrats (Kantenlänge 1) ergeben.
\[\frac{\mbox{Anzahl der Punkte innerhalb und auf dem Kreis}}{\mbox{Anzahl aller Punkte}} = \frac{\frac{\pi r^2}{4}}{r^2}\]
Da \(r=1\) kann \(\pi\) angenähert werden als pi = 4.0 * in / (in + out)
, wobei in
die Anzahl der Punkte innerhalb des Kreises ist (sqrt(x*x + y*y) <= 1)
und out
die restlichen, die außerhalb des Kreises liegen.
Falls die zufällig gezogenen Punkte gleichmäßig über die Fläche verteilt sind, nähert sich pi
der Konstante \(\pi\)1 an. Es liegt in der Natur der Sache von Monte-Carlo-Algorithmen, dass dies nicht in jedem Fall zu einem korrekten Ergebnis führt.
Je mehr Punkte gezogen werden, desto eher sollte im Mittel der Wert von \(\pi\) angenähert werden. Es ist also sinnvoll, so viele Punkte wie möglich zu ziehen.
8.2 Überblick und Grundlagen von “CompletableFuture”
8.2.1 Zweck und grundlegende Struktur von “CompletableFuture”
- Pipelining: funktionale Verkettung mehrerer
Future
/Callable
- Output eines
Callable
wird Input für nächstesCallable
- verwendet Regeln, um zu entscheiden, wo nächstes
Callable
ausgeführt wird:- im vorigen Thread
- im Aufrufer-Thread oder
- einem neuen Thread
- Output eines
- Verwendung des
commonPool
- asynchrone Methodenaufrufe
- Standard für verteilte Architekturen
- reaktive Programmierung / reaktive Architektur \(\to\) “hängt” nicht, (vertikal) skalierbar
CompletableFuture
wirkt wie ein Versprechen, dass zukünftig ein Ergebnis vorliegt (“Promise”)
Future
: read-only Container für zukünftiges ErgebnisCompletionStage
: triggert weitere Verarbeitung
8.2.1.1 Exkurs: mit CompletableFuture
vergleichbare Konzepte
(nicht prüfungsrelevant)
- JavaScript: Promise (
.then(onFulfillment)
.catch(onRejection)
) - Clojure:
promise
/deliver
/deref
- Guava (populäre Java-Library von Google):
SettableFuture
(InterfaceListenableFuture
) async
/await
in C#, Dart, JavaScript, Scala, Rust
8.2.2 CompletionStage
(Push-API)
- bietet eine Reihe von Dreier-Paaren:
- “nicht-asynchrone” Ausführung
- asynchrone Ausführung in
commonPool
- asynchrone Ausführung mit eigenem
Executor
stattcommonPool
- Ergebnis ist immer eine neue
CompletionStage
Consumer
, Function
, BiFunction
sind Functional Interfaces (“single abstract method Interfaces”), die es seit Java 8 im Zuge der Einführung von Lambda-Ausdrücken im Package java.util.function
gibt:
8.2.3 Überblick über das API von CompletableFuture
Das Application Programming Interface (API), also die Benutzungsschnittstelle von CompletableFuture
, kann neben dem bereits vorgestellten Push-API in die folgenden weiteren Bereiche gegliedert werden:
8.2.3.1 Pull/Poll-API
Einige der im Vergleich zu Future
bei Pull/Poll aus dem Bereich der Steuerung der asynchronen Verarbeitung hinzugekommene Methoden sind:
T join()
: Liefert das Ergebnis der asynchronen Verarbeitung, wirft aber im Gegensatz zuget()
im Fehlerfall eine unchecked Exception: Sowohlget()
als auchjoin()
blockieren, bis das Ergebnis vorliegt. Beide werfen im Abbruchfall (Aufruf voncancel()
) eineCancellationException
. Im Fehlerfall wirft …get()
eine (checked)ExecutionException
, die also auf jeden Fall mit einemtry
/catch
behandelt oder mitthrows
deklariert werden muss (“if this future completed exceptionally”).join()
eine (unchecked)CompletionException
, die also nicht unbedingt mittry
/catch
behandelt werden muss (“if this future completed exceptionally or a completion computation threw an exception”).
T getNow(T valueIfAbsent)
: Fragt nach dem Ergebnis. Ist (noch) keines vorhanden, wirdvalueIfAbsent
zurückgegeben. Auch diese Methode wirft im Fehlerfall ebenfallsCancellationException
oderCompletionException
.boolean isCompletedExceptionally()
: Lieferttrue
, falls während der Verarbeitung eine Ausnahme aufgetreten ist.
8.2.4 internes API
Mit den Methoden complete
und completeExecptionally
des internen API von CompletableFuture
kann man “low level” ein CompletableFuture
befüllen. Im folgenden Beispiel wird in der Methodde calculateAsync
zuerst ein “leeres” CompletableFutre
-Objekt für den Typ-Parameter Integer
erzeugt.
Danach wird es asynchron befüllt, indem ein Task (vom Typ Runnable
) an den commonPool
“submitted” wird. In einem Thread des commonPool
wird asynchron zum Aufrifer-Thread (hier der main
-Thread) ein Ergebnis berechnet (im Beispiel ist das einfach 42
) und dem bereits erzeugten CompletableFuture
als Ergebnis mitgeteilt. Dies könnte an dem CompletableFuture
eine Kette weiterer Verarbeitungsschritte triggern.
static CompletableFuture<Integer> calculateAsync() {
var result = new CompletableFuture<Integer>();
ForkJoinPool.commonPool().submit(() -> {
try {
var res = /* aufwändige Berechnung, Ergebnis z.B. */ 42;
result.complete(res);
} catch (Exception ex) {
result.completeExceptionally(ex);
}
});
return result;
}
public static void main(String... args) {
var cf = calculateAsync();
try {
System.out.println(cf.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.err.print(e);
}
}
Sollte es aber eine Exception bei der Berechnung von res
geben, kann sie dadurch “für später aufgehoben” werden, indem sie mit completeExceptionally
dem CompletableFuture
anstatt des Ergebnisse mitgeteilt wird.
8.2.5 Start-API
Normalerweise wird man eine CompletableFuture
-Verarbeitungskette aber nicht durch Instantiieren eines “leeren” CompletableFuture
-Objekts und darauffolgende Befüllung über das interne API starten wie im vorigen Beispiel, sondern Methoden des Start-API zum Starten einer CompletableFuture
-Kette nutzen. Im folgenden Beispiel wird dazu supplyAsync
verwendet:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
public class SimpleCompletableFuture {
static class Task implements Supplier<Integer> {
@Override
public Integer get() {
return /* aufwändige Berechnung, Ergebnis z.B. */ 42;
}
}
public static void main(String... args)
throws InterruptedException, ExecutionException {
var future = CompletableFuture.supplyAsync(new Task());
System.out.println(future.get());
}
}
CompletableFuture
hat eine Reihe Klassenmethoden im “Start-API”, die jeweils bestimmte Single Abstract Method bzw. Functional Interfaces unterstützen. Im obigen Beispiel wird supplyAsync
zum Start einer Verarbeitungskette benutzt. Diese Klassenmethode erwatret ein Objekt, das das Supplier<T>
-Interface implementiert. Das Resultat ist dann ein CompletableFuture<T>
, an dem weitere (asynchrone) Verabreitungsschritte verkettet werden können. Liegt ein Ergebnis vom Aufruf der Single Abstract Method get
des Supplier
-Objekts vor, kann dies in der Verarbeitungskette weitere Methoden triggern, die das Ergebnis von get
, das asynchron im commonPool
berechnet wurde, als Input nehmen.
8.2.6 Start-API
In der Praxis würde man das aber eher als Lambda-Ausdruck notieren (hier mit {
… }
, damit es näher am Code des vorigen Beispiels ist – stattdessen ganz kurz: () -> 42
).
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class SimpleCompletableFuture {
public static void main(String... args)
throws InterruptedException, ExecutionException {
var future = CompletableFuture.supplyAsync(() -> {
return /* aufwändige Berechnung, Ergebnis z.B. */ 42;
});
System.out.println(future.get());
}
}
Mit supplyAsync
wird ein Callable
asynchron berechnet. Sobald das Ergebnis im CompltableFuture
vorliegt, kann eine über das Push-API vereinbarte Folgefunktion getriggert werden, die das Ergebnis des Suppliers
(Ergebnis des Methodenaufrufs get
des Supplier
-Objekts) als Eingangsparameter erhält.
Auch wenn es wegen der bisherigen Betonung des Push-API nicht sinnvoll erscheinen mag: Man kann ein CompletableFuture
auch mit einem Runnable
statt eines Callable
starten. Die Methode dafür heißt runAsync
. Die Verwendung von Runnable
statt Callable
bedeutet, dass es kein Ergebnis gibt. Lediglich die Information, dass das Ende der run
-Methode erreicht ist, und ggf. “für später aufgehobene” Exceptions können im CompletableFuture
repräsentiert werden. Statt die Berechnung der nächsten CompletionStage
zu triggern gibt es aber noch andere Verwendungsmöglichkeiten, die später vorgestellt werden.
Der Typ-Parameter für das CompletableFuture
ist im Fall von runAsync
Void
(in Anlehnung an das Schlüsselword void
, das anzeigt, dass eine Methode keinen Rückgabewert hat). Bei supplyAsync
entspricht der Typ-Parameter des CompletableFuture
dem Typ-Parameter des Callable
, also dem Ergebnis-Typ von get
.
Im Gegensatz zu den Dreierpaaren des CompletionStage
-Interface (Push-API) gibt es bei deisen beiden Methoden des Start-API nur jeweils zwei Varianten. Die Start-Berechnung des CompletableFuture
kann nämlich entweder im commonPool
oder einem anderen Threadpool (Executor
-Interface) erfolgen. Die Signaturen der vier sich ergebene Methoden sind:
8.3 Push-API von CompletableFuture
: lineare Ketten und Verzweigungen
8.3.1 asynchrone Verarbeitungskette
Die Namen der Methoden des Push-API’s beginnen in der Regel mit then
. Mit ihnen wird dem CompletableFuture
-Objekt, an dem sie aufgerufen werden, jeweils ein Trigger hinzugefügt. Wenn die Bedingung erfüllt ist (in der Regel ist dies, dass das Ergebnis fertig berechnet ist), wird eine weitere Methode asynchron zu dem Thread, der den Trigger durch den ``.then”-Aufruf installiert hatte, ausgeführt.
Bei vielen der then
-Methoden ist das Ergebnis der vorigen CompletionStage
der Input für die getriggerte Methode. Es gibt aber auch den Fall, dass bei thenRun
ein Runnable
gestartet wird, dessen run
-Methode keinen Input-Parameter hat.
CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> f)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
thenApplyAsync
:f
ist eine Funktion, die einen Parameter vom TypT
(oder “allgemeiner”) erwartet und ein Ergebnis vom TypU
(oder “spezifischer”) hat.Function
ist ein “Functional Interface” aus dem Packagejava.util.function
2
thenConsumeAsync
:action
ist eine Funktion, die einen Parameter vom Typ ``T (oder “allgemeiner”) erwartet und kein Ergebnis hat.Consumer
ist ein “Functional Interface” aus dem Packagejava.util.function
3
Für jedes thenApplyAsync
und theAcceptAsync
wird ggf. ein eigener Thread aus dem Threadpool benutzt.
Der Sinn von CompletableFutures
und dessen Push-API ist, dass man auf elegante Weise lange Ketten asynchroner Aufrufe in einem “fluent”-Stil programmieren kann:
var cf = CompletableFuture
.supplyAsync(()-> fn1())
.thenApplyAsync((in) -> in.fn2())
.thenApplyAsync((in) -> in.fn3())
.thenAcceptAsync((in) -> in.fn4());
Das ist die asynchrone Variante der folgenden sequenziellen Verkettung:
Bzw. wenn das API funktional und nicht objekt-orientiert gestaltet wäre:
8.3.2 asynchrone Verarbeitungskette
Da Java eine statisch getypte Sprache ist und mit den Generics ein umfassendes Typ-System zur Verfügung steht, liegt es in der Natur der Sache, dass die asynchronen Verarbeitungsketten, die mit dem CompletableFuture
-Framework und seinem Push-API aufgebaut werden können, typisiert sind.
In der folgenden Grafik wird die Typisierung an einem Beispiel demonstriert (der Typ CompletableFuture
wird durch CF
abgekürzt):
Task1
soll ein Ergebnis vom TypT
liefern.- Mit
thenApplyAsync
wird eineFunction
(z.B. in Form eines Lambda-Ausdrucks) übergeben, deren Single Abstract Method einen Eingangsparameter vom TypT
hat und als Ergebnis den TypU
liefert: Das istTask2
.- Das Ergebnis des Aufrufs von
thenApplyAsync
ist einCompletableFuture<U>
.
- Das Ergebnis des Aufrufs von
- Wenn das Ergebnis dieser Funktion vorliegt, soll das die Berechnung von
Task3
triggern. Beim Methodenaufruf vonthenApplyAsync
wurde dementsprechend wieder ein Objekt vom TypFunction
übergeben. Diesmal ist die Signatur der Single Abstract Method dieses Objekts aber so, dass ein Eingangsparameter vom TypU
benötigt wird (dem Typ des Ergebnisses der vorigenCompletionStage
) und das Resultat ist vom TypV
.- Das Ergebnis des Aufrufs von
theApplyAsync
ist einCompletableFuture<V>
.
- Das Ergebnis des Aufrufs von
8.3.3 Beispiel für eine asynchrone Verarbeitungskette
Das Prinzip der asynchronen linearen Verkettung mit dem Push-API soll hier an einem Beispiel deutlich gemacht werden. Dazu nehmen wir an, es gäbe ein Interface namens Service
, in dem mindestens die folgenden drei Methoden mit diesen Signaturen definiert wären. Außerdem soll es die drei Typen User
, Profile
und AccessRight
geben, die in diesen Signaturen benutzt werden.
public class Service {
static User getUser(int userId) { ... }
static Profile getProfile(User user) { ... }
static AccessRight getAccessRight(Profile profile) { ... }
}
Dann könnte beispielsweise die folgende asynchrone lineare Verarbeitungskette (“Pipeline”) mit dem Push-API im “fluent”-Stil programmiert werden:
CompletableFuture<Void> cf = CompletableFuture
.supplyAsync(() -> Service.getUser(42))
.thenApplyAsync((user) -> Service.getProfile(user))
.thenApplyAsync((profile) -> Service.getAccessRight(profile))
.thenAcceptAsync((access) -> System.out.println(access));
cf.join(); // wartet auf das Ende der Berechnung von cf,
// hier also auf die erfolgte Ausgabe
Diese Methodenaufrufe asynchron zu machen, ist speziell in verteilten Systemen sinnvoll, da hier die Netzwerkkommunikation immer zu Verzögerungen führen kann. Passiert die Netzwerkkommunikation asynchron, also im Hintergrund, wird der sonstige Ablauf des Programms (insb. die Bedienung der grafischen Benutzungsschnittstelle) nicht blockiert. Das Programm bleibt auch bei Netzwerkaussetzern “reaktiv”.
8.3.4 Beispiel für eine asynchrone Verarbeitungskette
public class Service {
public static User getUser(int userId) { ... }
public static Profile getProfile(User user) { ... }
public static AccessRight getAccessRight(Profile profile) { ... }
}
Im Folgenden werden für das obige Beispiel die Typen von Input und Output der Lambda-Ausdrücke deutlich gemacht. Achtung: Das ist keine korrekte Java-Notation, sondern ist nur zur Verdeutlichung gedacht.
8.3.5 “nicht-asynchrone” Verarbeitungsschritte
Die asynchronen Methoden des Push-API’s haben immer den Namen then*Async
(statt *
z.B. Apply
, Accept
, Run
, Combine
, Compose
). Es gibt sie dank Überladung jeweils in zwei Versionen:
- Bei der einen wird ein Threadpool in Form eines
Executor
übergeben, in dem der Task ausgeführt wird, wenn die vorigeCompletionStage
zu einem Ergebnis gekommen ist. - Bei der anderen Version wird immer der
commonPool
verwendet, um die Tasks abzuarbeiten.
Zu jeder dieser asynchronen Methodenpaare gibt es noch eine “nicht-asynchrone” Variante. Sie unterscheidet sich von ihren asynchronen Verwandten durch die Benennung: Dem Methodennamen fehlt immer das Async
am Ende:
Methode | Task-Typ | Resultat |
---|---|---|
thenRun |
Runnable: () -> void |
CompletableFuture<Void> |
thenAccept |
Consumer: (T) -> void |
CompletableFuture<Void> |
thenApply |
Function: (T) -> U |
CompletableFuture<U> |
Die “nicht-asynchrone” Variante heißt nicht “synchron”, da das Verhalten (bzw. der Thread, in dem der dazugehörende Task ausgeführt wird) etwas vielfältiger ist:
- Falls der “vorige”
CompletableFuture
-Task schon fertig ist, wird der nächste Task im Aufrufer-Thread ausgeführt. - Falls der “vorige”
CompletableFuture
-Task noch nicht fertig ist, wird der nächste Task anschließend im Thread des vorigen Tasks ausgeführt.
Letztlich kann man sich zur Entwicklungszeit nicht darauf verlassen, in welchem Thread bzw. in welchem Threadpool ein “nicht-asynchron” übergebener Task ausgeführt wird:
“Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.”4
8.3.6 Split-Pattern
An einer CompletionStage
können mit dem Push-API beliebig viele folgende CompletionStage
-Objekte und ihre Tasks getriggert werden. Das heißt nicht, dass die damit verbundenen Tasks auch augenblicklich beim Eintreten des Ereignisses begonnen werden, sondern nur, dass sie augenblicklich im jeweils vorgesehenen Threadpool (z.B. dem commonPool
) “submitted” werden.
Die folgende Situation ist ein Beispiel dafür:
var task1 = CompletableFuture.supplyAsync(() -> "4711");
var task2 = task1.thenApplyAsync((in) -> in.length());
var task3 = task1.thenApplyAsync((in) -> in.equals("42"));
T
:String
'4711'
U
:Integer
4
V
:Boolean
false
Hier wird im CompletableFuture<String>
task1
der Task den String
'4711'
zu “berechnen” repräsentiert. An dieses CompletableFuture
werden zwei weitere CompletionStages
gehangen:
- Der Lambda-Ausdruck
(in) -> in.length()
wird hier zu einem Objekt vom TypFunction<String, Integer>
: Der Eingangsparameter in die Methode ist vom TypString
und das Ergebnis ist vom TypInteger
(es findet eine automatische Umwandlung vom eigentlichen Resultattypint
zum Wrapper-TypInteger
statt). - Der Lambda-Ausdruck
(in) -> in.equals("42")
wird hier zu einem Objekt vom TypFunction<String, Boolean>
: Der Eingangsparameter in die Methode ist vom TypString
und das Ergebnis ist vom TypBoolean
(es findet eine automatische Umwandlung vom eigentlichen Resultattypboolean
zum Wrapper-TypBoolean
statt).
Sobald das Resultat von task1
vorliegt (Typ String
) werden die beiden Tasks der Folge-CompletionStage
-Instanzen “submitted”. Sie können parallel berechnet werden.
8.3.7 Vereinen von Verarbeitungsketten
Wenn zwei Tasks den Input für einen dritten Task liefern oder es eine Kontroll-logische Abhängigkeit zwischen ihnen gibt, können Methoden zum Vereinen von Verabreitungsketten, die das Push-API anbietet, verwendet werden.
Sie werden an einem CompletionStage
-Objekt aufgerufen und haben ein zweites CompletionStage
-Objekt als ersten Parameter. Die Verarbeitungsketten dieser beiden CompletionStage
-Objekte werden dadurch vereinigt (“Task1” und “Task2”). Der zweite Parameter (BiFunction
, BiConsumer
, Runnable
, Function
oder Consumer
) ist der Task, der zum Vereinen verwendet wird (“Task3”).
Methoden zur Vereinigung von Abläufen. “CF” steht für CompletableFuture
und “CS” für CompletionStage
Man kann mit diesen Methoden des Push-API jeweils zwei Verarbeitungsketten zu einem CompletableFuture
vereinen. Bei den Methoden thenCombine
, thenAcceptBoth
und runAfterBoth
müssen die Resultate beider vorangehenden CompletionStages
vorliegen. Bei den Methoden mit Either
im Namen, braucht nur eine der beiden vorangehenden CompletionStages
bereit sein. Die andere Kette wird dann verworfen.
Diese Methoden gibt es immer in zwei Varianten: Je nachdem, ob der Task zum Vereinen asynchron oder nicht-asynchron ausgeführt werden soll, gibt es die beiden Varianten mit und ohne Async
am Ende des Methodennamens.
8.3.8 Zusammenführen durch thenCombine
(Verrechnen)
Bei thenCombine
werden zwei vorangehende CompletionStage
s kombiniert, indem deren Resultate mit einer BiFunction<T, U, V>
“zusammengerechnet” werden. BiFunction
ist ein Functional Interface aus dem Package java.util.function
. Die Single Abstract Method dieses Interfaces hat zwei Parameter: Der erste ist vom Typ T
, der zweite vom Typ U
(Typ-Parameter entsprechend der Spezifikation BiFunction<T, U, V>
). Der Rückgabewert der Methode ist dann vom Typ V
.
Je nachdem, ob der Task zum Kombinieren asynchron oder nicht-asynchron ausgeführt werden soll, gibt es die beiden Varianten thenCombine
und thenCombineAsync
.
Man kann also leicht einen Lambda-Ausdruck formulieren, der die Ergebnisse der beiden vorangegangenen Tasks in einem dritten Task zusammenführt. Die folgende Situation ist ein Beispiel dafür:
var task1 = CompletableFuture.supplyAsync(() -> 47);
var task2 = CompletableFuture.supplyAsync(() -> "11");
var task3 = task1.thenCombineAsync(task2, (n, s)->String.valueOf(n)+s);
T
:Integer
47
U
:String
'11'
V
:String
'4711'
Hier wird im CompletableFuture<Integer>
task1
der Task das Integer
47
zu “berechnen” repräsentiert. An diesem CompletableFuture
wird thenCombine
aufgerufen. Diese Methode hat als ersten Parameter das zweite CompletableFuture
für Task2. Diese beiden CompletionStages
werden durch Task3 asynchron kombiniert. Hier wird dazu der Lambda-Ausdruck (n, s) -> String.valueOf(n) + s
genutzt, der aus dem ersten Parameter (ein Integer
) eine String
-Repräsentation macht. Der resultierende String
wird mit dem zweiten Parameter (wieder ein String
) konkateniert.
task3
kann erst asynchron begonnen werden, wenn die beiden Ergebnisse der vorangehenden CompletionStages
vorliegen, da sie ja kombiniert werden sollen.
8.3.9 Zusammenführen durch applyToEither
(“ODER”)
Bei applyToEither
bzw. applyToEitherAsync
werden zwei Verarbeitungsketten über die entsprechenden CompletionStage
s zusammengeführt, indem das Resultat, das zuerst vorliegt, mit einer Function<T, U>
asynchron oder nicht-asynchron weiterverarbeitet wird. Der Rückgabewert der Methode ist dann vom Typ U
. Je nachdem, ob der Task zum Weiterverarbeiten asynchron oder nicht-asynchron ausgeführt werden soll, gibt es die beiden Varianten applyToEitherAsync
und applyToEither
.
Man kann also leicht einen Lambda-Ausdruck formulieren, der das Ergebnis des Tasks, der zuerst fertig ist, in einem dritten Task weiterverarbeitet. Die folgende Situation ist ein Beispiel dafür:
var task1 = CompletableFuture.supplyAsync(() -> 47);
var task2 = CompletableFuture.supplyAsync(() -> 11);
var task3 = task1.applyToEitherAsync(task2, (n) -> n > 20);
T
:Integer
47
U
:Integer
11
V
:Boolean
true
oderfalse
Hier wird im CompletableFuture<Integer>
task1
der Task, das Integer
47
zu “berechnen”, repräsentiert. An diesem CompletableFuture
wird applyToEitherAsync
aufgerufen. Diese Methode hat als ersten Parameter das zweite CompletableFuture
für Task2. Diese beiden CompletionStages
werden durch Task3 asynchron vereint. Das Ergebnis ist enweder true
oder false
– je nachdem, welcher vorangegangene Task zuerst berechnet ist.
task3
kann erst asynchron begonnen werden, wenn mindestens ein Ergebnis der vorangehenden CompletionStages
vorliegt.
8.3.10 Barrieren und Zusammenführen
Im Start-API des CompletableFuture
-Frameworks gibt es noch einige Methoden, die mit “Barrieren” arbeiten. allOf
und anyOf
haben beide eine variable Parameterliste. Sie erwarten eine beliebige Anzahl von Tasks in der Form von CompletableFuture
-Objekten:
allOf
benötigt irgendeinCompletableFuture<?>
, also z.B. mitrunAsync
gestarteteRunnable
-Tasks. Das Ergebnis vonallOf
ist nämlich einCompletableFuture<Void>
, d.h. die Ergebnisse der Tasks können nicht weiterverwendet werden.anyOf
benötigtCompletableFuture<? extends T>
, also z.B. mitsupplyAsync
gestarteteCallable<? extends T>
-Tasks. Das Ergebnis vonanyOf
ist dann einCompletableFuture<T>
.
Die Semantik von der Start-API-Methoden allOf
und anyOf
, die sie zu “Barrieren” macht, ist, dass das CompletableFuture
, das Ergebnis von allOf
ist, seine nächste CompletionStage
erst startet (oder bei get
oder join
entblockiert), wenn alle Tasks der übergebenen CompletableFuture
-Objekte fertig berechnet sind. Ergebnisse der Tasks könnten nicht weiterverarbeitet werden. Allerdings funktioniert allOf
mit allen CompletableFuture<?>
-Objekten als Parameter.
anyOf
hingegen nimmt das Ergebnis des CompletableFuture
, welches zuerst fertig ist, als Input der nächsten CompletionStage
. Daher kann als Inhaltstyp des resultierenden CompletableFuture
irgendein Obertyp der beteiligten Task-Resultate verwendet werden.
8.3.11 Barrieren und Zusammenführen
Es folgen zwei Beispiele für die Anwendung dieser “Barrieren”, bei denen jeweils drei Tasks übergeben werden:
CompletableFuture.allOf( // alle müssen beendet werden
CompletableFuture.runAsync( () -> { /*...*/ } ),
CompletableFuture.runAsync( () -> { /*...*/ } ),
CompletableFuture.runAsync( () -> { /*...*/ } )
).thenAccept((Void) -> System.out.println("done") );
CompletableFuture.anyOf( // das frühest fertige
CompletableFuture.supplyAsync( () -> { /*...*/ } ),
CompletableFuture.supplyAsync( () -> { /*...*/ } ),
CompletableFuture.supplyAsync( () -> { /*...*/ } )
).thenAccept((first) -> System.out.println(first));
Man beachte: Je nach Parallelität des commonPool
, bzw. der Executor
-Threadpools der übergebenen CompletableFuture
-Objekte, werden nicht alle Tasks tatsächlich gleichzeitig und parallel abgearbeitet.
Das folgende Programmfragment liefert 1
als result
demzufolge auch nur auf einem System mit einer Parallelität des commonPool
von mindestens 10.
static void sleep(int sek) {
try {
Thread.sleep(sek * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
var result = CompletableFuture
.anyOf(CompletableFuture.supplyAsync(() -> {
sleep(10);
return 10;
}), CompletableFuture.supplyAsync(() -> {
sleep(9);
return 9;
}), CompletableFuture.supplyAsync(() -> {
sleep(8);
return 8;
}), CompletableFuture.supplyAsync(() -> {
sleep(7);
return 7;
}), CompletableFuture.supplyAsync(() -> {
sleep(6);
return 6;
}), CompletableFuture.supplyAsync(() -> {
sleep(5);
return 5;
}), CompletableFuture.supplyAsync(() -> {
sleep(4);
return 4;
}), CompletableFuture.supplyAsync(() -> {
sleep(3);
return 3;
}), CompletableFuture.supplyAsync(() -> {
sleep(2);
return 2;
}), CompletableFuture.supplyAsync(() -> {
sleep(1);
return 1;
})).join();
Außerdem werden bei anyOf
die Tasks, die nicht als erstes fertig berechnet wurden, nicht abgebrochen, obwohl sie nicht mehr relevant sind und ihr Ergebnis nicht mehr benötigt wird. Der Grund ist die Architektur des CompletableFuture
-Frameworks. CompletableFuture
-Objekte sind nur ein Wrapper zu Threads, die in einem Executor
wie dem commonPool
ausgeführt werden. Eine Steuerung dieser Threads erfolgt nicht aus dem CompletableFuture
-Framework. Selbst eine Methode wie cancel()
aus dem Pull/Poll-API-Teil steuert nicht den Thread, sondern wirkt nur als Trigger für folgende CompletionStages
(und zwar mit einem vergleichbaren Effekt wie completeExecptionally(...)
), wie im folgenden Beispiel verdeutlicht wird (sleep
wie oben):
/*
* Nachweis, dass nicht mehr gebrauchte Tasks bei anyOf weiterlaufen
*/
var cf1 = CompletableFuture.runAsync(() -> {
sleep(3);
});
var cf2 = CompletableFuture.runAsync(() -> {
while (true) {
sleep(1);
System.out.println(Thread.currentThread().getName() + ": 2");
}
});
var cf3 = CompletableFuture.runAsync(() -> {
while (!Thread.interrupted()) {
sleep(1);
System.out.println(Thread.currentThread().getName() + ": 3");
}
System.out.println("cf3 end");
});
/*
* Darauf warten, dass cf1 nach 3 Sek. fertig ist. cf2 und cf3 laufen
* weiter, obwohl sie nicht mehr gebraucht werden...
*
* Auch mit .cancel() kann man sie nicht beenden (triggert nur
* Exception-Kette, falls gepusht).
*/
CompletableFuture.anyOf(cf1, cf2, cf3).join();
System.out.println("cf1 ist fertig");
/*
* abwarten, ob sonst noch etwas passiert...
*/
sleep(5);
Unter der Annahme, dass die Parallelität des commonPool
mindestens 3 ist, passiert hier das Folgende: cf1
, cf2
und cf3
werden an den commonPool
submitted. cf1
wartet 3 Sekunden und ist dann fertig. cf2
und cf3
laufen weiter und geben jede Sekunde 2
bzw. 3
auf der Konsole aus. Obwohl cf1
fertig ist, laufen cf2
und cf3
weiter, wie man an den Konsolenausgaben in den verbleibenden 5 Sekunden des Aufrufer-Threads sehen kann. Auch wenn man mit cancel()
versucht cf2
oder cf3
abzubrechen hat das trotzdem keinen Effekt auf die Threads, die diese Tasks abarbeiten, außer dass eine CancellationException
die nächste CompletionStage
triggert.
8.3.12 Fehlerbehandlung und Abbruch
Tasks können nicht nur getriggert werden, wenn ein Ergebnis für die vorangehende CompletionStage
vorliegt, sondern auch wenn eine Exception geworfen wird. Die Ausnahme wird nicht sofort realisiert, sondern “für später aufgehoben”.
Wird für die CompletionStage
, die die Ausnahme wirft, nicht direkt ein Exception-Handler vereinbart, wird das Throwable
durch die Kette durchgereicht. Spätestens am Ende bei get
oder join
wird die Exception in Form einer (checked) ExecutionException
(bei get
) oder einer (unchecked) CompletionException
(bei join
) realsiert (s. Pull/Poll-API). Auch irgendwo vorher kann an einer CompletionStage
mit handle
(nicht-asynchron) oder handleAsync
(asynchron) auf eine Fehlersituation reagiert werden.
var cf = CompletableFuture
.supplyAsync(() -> 42)
.thenApplyAsync(r -> r / 0)
.thenApplyAsync(r -> r * r)
.thenApplyAsync(r -> r > 0)
.handle((r, th) -> {
if (r != null) {
System.out.println("Resultat: " + r);
return r;
} else {
System.err.println("error: " + th);
return false;
}
});
System.out.println(cf.join());
Durch die Division durch Null wird eine Exception provoziert. Die folgenden CompletionStages
werden übersprungen und es wird direkt nicht-asynchron die BiFunction<T, Throwable, U>
getriggert.
Diese Funktion bekommt zwei Parameter als Input. Es ist aber nur entweder der eine oder der andere gesetzt. Das ist entweder der Typ T
des regulären Resultats der vorangehenden CompletionStage<T>
(im gezeigten Beispiel Integer
) oder ein Throwable
, das eine in irgendeiner vorangegangenen CompletionStage
aufgetretenen Exception
repräsentiert. Das Ergebnis dieser Funktion kann von einem anderen Typ U
(im gezeigten Beispiel Boolean
) sein.
8.3.13 Fehlerbehandlung und Abbruch
Wenn als Ergebnis kein Rückgabewert benötigt wird, gibt es eine Alternative mit einem BiConsumer
statt einer BiFunction
. Die Methode, um solch einen Handler zu installieren, heißt whenComplete
(nicht-asynchron) bzw. whenCompleteAsync
(asynchron).
var cf = CompletableFuture
.supplyAsync(() -> 42)
.thenApplyAsync(r -> r / 0)
.thenApplyAsync(r -> r * r)
.thenApplyAsync(r -> r > 0)
.whenComplete((r, th) -> {
if (r == null) {
System.err.println("error: " + th);
}
});
cf.join();
Durch die Division durch Null wird wieder eine Exception provoziert. Die folgenden CompletionStages
werden übersprungen und es wird direkt nicht-asynchron der BiConsumer<T, Throwable>
getriggert.
Diese Funktion bekommt zwei Parameter als Input. Es ist aber nur entweder der eine oder der andere gesetzt. Das ist entweder der Typ T
des regulären Resultats der vorangehenden CompletionStage<T>
(im gezeigten Beispiel Integer
) oder ein Throwable
, das eine in irgendeiner vorangegangenen CompletionStage
aufgetretenen Exception
repräsentiert. Der BiConsumer
liefert kein Ergebnis zurück. Dementsprechend führt cf.join()
zu einer “zusätzlichen” (unchecked) CompletionException
.