Musterlösungen

Laboraufgabe “in-situ Filterung als divide and conquer

Methode “compute()”

  • Fallunterscheidung zum Erkennen der “fork”-Bedingung
if ((this.end - this.start) <= SLICE_LEN) {
  • Aktion für Rekursionsanker
for (var i = this.start; i < this.end; i++) {
    if (this.array.get(i) > MAX) {
        this.array.set(i, MAX);
    }
}

Es bietet sich an eine for-Schleife statt eines Iterators bzw. enhanced for loop anzuwenden, da die Benutzung eines Iterators zu ConcurrentModificationException führen kann.

Hauptprogramm

  1. Array initialisieren
var array = new ArrayList<Integer>();
for (var i = 0; i < ARRAY_LEN; i++) {
    array.add(i + 1);
}
  1. initialen FilterTask erzeugen
var task = new FilterTask(array, 0, array.size());
  1. FilterTask im Common Thread Pool starten
ForkJoinPool.commonPool().invoke(task);

alternativ (ab Java 8): task.invoke()

  1. gefiltertes Array ausgeben
for (var number : array) {
    System.out.print(number + " ");
}
  • Wie oft wird bei 16 Elementen geforkt?

3x

Laboraufgabe “Reduce als divide and conquer

Summierungsfunktion

  1. Die Klasse ReduceTask muss von RecursiveTask<Integer> erben.
public class ReduceTask extends RecursiveTask<Integer> {
  1. Wie bei FilterTask werden die Instanzvariablen array, start und end benutzt:
private final ArrayList<Integer> array;
private final int start;
private final int end;

ReduceTask(ArrayList<Integer> array, int start, int end) {
    this.array = array;
    this.start = start;
    this.end = end;
}
  1. Wieder muss compute() überschrieben werden. Nun gibt es aber Integer als Rückgabewert (Typ-Parameter von Oberklasse):
@Override
protected Integer compute() {
  1. Die Elemente werden in der lokalen Variable sum aufaddiert:
var sum = 0;
if ( //...
    //...
        sum += this.array.get(i);
    //...
} else {
    //...
    sum = left.join() + right.join();
}
return sum;
  1. Rekursionsanker wie bei FilterTask:
if ((this.end - this.start) <= SLICE_LEN) {
  1. und es wird im Anker-Fall wieder mit einer for-Schleife über den Array-Abschnitt iteriert:
for (var i = this.start; i < this.end; i++) {
  1. Im Rekursionsfall wird das Array wie bei FilterTask halbiert und rekursiv bearbeitet:
var mid = this.start + ((this.end - this.start) / 2);
var left = new ReduceTask(this.array, this.start, mid);
var right = new ReduceTask(this.array, mid, this.end);
left.fork();
right.fork();
  1. nun gibt es aber bei left und right Ergebnisse, auf die gewartet werden muss und die dann miteinander verrechnet werden können (Addition):
sum = left.join() + right.join();
  1. Das Hauptprogramm ist ganz analog zu dem von FilterTask, außer dass nun das Ergebnis des “obersten” Task-Aufrufs abgeholt und ausgegeben wird:
var task = new ReduceTask(array, 0, array.size());
var sum = ForkJoinPool.commonPool().submit(task).join();
System.out.println("Summe: " + sum);

Performance-Messung (pp.ReduceTaskExperiment)

Die Variante des Programms in ReduceTaskExperiment beinhaltet Funktionen zur Messung der abhängigen Variable:

public class ReduceTaskExperiment extends RecursiveTask<Integer> {

    private static int ARRAY_LEN = 0;
    private static int SLICE_LEN = 0;

    private final ArrayList<Integer> array;
    private final int start;
    private final int end;

    ReduceTaskExperiment(ArrayList<Integer> array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        var sum = 0;
        if ((this.end - this.start) <= SLICE_LEN) {
            for (var i = this.start; i < this.end; i++) {
                sum += this.array.get(i);
            }
        } else {
            var mid = this.start + ((this.end - this.start) / 2);
            var left = new ReduceTaskExperiment(this.array, this.start, mid);
            var right = new ReduceTaskExperiment(this.array, mid, this.end);
            left.fork();
            right.fork();
            sum = left.join() + right.join();
        }
        return sum;
    }

    public static void main(String... args) {
        for (ARRAY_LEN = 100; ARRAY_LEN < 10000000; ARRAY_LEN = ARRAY_LEN
                * 10) {
            for (SLICE_LEN = 4; SLICE_LEN < 128; SLICE_LEN = SLICE_LEN * 2) {
                var array = new ArrayList<Integer>();
                for (var i = 0; i < ARRAY_LEN; i++) {
                    array.add(i + 1);
                }
                var now = System.currentTimeMillis();
                var task = new ReduceTaskExperiment(array, 0, array.size());
                var sum = ForkJoinPool.commonPool().submit(task).join();
                System.out.printf(
                        "ARRAY_LEN: %d, SLICE_LEN: %d, Summe: %d, Zeit: %d \n",
                        ARRAY_LEN, SLICE_LEN, sum,
                        System.currentTimeMillis() - now);
            }
        }
    }

}

Ein beispielhaftes Ergebnis eines Experiments ist in der folgenden Tabelle enthalten:

eigene Darstellung

eigene Darstellung

Generische Lösung

In pp.ReduceTaskGeneric ist eine generische Lösung enthalten.

public class ReduceTaskGeneric<T> extends RecursiveTask<T> {
    // Der Typ der Elemente im Container ist T (statt Integer).
    // Dieser Typ wird Parameter von ReduceTaskGeneric.

    private static int ARRAY_LEN = 16;
    private static int SLICE_LEN = 4;

    private final ArrayList<T> array;
    private final int start;
    private final int end;

    // generische Lösung: statt + wird die BiFunction aggregateFunc benutzt
    // aggregateFunc ist ein Objekt, das die Funktion
    // T apply(T x, T y)
    // beinhaltet. Sie wird später statt + aufgerufen
    private final BiFunction<T, T, T> aggregFun;
    // als Start wird ein Ersatz für 0 benötigt. Dieser initiale Wert muss vom
    // Typ T
    // sein
    private final T initAcc;

    ReduceTaskGeneric(BiFunction<T, T, T> aggregFun, T initAcc,
            ArrayList<T> array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
        // generische Lösung: aggregFun und initAcc müssen über den Konstruktor
        // übergeben werden
        this.aggregFun = aggregFun;
        this.initAcc = initAcc;
    }

    @Override
    protected T compute() { // satt Integer T als Rückgabetyp
        var accumulator = this.initAcc; // statt 0 initAcc
        if ((this.end - this.start) <= SLICE_LEN) {
            for (var i = this.start; i < this.end; i++) {
                // statt + aggregFun.apply
                accumulator = this.aggregFun.apply(accumulator,
                        this.array.get(i));
            }
        } else {
            var mid = this.start + ((this.end - this.start) / 2);
            var left = new ReduceTaskGeneric<>(this.aggregFun, this.initAcc,
                    this.array, this.start, mid);
            var right = new ReduceTaskGeneric<>(this.aggregFun, this.initAcc,
                    this.array, mid, this.end);
            left.fork();
            right.fork();

            // statt + aggregFun.apply
            accumulator = this.aggregFun.apply(left.join(), right.join());
        }
        return accumulator;
    }

    public static void main(String... args) {
        var array = new ArrayList<Integer>();
        for (var i = 0; i < ARRAY_LEN; i++) {
            array.add(i + 1);
        }
        // Hier wird die aggregFun als Lambda-Ausdruck übergeben: (a, b) -> a +
        // b
        // Der Typ-Parameter von ReduceTaskGeneric wird dementsprechend auf
        // Integer
        // festgelegt.
        // Der initiale Wert des +-Akkumulators ist 0
        var task = new ReduceTaskGeneric<>(//
                ((a, b) -> a + b), // Hier wird die aggregFun als
                                   // Lambda-Ausdruck übergeben: +
                0, // Der initiale Wert des +-Akkumulators ist 0.
                array, 0, array.size());
        ForkJoinPool.commonPool().invoke(task);
        // seit Java 8 alternativ: task.invoke();
        var sum = task.join();
        System.out.println("Summe: " + sum);
    }
}

Größe Threadpool

In pp.ReduceTaskThreadsNumber ist eine Lösung enthalten. Hier wird die Größe von SLICE_LEN auf einen Wert gesetzt, der abhängig von der Größe des Common Thread Pool ist.

private static int SLICE_LEN = ARRAY_LEN / ForkJoinPool.commonPool().getParallelism();