pp.03.02-BoundedQueueWaitNotify
Anwendungsbeispiel Ringpuffer
In diesem Anwendungsbeispiel wird die bedingungsabhängige Steuerung von Threads am Beispiel eines Ringpuffers demonstriert. Solch ein Ringpuffer (engl. “Bounded Buffer”) hat eine begrenzte Kapazität und unterstützt mindestens zwei Operationen: Elemente hinzufügen (put
) sowie holen mit gleichzeitigem Entfernen (take
).
Es gibt zwei Bedingungen, die einen Zusammenhang mit der Thread-Steuerung haben:
- Element holen mit gleichzeitigem Entfernen (
take
), wenn der Ringpuffer leer ist: Der Thread, der dietake
-Methode aufruft wird so lange blockiert, bis ein anderer Thread mitput
ein neues Element im Ringpuffer gespeichert hat. Ist dieses neue Element im Ringpuffer vorhanden, wird die Blockierung beendet und dieser Thread liefert dann das neue Element als Ergebnis destake
-Aufrufs und entfernt dieses Element gleichzeitig aus dem Ringpuffer. - Hinzufügen eines Elements (
put
), wenn der Ringpuffer voll ist: Der Thread, der dieput
-Methode aufruft wird so lange blockiert, bis ein anderer Thread mittake
ein Element aus dem Ringpuffer gelöscht hat und damit wieder Platz geschaffen wurde. Ist das Element aus dem Ringpuffer entfernt, wird die Blockierung beendet und dieser Thread kann dann das neue Element in den Ringpuffer speichern.
Unabhängig von dieser anwendungsspezifischen gegenseitigen Steuerung ist die Verwendung des Ringpuffers aus mehreren Threads heraus ein kritischer Abschnitt, der gegenseitigen Ausschluss bedingt.
Die gespeicherten Elemente sind konzeptionell in einem Kreis angeordnet und es werden zwei Positionen verwaltet:
- Der Index, an dem das nächste Element mit
put
hinzugefügt wird (putptr
). Nachdem ein Element hinzugefügt wurde, wird dieser Index um eins weiter vor geschoben. - Der Index, von dem das nächste Element mit
take
gelesen und entfernt wird (takeptr
). Nachdem ein Element gelesen und entfernt wurde, wird der Index um eins weiter vor geschoben.
Diese beiden Indizes bewegen sich unabhängig voneinander, aber der Lese-/Entnahmeindex (takeptr
) kann den Einfügeindex (putptr
) aufgrund der ersten Bedingung nicht überholen. Ebenso kann aufgrund der zweiten Bedingung der Einfügeindex (putptr
) den Lese-/Entnahmeindex (takeptr
) nicht überholen.
Quellcode von pp.BoundedBuffer
Dieser Quellcode funktioniert noch nicht bei Nebenläufigkeit, denn die Bedingungen und die gegenseitige Steuerung sind hier noch nicht umgesetzt. In dieser Übungsaufgabe werden Sie dieses Programm so modifizieren, dass Nebenläufigkeit berücksichtigt wird.
package pp;
public class BoundedBuffer<T> {
final Object[] items = new Object[8];
int putptr, takeptr, count;
public void put(T x) throws InterruptedException {
this.items[this.putptr] = x;
if (++this.putptr == this.items.length) {
this.putptr = 0;
}
++this.count;
}
public T take() throws InterruptedException {
var x = (T) this.items[this.takeptr];
if (++this.takeptr == this.items.length) {
this.takeptr = 0;
}
--this.count;
return x;
}
}
Beispielhafter Ablauf
Im folgenden beispielhaften Ablauf der Benutzung des Ringpuffers wird das korrekte Verhalten bei Nebenläufigkeit durchgespielt. Die Steuerung der Threads ist aber noch nicht im oben angegebenen Quellcode von pp.BoundedBuffer
enthalten.
Start
Der Ringpuffer in diesem Beispiel hat die Kapazität 8. Er kann 8 also Elemente speichern. Intern wird in unserer Implementierung ein Array für 8 Objekte benutzt. Diese Ringpufferimplementierung verwendet Java-Generics, man kann den Inhaltstyp also einschränken.
Im initialen Zustand ist das Array leer; putptr
und takeptr
weisen auf den Index 0
, also das erste Element des Puffers. Es sind noch keine Elemente enthalten, also ist count == 0
.
items.length |
8 |
Kapazität von items |
count |
0 |
aktuelle Anzahl Elemente |
takeptr |
0 |
Index nächstes take() |
putptr |
0 |
Index nächstes put() |
- Ringpuffer ist “leer” (
count == 0
) - Solange dies so ist, müssen
take()
-Aufrufe warten.
put(42)
Das Element 42
wird im Ringpuffer gespeichert. Wir haben es also mit einem BoundedBuffer
mit dem an den Typ Integer
gebundenen Typ-Parameter T
zu tun.
Eigentlich ist das Literal 42
vom primitiven Typ int
, der bei generischen Typen in Java aber nicht verwendbar ist. Wird put(42)
aufgerufen, wird durch den sog. Auto-Boxing Mechanismus der Wrapper-Typ Integer
von int
verwendet und der ensprechende Wert von Integer
(also Integer.valueOf(42)
statt 42
) verwendet.
items.length |
8 |
Kapazität von items |
count |
1 |
aktuelle Anzahl Elemente |
takeptr |
0 |
Index nächstes take() |
putptr |
1 |
Index nächstes put() |
42
ist an Index0
vonitems
- im Moment einziges Element in
items
Nachdem die put
-Operation durchgeführt wurde, befindet sich ein Element an Index 0
von items
, putptr
ist nun 1
, denn die nächste put
-Operation soll an Index 1
passieren; der takeptr
ist noch 0
, denn die nächste take
-Operation betrifft das erste Element (an Index 0
von items
). Außerdem ist count == 1
.
put(99)
Nach einer weiteren put
-Operation ist putptr == 2
, count == 2
(da zwei Elemente im Ringpuffer gespeichert sind) und takeptr
nach wie vor 0
, da noch kein Element aus dem Ringpuffer entnommen wurde.
items.length |
8 |
Kapazität von items |
count |
2 |
aktuelle Anzahl Elemente |
takeptr |
0 |
Index nächstes take() |
putptr |
2 |
Index nächstes put() |
99
wurde als nächstes gespeichert- es wurde noch kein Element entfernt
put(80)
Nach einer weiteren put
-Operation ist putptr == 3
, count == 3
(da nun drei Elemente im Ringpuffer gespeichert sind) und takeptr
nach wie vor 0
, da noch kein Element aus dem Ringpuffer entnommen wurde.
items.length |
8 |
Kapazität von items |
count |
3 |
aktuelle Anzahl Elemente |
takeptr |
0 |
Index nächstes take() |
putptr |
3 |
Index nächstes put() |
80
wurde als nächstes gespeichert- es wurde noch kein Element entfernt
put(12)
Nach einer weiteren put
-Operation ist putptr == 4
, count == 4
(da nun vier Elemente im Ringpuffer gespeichert sind) und takeptr
nach wie vor 0
, da noch kein Element aus dem Ringpuffer entnommen wurde.
items.length |
8 |
Kapazität von items |
count |
4 |
aktuelle Anzahl Elemente |
takeptr |
0 |
Index nächstes take() |
putptr |
4 |
Index nächstes put() |
12
wurde als nächstes gespeichert- es wurde noch kein Element entfernt
take()
\(\to\) 42
Durch das erste take()
in diesem Beispiellauf wird der putptr
nicht verändert. Der takeptr
wird hingegen weiterbewegt auf 1
. Dabei wird das Element, das auf der Position des vorigen Wertes von takeptr
, also items[0]
mit dem Wert 42
zurückgeliefert und aus dem Ringpuffer entfernt. Die Anzahl der dort gespeicherten Elemente count
ist danach um 1
geringer als vorher, nämlich 3
.
items.length |
8 |
Kapazität von items |
count |
3 |
aktuelle Anzahl Elemente |
takeptr |
1 |
Index nächstes take() |
putptr |
4 |
Index nächstes put() |
- das erste Element wird gelöscht
- und als Ergebnis zurückgeliefert
put(20)
Als nächstes wird 20
im Ringpuffer gespeichert. Der putptr
bewegt sich weiter während der takeptr
stehenbleibt.
items.length |
8 |
Kapazität von items |
count |
4 |
aktuelle Anzahl Elemente |
takeptr |
1 |
Index nächstes take() |
putptr |
5 |
Index nächstes put() |
20
wird gespeichert undputptr
wird weiterbewegttakeptr
bleibt unverändert
put(14)
Als nächstes wird 14
im Ringpuffer gespeichert. Der putptr
bewegt sich weiter während der takeptr
stehenbleibt.
items.length |
8 |
Kapazität von items |
count |
5 |
aktuelle Anzahl Elemente |
takeptr |
1 |
Index nächstes take() |
putptr |
6 |
Index nächstes put() |
14
wird gespeichert undputptr
wird weiterbewegttakeptr
bleibt unverändert
put(44)
Als nächstes wird 44
im Ringpuffer gespeichert. Der putptr
bewegt sich weiter während der takeptr
stehenbleibt.
items.length |
8 |
Kapazität von items |
count |
6 |
aktuelle Anzahl Elemente |
takeptr |
1 |
Index nächstes take() |
putptr |
7 |
Index nächstes put() |
44
wird gespeichert undputptr
wird weiterbewegttakeptr
bleibt unverändert
put(75)
Als nächstes wird 75
im Ringpuffer gespeichert. Der putptr
bewegt sich weiter während der takeptr
stehenbleibt. Der putptr
wird hierbei aber von 7
auf 0
weiterbewegt.
items.length |
8 |
Kapazität von items |
count |
7 |
aktuelle Anzahl Elemente |
takeptr |
1 |
Index nächstes take() |
putptr |
0 |
Index nächstes put() |
75
wird gespeichert undputptr
wird weiterbewegttakeptr
bleibt unverändert
put(13)
Als nächstes wird 13
im Ringpuffer gespeichert. Der putptr
bewegt sich weiter während der takeptr
stehenbleibt. Durch diese put
-Operation wird der Ringpuffer im Beispiellauf voll, da items.length == count
.
items.length |
8 |
Kapazität von items |
count |
8 |
aktuelle Anzahl Elemente |
takeptr |
1 |
Index nächstes take() |
putptr |
1 |
Index nächstes put() |
- Ringpufer ist voll (
items.length == count
) - weitere
put()
-Operationen müssen warten
take()
\(\to\) 99
Im Beispielhaften Ablauf wird nun ein Element mit take
aus dem Ringpuffer gelesen und dabei entfernt. Dafür wird in items
der Index takeptr
benutzt, der vor dem Aufruf auf 1
stand. Daher wird das Element am Index 1
als Ergebnis von take()
zurückgeliefert (also 99
). Dieses Element wird asu dem Ringpuffer entfernt und danach der Zeiger takeptr
um eins weiterbewegt, also aud 2
. Im Ringpuffer ist nun wieder für ein neues Element Platz (items.length - count
\(\to\) 1
) und Threads, die möglicherweise wegen eines put
-Versuchs warten mussten, weil der Ringpuffer vorher voll war, würden nun weiterarbeiten dürfen.
items.length |
8 |
Kapazität von items |
count |
7 |
aktuelle Anzahl Elemente |
takeptr |
2 |
Index nächstes take() |
putptr |
1 |
Index nächstes put() |
take()
entfernt Element an Index1
- schafft Platz (
items.length > count
)
put(22)
Als nächstes wird 22
an der im vorigen Schritt gerade frei gewordenen Stelle im Ringpuffer gespeichert. Der putptr
bewegt sich weiter während der takeptr
stehenbleibt.
items.length |
8 |
Kapazität von items |
count |
8 |
aktuelle Anzahl Elemente |
takeptr |
2 |
Index nächstes take() |
putptr |
2 |
Index nächstes put() |
- 22 wird im gerade frei gewordenen Element des Ringpuffers gespeichert
- Ringpuffer ist nun wieder voll
- für den Ringpuffer gibt es zwei Bedingungen:
- “Ringpuffer voll”
\(\to\)count == items.length
\(\to\)put()
muss warten - “Ringpuffer leer”
\(\to\)count == 0
\(\to\)take()
muss warten
- “Ringpuffer voll”
wait
/notifyAll
für Ringpuffer
- Projekt:
pp.03.02-BoundedQueueWaitNotify
- Bearbeitungszeit: 15 Minuten
- Musterlösung: 15 Minuten
- Kompatibilität: mindestens Java SE 10
Quellcode von main
in pp.BoundedBuffer
Die main
-Methode von BoundedBuffer
instantiiert einen Ringpuffer (mem
) für Integer
und erzeugt drei Threads mit p1
, p2
und c1
.
public static void main(String... args) throws InterruptedException {
var mem = new BoundedBuffer<Integer>();
var p1 = new Thread(() -> {
try {
for (var j = 1; j <= 10; j++) {
mem.put(j);
System.out.println(Thread.currentThread().getName() + ": put=" + j);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer-1");
var p2 = new Thread(() -> {
try {
for (var j = 1; j <= 10; j++) {
mem.put(j);
System.out.println(Thread.currentThread().getName() + ": put=" + j);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer-2");
var c1 = new Thread(() -> {
try {
for (var j = 1; j <= 20; j++) {
System.out.println(Thread.currentThread().getName() + ": taken=" + mem.take());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer-1");
var now = System.currentTimeMillis();
p1.start();
p2.start();
c1.start();
p1.join();
p2.join();
c1.join();
System.out.println("Runtime: " + (System.currentTimeMillis() - now) + "ms");
}
Die drei Threads bekommen beim Aufruf des Thread
-Konstruktors Namen:
p1
:Producer-1
p2
:Producer-2
c1
:Consumer-1
Sie werden gestartet und im Hauptprogramm main
wird auf das Ende von allen drei Threads gewartet. Die Zeit zwischen dem Start (Zeitpunkt in Millisekunden seit 01.01.1970 00:00 Uhr wird in der Variablen now
gespeichert) und dem Ende (Zeitpunkte werden voneinander subtrahiert) wird ausgegeben.
Die drei Threads sind über den Ringpuffer mem
miteinander verbunden:
Die beiden Producer schreiben in unterschiedlicher Frequenz jeweils die Zahlen von 1
bis 10
in den Ringpuffer. Producer-1
wartet jeweils 1000 ms nach dem Schreiben bis zum nächsten put
-Aufruf. Producer-2
ist hingegen ungebremst. Der Consumer holt mit take
so schnell es geht 20
Objekte aus dem Ringpuffer mem
.
Exkurs: Erzeuger-Verbraucher Entwurfsmuster
Dies ist ein häufig verwendetes Entwurfsmuster für parallele bzw. verteilte Systeme: Prozesse, die in einer Verkettung zusammenarbeiten (“Pipes and Filters”) werden über einen gemeinsam genutzten Puffer (Warteschlange) entkoppelt (vgl. “Erzeuger-Verbraucher-Problem”). Der Erzeuger liefert den Input für den Verbraucher. Dieses architekturelle Vorgehen ermöglicht problemlos zu skalieren:
- vertikale Skalierung (scale up): Hinzufügen von weiteren Producern/Consumern, um die auf einem Rechner vorhandenen Parallelitäts-Ressourcen (z.B. Anzahl Cores) besser auszunutzen (bzw. Hinzufügen von CPU’s auf einem Rechner).
- horizontale Skalierung (scale out): Bei verteilten Systemen (also mehrere Rechner in einem Netzwerk) bezeichnet der Begriff horizontale Skalierung das Hinzufügen von zusätzlichen Rechnern.
Im Kapitel zu “Communicating Sequential Processes” wird dieses Entwurfsmuster und die Voraussetzungen und Auswirkungen vertikaler Skalierung noch einmal ausführlicher behandelt.
Koordination des Zugriffs und gegenseitige Steuerung
Da die verwendete Ringpuffer-Implementierung die fest vorgegebenen Kapazität von 8
hat, tritt mit hoher Wahrscheinlichkeit die Situation ein, dass die Regeln zur Steuerung benutzt werden müssen:
- “Ringpuffer voll”
\(\to\)count == items.length
\(\to\)put()
muss warten - “Ringpuffer leer”
\(\to\)count == 0
\(\to\)take()
muss warten
Aufgaben
- Analysieren Sie den Quellcode.
- Lassen Sie das Hauptprogramm mehrfach laufen und beobachten Sie das Verhalten. Entspricht dies der Spezifikation?
- Modifizieren Sie die Methode
put
so, dass gewartet wird, falls der Puffer voll ist (this.count == this.items.length
). Dazu musstake
alle wartenden Threads aufwecken, wenn ein Element entfernt wurde, weil im Puffer danach wieder etwas Platz ist. - Modifizieren Sie die Methode
take
so, dass gewartet wird, falls der Puffer leer ist (this.count == 0
). Dazu mussput
alle wartenden Threads aufwecken, wenn ein Element hinzugefügt wurde, weil im Puffer danach wieder etwas zu holen ist.