pp.03.02-BoundedQueueWaitNotify

Anwendungsbeispiel Ringpuffer

In diesem Anwendungsbeispiel wird die bedingungsabhängige Steuerung von Threads am Beispiel eines Ringpuffers demonstriert. Solch ein Ringpuffer (engl. “Bounded Buffer”) hat eine begrenzte Kapazität und unterstützt mindestens zwei Operationen: Elemente hinzufügen (put) sowie holen mit gleichzeitigem Entfernen (take).

Es gibt zwei Bedingungen, die einen Zusammenhang mit der Thread-Steuerung haben:

  • Element holen mit gleichzeitigem Entfernen (take), wenn der Ringpuffer leer ist: Der Thread, der die take-Methode aufruft wird so lange blockiert, bis ein anderer Thread mit put ein neues Element im Ringpuffer gespeichert hat. Ist dieses neue Element im Ringpuffer vorhanden, wird die Blockierung beendet und dieser Thread liefert dann das neue Element als Ergebnis des take-Aufrufs und entfernt dieses Element gleichzeitig aus dem Ringpuffer.
  • Hinzufügen eines Elements (put), wenn der Ringpuffer voll ist: Der Thread, der die put-Methode aufruft wird so lange blockiert, bis ein anderer Thread mit take ein Element aus dem Ringpuffer gelöscht hat und damit wieder Platz geschaffen wurde. Ist das Element aus dem Ringpuffer entfernt, wird die Blockierung beendet und dieser Thread kann dann das neue Element in den Ringpuffer speichern.

Unabhängig von dieser anwendungsspezifischen gegenseitigen Steuerung ist die Verwendung des Ringpuffers aus mehreren Threads heraus ein kritischer Abschnitt, der gegenseitigen Ausschluss bedingt.

Die gespeicherten Elemente sind konzeptionell in einem Kreis angeordnet und es werden zwei Positionen verwaltet:

  • Der Index, an dem das nächste Element mit put hinzugefügt wird (putptr). Nachdem ein Element hinzugefügt wurde, wird dieser Index um eins weiter vor geschoben.
  • Der Index, von dem das nächste Element mit take gelesen und entfernt wird (takeptr). Nachdem ein Element gelesen und entfernt wurde, wird der Index um eins weiter vor geschoben.

Diese beiden Indizes bewegen sich unabhängig voneinander, aber der Lese-/Entnahmeindex (takeptr) kann den Einfügeindex (putptr) aufgrund der ersten Bedingung nicht überholen. Ebenso kann aufgrund der zweiten Bedingung der Einfügeindex (putptr) den Lese-/Entnahmeindex (takeptr) nicht überholen.

Quellcode von pp.BoundedBuffer

Dieser Quellcode funktioniert noch nicht bei Nebenläufigkeit, denn die Bedingungen und die gegenseitige Steuerung sind hier noch nicht umgesetzt. In dieser Übungsaufgabe werden Sie dieses Programm so modifizieren, dass Nebenläufigkeit berücksichtigt wird.

package pp;

public class BoundedBuffer<T> {

    final Object[] items = new Object[8];
    int putptr, takeptr, count;

    public void put(T x) throws InterruptedException {
        this.items[this.putptr] = x;
        if (++this.putptr == this.items.length) {
            this.putptr = 0;
        }
        ++this.count;
    }

    public T take() throws InterruptedException {
        var x = (T) this.items[this.takeptr];
        if (++this.takeptr == this.items.length) {
            this.takeptr = 0;
        }
        --this.count;
        return x;
    }
}

Beispielhafter Ablauf

Im folgenden beispielhaften Ablauf der Benutzung des Ringpuffers wird das korrekte Verhalten bei Nebenläufigkeit durchgespielt. Die Steuerung der Threads ist aber noch nicht im oben angegebenen Quellcode von pp.BoundedBuffer enthalten.

Start

Der Ringpuffer in diesem Beispiel hat die Kapazität 8. Er kann 8 also Elemente speichern. Intern wird in unserer Implementierung ein Array für 8 Objekte benutzt. Diese Ringpufferimplementierung verwendet Java-Generics, man kann den Inhaltstyp also einschränken.

Im initialen Zustand ist das Array leer; putptr und takeptr weisen auf den Index 0, also das erste Element des Puffers. Es sind noch keine Elemente enthalten, also ist count == 0.

items.length 8 Kapazität von items
count 0 aktuelle Anzahl Elemente
takeptr 0 Index nächstes take()
putptr 0 Index nächstes put()
  • Ringpuffer ist “leer” (count == 0)
  • Solange dies so ist, müssen take()-Aufrufe warten.

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

put(42)

Das Element 42 wird im Ringpuffer gespeichert. Wir haben es also mit einem BoundedBuffer mit dem an den Typ Integer gebundenen Typ-Parameter T zu tun.

Eigentlich ist das Literal 42 vom primitiven Typ int, der bei generischen Typen in Java aber nicht verwendbar ist. Wird put(42) aufgerufen, wird durch den sog. Auto-Boxing Mechanismus der Wrapper-Typ Integer von int verwendet und der ensprechende Wert von Integer (also Integer.valueOf(42) statt 42) verwendet.

items.length 8 Kapazität von items
count 1 aktuelle Anzahl Elemente
takeptr 0 Index nächstes take()
putptr 1 Index nächstes put()
  • 42 ist an Index 0 von items
  • im Moment einziges Element in items

Nachdem die put-Operation durchgeführt wurde, befindet sich ein Element an Index 0 von items, putptr ist nun 1, denn die nächste put-Operation soll an Index 1 passieren; der takeptr ist noch 0, denn die nächste take-Operation betrifft das erste Element (an Index 0 von items). Außerdem ist count == 1.

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

put(99)

Nach einer weiteren put-Operation ist putptr == 2, count == 2 (da zwei Elemente im Ringpuffer gespeichert sind) und takeptr nach wie vor 0, da noch kein Element aus dem Ringpuffer entnommen wurde.

items.length 8 Kapazität von items
count 2 aktuelle Anzahl Elemente
takeptr 0 Index nächstes take()
putptr 2 Index nächstes put()
  • 99 wurde als nächstes gespeichert
  • es wurde noch kein Element entfernt

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

put(80)

Nach einer weiteren put-Operation ist putptr == 3, count == 3 (da nun drei Elemente im Ringpuffer gespeichert sind) und takeptr nach wie vor 0, da noch kein Element aus dem Ringpuffer entnommen wurde.

items.length 8 Kapazität von items
count 3 aktuelle Anzahl Elemente
takeptr 0 Index nächstes take()
putptr 3 Index nächstes put()
  • 80 wurde als nächstes gespeichert
  • es wurde noch kein Element entfernt

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

put(12)

Nach einer weiteren put-Operation ist putptr == 4, count == 4 (da nun vier Elemente im Ringpuffer gespeichert sind) und takeptr nach wie vor 0, da noch kein Element aus dem Ringpuffer entnommen wurde.

items.length 8 Kapazität von items
count 4 aktuelle Anzahl Elemente
takeptr 0 Index nächstes take()
putptr 4 Index nächstes put()
  • 12 wurde als nächstes gespeichert
  • es wurde noch kein Element entfernt

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

take() \(\to\) 42

Durch das erste take() in diesem Beispiellauf wird der putptr nicht verändert. Der takeptr wird hingegen weiterbewegt auf 1. Dabei wird das Element, das auf der Position des vorigen Wertes von takeptr, also items[0] mit dem Wert 42 zurückgeliefert und aus dem Ringpuffer entfernt. Die Anzahl der dort gespeicherten Elemente count ist danach um 1 geringer als vorher, nämlich 3.

items.length 8 Kapazität von items
count 3 aktuelle Anzahl Elemente
takeptr 1 Index nächstes take()
putptr 4 Index nächstes put()
  • das erste Element wird gelöscht
  • und als Ergebnis zurückgeliefert

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

put(20)

Als nächstes wird 20 im Ringpuffer gespeichert. Der putptr bewegt sich weiter während der takeptr stehenbleibt.

items.length 8 Kapazität von items
count 4 aktuelle Anzahl Elemente
takeptr 1 Index nächstes take()
putptr 5 Index nächstes put()
  • 20 wird gespeichert und putptr wird weiterbewegt
  • takeptr bleibt unverändert

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

put(14)

Als nächstes wird 14 im Ringpuffer gespeichert. Der putptr bewegt sich weiter während der takeptr stehenbleibt.

items.length 8 Kapazität von items
count 5 aktuelle Anzahl Elemente
takeptr 1 Index nächstes take()
putptr 6 Index nächstes put()
  • 14 wird gespeichert und putptr wird weiterbewegt
  • takeptr bleibt unverändert

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

put(44)

Als nächstes wird 44 im Ringpuffer gespeichert. Der putptr bewegt sich weiter während der takeptr stehenbleibt.

items.length 8 Kapazität von items
count 6 aktuelle Anzahl Elemente
takeptr 1 Index nächstes take()
putptr 7 Index nächstes put()
  • 44 wird gespeichert und putptr wird weiterbewegt
  • takeptr bleibt unverändert

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

put(75)

Als nächstes wird 75 im Ringpuffer gespeichert. Der putptr bewegt sich weiter während der takeptr stehenbleibt. Der putptr wird hierbei aber von 7 auf 0 weiterbewegt.

items.length 8 Kapazität von items
count 7 aktuelle Anzahl Elemente
takeptr 1 Index nächstes take()
putptr 0 Index nächstes put()
  • 75 wird gespeichert und putptr wird weiterbewegt
  • takeptr bleibt unverändert

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

put(13)

Als nächstes wird 13 im Ringpuffer gespeichert. Der putptr bewegt sich weiter während der takeptr stehenbleibt. Durch diese put-Operation wird der Ringpuffer im Beispiellauf voll, da items.length == count.

items.length 8 Kapazität von items
count 8 aktuelle Anzahl Elemente
takeptr 1 Index nächstes take()
putptr 1 Index nächstes put()
  • Ringpufer ist voll (items.length == count)
  • weitere put()-Operationen müssen warten

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

take() \(\to\) 99

Im Beispielhaften Ablauf wird nun ein Element mit take aus dem Ringpuffer gelesen und dabei entfernt. Dafür wird in items der Index takeptr benutzt, der vor dem Aufruf auf 1 stand. Daher wird das Element am Index 1 als Ergebnis von take() zurückgeliefert (also 99). Dieses Element wird asu dem Ringpuffer entfernt und danach der Zeiger takeptrum eins weiterbewegt, also aud 2. Im Ringpuffer ist nun wieder für ein neues Element Platz (items.length - count \(\to\) 1) und Threads, die möglicherweise wegen eines put-Versuchs warten mussten, weil der Ringpuffer vorher voll war, würden nun weiterarbeiten dürfen.

items.length 8 Kapazität von items
count 7 aktuelle Anzahl Elemente
takeptr 2 Index nächstes take()
putptr 1 Index nächstes put()
  • take() entfernt Element an Index 1
  • schafft Platz (items.length > count)

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

put(22)

Als nächstes wird 22 an der im vorigen Schritt gerade frei gewordenen Stelle im Ringpuffer gespeichert. Der putptr bewegt sich weiter während der takeptr stehenbleibt.

items.length 8 Kapazität von items
count 8 aktuelle Anzahl Elemente
takeptr 2 Index nächstes take()
putptr 2 Index nächstes put()
  • 22 wird im gerade frei gewordenen Element des Ringpuffers gespeichert
  • Ringpuffer ist nun wieder voll
  • für den Ringpuffer gibt es zwei Bedingungen:
    • “Ringpuffer voll”
      \(\to\) count == items.length
      \(\to\) put() muss warten
    • “Ringpuffer leer”
      \(\to\) count == 0
      \(\to\) take() muss warten

Hettel und Tran (2016, 65 (modifiziert))

Hettel und Tran (2016, 65 (modifiziert))

wait/notifyAll für Ringpuffer

  • Projekt: pp.03.02-BoundedQueueWaitNotify
  • Bearbeitungszeit: 15 Minuten
  • Musterlösung: 15 Minuten
  • Kompatibilität: mindestens Java SE 10

Quellcode von main in pp.BoundedBuffer

Die main-Methode von BoundedBuffer instantiiert einen Ringpuffer (mem) für Integer und erzeugt drei Threads mit p1, p2 und c1.

public static void main(String... args) throws InterruptedException {
    var mem = new BoundedBuffer<Integer>();
    var p1 = new Thread(() -> {
        try {
            for (var j = 1; j <= 10; j++) {
                mem.put(j);
                System.out.println(Thread.currentThread().getName() + ": put=" + j);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }, "Producer-1");
    var p2 = new Thread(() -> {
        try {
            for (var j = 1; j <= 10; j++) {
                mem.put(j);
                System.out.println(Thread.currentThread().getName() + ": put=" + j);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }, "Producer-2");
    var c1 = new Thread(() -> {
        try {
            for (var j = 1; j <= 20; j++) {
                System.out.println(Thread.currentThread().getName() + ": taken=" + mem.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }, "Consumer-1");
    var now = System.currentTimeMillis();
    p1.start();
    p2.start();
    c1.start();
    p1.join();
    p2.join();
    c1.join();
    System.out.println("Runtime: " + (System.currentTimeMillis() - now) + "ms");
}

Die drei Threads bekommen beim Aufruf des Thread-Konstruktors Namen:

  • p1: Producer-1
  • p2: Producer-2
  • c1: Consumer-1

Sie werden gestartet und im Hauptprogramm main wird auf das Ende von allen drei Threads gewartet. Die Zeit zwischen dem Start (Zeitpunkt in Millisekunden seit 01.01.1970 00:00 Uhr wird in der Variablen now gespeichert) und dem Ende (Zeitpunkte werden voneinander subtrahiert) wird ausgegeben.

Die drei Threads sind über den Ringpuffer mem miteinander verbunden:

Hettel und Tran (2016, 66 (modifiziert))

Hettel und Tran (2016, 66 (modifiziert))

Die beiden Producer schreiben in unterschiedlicher Frequenz jeweils die Zahlen von 1 bis 10 in den Ringpuffer. Producer-1 wartet jeweils 1000 ms nach dem Schreiben bis zum nächsten put-Aufruf. Producer-2 ist hingegen ungebremst. Der Consumer holt mit take so schnell es geht 20 Objekte aus dem Ringpuffer mem.

Exkurs: Erzeuger-Verbraucher Entwurfsmuster

Dies ist ein häufig verwendetes Entwurfsmuster für parallele bzw. verteilte Systeme: Prozesse, die in einer Verkettung zusammenarbeiten (“Pipes and Filters”) werden über einen gemeinsam genutzten Puffer (Warteschlange) entkoppelt (vgl. “Erzeuger-Verbraucher-Problem”). Der Erzeuger liefert den Input für den Verbraucher. Dieses architekturelle Vorgehen ermöglicht problemlos zu skalieren:

  • vertikale Skalierung (scale up): Hinzufügen von weiteren Producern/Consumern, um die auf einem Rechner vorhandenen Parallelitäts-Ressourcen (z.B. Anzahl Cores) besser auszunutzen (bzw. Hinzufügen von CPU’s auf einem Rechner).
  • horizontale Skalierung (scale out): Bei verteilten Systemen (also mehrere Rechner in einem Netzwerk) bezeichnet der Begriff horizontale Skalierung das Hinzufügen von zusätzlichen Rechnern.

Im Kapitel zu “Communicating Sequential Processes” wird dieses Entwurfsmuster und die Voraussetzungen und Auswirkungen vertikaler Skalierung noch einmal ausführlicher behandelt.

Koordination des Zugriffs und gegenseitige Steuerung

Da die verwendete Ringpuffer-Implementierung die fest vorgegebenen Kapazität von 8 hat, tritt mit hoher Wahrscheinlichkeit die Situation ein, dass die Regeln zur Steuerung benutzt werden müssen:

  • “Ringpuffer voll”
    \(\to\) count == items.length
    \(\to\) put() muss warten
  • “Ringpuffer leer”
    \(\to\) count == 0
    \(\to\) take() muss warten

Aufgaben

  • Analysieren Sie den Quellcode.
  • Lassen Sie das Hauptprogramm mehrfach laufen und beobachten Sie das Verhalten. Entspricht dies der Spezifikation?
  • Modifizieren Sie die Methode put so, dass gewartet wird, falls der Puffer voll ist (this.count == this.items.length). Dazu muss take alle wartenden Threads aufwecken, wenn ein Element entfernt wurde, weil im Puffer danach wieder etwas Platz ist.
  • Modifizieren Sie die Methode take so, dass gewartet wird, falls der Puffer leer ist (this.count == 0). Dazu muss put alle wartenden Threads aufwecken, wenn ein Element hinzugefügt wurde, weil im Puffer danach wieder etwas zu holen ist.

Literatur

Hettel, Jörg und Manh Tien Tran. 2016. Nebenläufige Programmierung mit Java. Konzepte und Programmiermodelle für Multicore-Systeme. Heildelberg: dpunkt.verlag.