Musterlösungen

Laboraufgabe “CSP mit Java-Mitteln”

Die Kanäle werden als LinkedBlockingQueue-Objekte umgesetzt, die Go-Routinen als Threads.

public static void main(String... args) throws InterruptedException {
    var operand = new LinkedBlockingQueue<Integer>();
    var result = new LinkedBlockingQueue<Integer>();
    var done = new LinkedBlockingQueue<Boolean>();
    (new Thread(() -> tasker(1, 2, operand, result, done))).start();
    (new Thread(() -> add(operand, result))).start();
    done.take();
}

(Blockierendes) Lesen aus einem Kanal wird durch die Methode take simuliert, die ja den Aufrufer auch blockiert, bis ein Element in der Queue verfügbar ist.

Die LinkedBlockingQueue-Instanzen haben wie die Kanäle im Go-Programm keine Kapazitätsgrenze. Im Go-Programm wird asynchrones Schreiben verwendet (der Sender blockiert nicht bis die gesendete Nachricht aus dem Kanal abgeholt wurde). Dies kann durch put nachgeahmt werden.

static void tasker(int x, int y, BlockingQueue<Integer> o, BlockingQueue<Integer> r, BlockingQueue<Boolean> d) {
    o.offer(x);
    o.offer(y);
    try {
        System.out.println(r.take());
        d.put(true);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

static void add(BlockingQueue<Integer> o, BlockingQueue<Integer> r) {
    try {
        r.put(o.take() + o.take());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

Laboraufgabe “Logging in Multi-Threadumgebungen”

Aufgabe 1: Gegenseitiger Ausschluss

Die Klasse StringBuilder, die in SimpleLoggerUnsafe verwendet wird, ist im Gegensatz zu StringBuffer nicht threadsicher. Außerdem wäre es wünschenswert, dass die Properties SeverityLevel und PrintStream nicht geändert werden, während log oder flush gerade arbeiten. Damit diese vier Methoden im gegenseitigen Ausschluss arbeiten, kann synchronized oder ein anderer Lock-Mechanismus verwendet werden.

Dazu wird mit SimpleLoggerSafe eine Sub-Klasse von SimpleLoggerUnsafe erzeugt, die alle Aufrufe an die Implementierungen der Elternklasse delegiert (Aufrufe von super sowohl für die Konstruktoren, als auch für alle Methoden). Alle Methoden werden aber in der Unterklasse mit dem synchronized-Schlüsselwort ausgezeichnet, so dass gegenseitiger Ausschluss sichergestellt ist.

public class SimpleLoggerSafe extends SimpleLoggerUnsafe
        implements Logging, AutoCloseable {

    public SimpleLoggerSafe() {
        super();
    }

    public SimpleLoggerSafe(Severity level) {
        super(level);
    }

    public SimpleLoggerSafe(Severity level, PrintStream out) {
        super(level, out);
    }

    @Override
    public synchronized void log(Severity level, String msg) {
        super.log(level, msg);
    }

    @Override
    public synchronized void flush() {
        super.flush();
    }

    @Override
    public synchronized void setSeverityLevel(Severity level) {
        super.setSeverityLevel(level);
    }

    @Override
    public synchronized void setPrintStream(PrintStream out) {
        super.setPrintStream(out);
    }

    @Override
    public synchronized void close() throws Exception {
        super.close();
    }

    @Override
    public synchronized Severity getSeverityLevel() {
        return super.getSeverityLevel();
    }
}

Aufgabe 2: threadlokale SimpleLoggerUnsafe-Instanzen

Die Variable logger soll den Zugriff auf eine threadlokale Instanz von SimpleLoggerUnsafe ermöglichen. Dazu muss ein ThreadLocal-Objekt als Wrapper erzeugt werden. Die Factory-Methode withInitial erzeugt den Wrapper und führt die übergebene Methode (single abstract method eines Objekts, dessen Klasse das Interface Supplier implementiert) zur Initialisierung aus.

private ThreadLocal<Logging> logger = ThreadLocal.withInitial(() -> {
        /* ... */
});

Das Ergebnis der Methode muss ein neues Objekt vom Typ SimpleLoggerUnsafe sein:

return new SimpleLoggerUnsafe(Logging.Severity.Warning, out);

Dafür muss vorher noch der PrintStream out vorbereitet werden:

try {
    // jeder Thread bekommt garantiert seinen eigenen PrintStream
    out = new PrintStream(Thread.currentThread().threadId() + ".log");
} catch (FileNotFoundException e) {
    // Abbruch mit Runtime statt Checked Exception:
    throw new RuntimeException(e);
}

Damit bekommt jeder Thread garantiert seinen eigenen PrintStream. Das ist auch notwendig, da sonst keine Koordination zwischen den Log-Ausgaben unterschiedlicher Threads vorgenommen wird. Etwaige Namenskollisionen von Log-Files gleich-benannter (threadId()) Threads unterschiedlicher JVM-Prozesse werden hier nicht berücksichtigt.

Damit diese Trennung wirkungsvoll ist, muss dafür gesorgt werden, dass der PrintStream des Loggers nicht nach der Initialisierung geändert werden kann:

@Override
public void setPrintStream(PrintStream out) {
    // Threads könnten denselben PrintStream verwenden. Damit das nicht
    // passiert, wird diese Methode "abgeschaltet".
    throw new RuntimeException(new OperationNotSupportedException());
}

Anmerkung:

Die Verwendung von Runtime Errors statt Checked Exceptions hierbei und bei der Initialisierung des PrintStream resultieren aus dem gemeinsamen Interface.

Aufgabe 3: Asynchrones Logging mit BlockingQueue-Instanzen

Das eigentliche Logging soll von einer nicht threadsicheren Instanz von SimpleLoggerUnsafe übernommen werden. Dazu wird eine Instanzvariable deklariert und (im Konstruktor - s.u.) initialisiert:

private final Logging logger;

Für die Queues müssen Instanzvariablen deklariert und mit leeren LinkedBlockingQueues initialisiert werden. Dabei ist zu beachten, dass es eine Queue für jedes Severity-Level gibt:

private final BlockingQueue<String> queueDebug;
    private final BlockingQueue<String> queueInfo;
    private final BlockingQueue<String> queueNotice;
    private final BlockingQueue<String> queueWarning;
    private final BlockingQueue<String> queueError;
    private final BlockingQueue<String> queueCritical;
    private final BlockingQueue<String> queueAlert;
    private final BlockingQueue<String> queueEmergency;

    public QueuedLogger() {
        logger = new SimpleLoggerUnsafe();
        queueDebug = new LinkedBlockingQueue<>();
        queueInfo = new LinkedBlockingQueue<>();
        queueNotice = new LinkedBlockingQueue<>();
        queueWarning = new LinkedBlockingQueue<>();
        queueError = new LinkedBlockingQueue<>();
        queueCritical = new LinkedBlockingQueue<>();
        queueAlert = new LinkedBlockingQueue<>();
        queueEmergency = new LinkedBlockingQueue<>();
        /* ... */
    }

In der run-Methode muss laufend geprüft werden, ob zu loggende Nachrichten in den Queues vorliegen. In Go gibt es dafür die select-Schleife. In Java muss das nachgebildet werden. Mit poll kann nicht-blockierend geprüft werden, ob Nachrichten in den Queues vorliegen. Etwaige Nachrichten werden über die nicht threadsichere private Instanz von SimpleLoggerUnsafe geloggt. Da auf die Instanz von SimpleLoggerUnsafe nur aus dieser run-Methode zugegriffen wird, kann man sich sicher sein, dass nur aus dem QueuedLogger-Thread - also nicht “multi-threaded” zugegriffen wird. Die “Unsafe”-Variante ist also ausreichend.

@Override
public void run() {
    while (true) {
        var msg = "";
        if ((msg = this.queueEmergency.poll()) != null) {
            logger.log(Severity.Emergency, msg);
        } else if ((msg = this.queueAlert.poll()) != null) {
            logger.log(Severity.Alert, msg);
        } else if ((msg = this.queueCritical.poll()) != null) {
            logger.log(Severity.Critical, msg);
        } else if ((msg = this.queueError.poll()) != null) {
            logger.log(Severity.Error, msg);
        } else if ((msg = this.queueWarning.poll()) != null) {
            logger.log(Severity.Warning, msg);
        } else if ((msg = this.queueNotice.poll()) != null) {
            logger.log(Severity.Notice, msg);
        } else if ((msg = this.queueInfo.poll()) != null) {
            logger.log(Severity.Info, msg);
        } else if ((msg = this.queueDebug.poll()) != null) {
            logger.log(Severity.Debug, msg);
        }
    }
}

Die Priorität der Severity-Levels wird durch die Reihenfolge der Kanäle im select-Ausdruck abgebildet.

Die Methode log ist in dieser Implementierung nicht dafür da, selber zu loggen, sondern schreibt zu loggende Nachrichten in die entsprechende BlockingQueue, aus der sie dann später durch die run-Methode abgeholt werden. Die Koordination der parallelen log-Aufrufe übernehmen die BlockingQueue-Implementierungen.

@Override
public void log(Severity level, String msg) {
    if (logger.getSeverityLevel().ordinal() <= level.ordinal()) {
        var success = false;
        while (!success) { // falls die jeweilige Queue schon voll ist
                           // (success == false) nochmal probieren
            switch (level) {
            case Debug:
                success = this.queueDebug.offer(msg);
            case Info:
                success = this.queueInfo.offer(msg);
            case Notice:
                success = this.queueNotice.offer(msg);
            case Warning:
                success = this.queueWarning.offer(msg);
            case Error:
                success = this.queueError.offer(msg);
            case Critical:
                success = this.queueCritical.offer(msg);
            case Alert:
                success = this.queueAlert.offer(msg);
            case Emergency:
                success = this.queueEmergency.offer(msg);
            }
        }
    }
}