Musterlösungen
Laboraufgabe “CSP mit Java-Mitteln”
Die Kanäle werden als LinkedBlockingQueue
-Objekte umgesetzt, die Go-Routinen als Threads.
public static void main(String... args) throws InterruptedException {
var operand = new LinkedBlockingQueue<Integer>();
var result = new LinkedBlockingQueue<Integer>();
var done = new LinkedBlockingQueue<Boolean>();
(new Thread(() -> tasker(1, 2, operand, result, done))).start();
(new Thread(() -> add(operand, result))).start();
done.take();
}
(Blockierendes) Lesen aus einem Kanal wird durch die Methode take
simuliert, die ja den Aufrufer auch blockiert, bis ein Element in der Queue verfügbar ist.
Die LinkedBlockingQueue
-Instanzen haben wie die Kanäle im Go-Programm keine Kapazitätsgrenze. Im Go-Programm wird asynchrones Schreiben verwendet (der Sender blockiert nicht bis die gesendete Nachricht aus dem Kanal abgeholt wurde). Dies kann durch put
nachgeahmt werden.
static void tasker(int x, int y, BlockingQueue<Integer> o, BlockingQueue<Integer> r, BlockingQueue<Boolean> d) {
o.offer(x);
o.offer(y);
try {
System.out.println(r.take());
d.put(true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
static void add(BlockingQueue<Integer> o, BlockingQueue<Integer> r) {
try {
r.put(o.take() + o.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
Laboraufgabe “Logging in Multi-Threadumgebungen”
Aufgabe 1: Gegenseitiger Ausschluss
Die Klasse StringBuilder
, die in SimpleLoggerUnsafe
verwendet wird, ist im Gegensatz zu StringBuffer
nicht threadsicher. Außerdem wäre es wünschenswert, dass die Properties SeverityLevel
und PrintStream
nicht geändert werden, während log
oder flush
gerade arbeiten. Damit diese vier Methoden im gegenseitigen Ausschluss arbeiten, kann synchronized
oder ein anderer Lock-Mechanismus verwendet werden.
Dazu wird mit SimpleLoggerSafe
eine Sub-Klasse von SimpleLoggerUnsafe
erzeugt, die alle Aufrufe an die Implementierungen der Elternklasse delegiert (Aufrufe von super
sowohl für die Konstruktoren, als auch für alle Methoden). Alle Methoden werden aber in der Unterklasse mit dem synchronized
-Schlüsselwort ausgezeichnet, so dass gegenseitiger Ausschluss sichergestellt ist.
public class SimpleLoggerSafe extends SimpleLoggerUnsafe
implements Logging, AutoCloseable {
public SimpleLoggerSafe() {
super();
}
public SimpleLoggerSafe(Severity level) {
super(level);
}
public SimpleLoggerSafe(Severity level, PrintStream out) {
super(level, out);
}
@Override
public synchronized void log(Severity level, String msg) {
super.log(level, msg);
}
@Override
public synchronized void flush() {
super.flush();
}
@Override
public synchronized void setSeverityLevel(Severity level) {
super.setSeverityLevel(level);
}
@Override
public synchronized void setPrintStream(PrintStream out) {
super.setPrintStream(out);
}
@Override
public synchronized void close() throws Exception {
super.close();
}
@Override
public synchronized Severity getSeverityLevel() {
return super.getSeverityLevel();
}
}
Aufgabe 2: threadlokale SimpleLoggerUnsafe
-Instanzen
Die Variable logger
soll den Zugriff auf eine threadlokale Instanz von SimpleLoggerUnsafe
ermöglichen. Dazu muss ein ThreadLocal
-Objekt als Wrapper erzeugt werden. Die Factory-Methode withInitial
erzeugt den Wrapper und führt die übergebene Methode (single abstract method eines Objekts, dessen Klasse das Interface Supplier
implementiert) zur Initialisierung aus.
Das Ergebnis der Methode muss ein neues Objekt vom Typ SimpleLoggerUnsafe
sein:
Dafür muss vorher noch der PrintStream
out
vorbereitet werden:
try {
// jeder Thread bekommt garantiert seinen eigenen PrintStream
out = new PrintStream(Thread.currentThread().threadId() + ".log");
} catch (FileNotFoundException e) {
// Abbruch mit Runtime statt Checked Exception:
throw new RuntimeException(e);
}
Damit bekommt jeder Thread garantiert seinen eigenen PrintStream
. Das ist auch notwendig, da sonst keine Koordination zwischen den Log-Ausgaben unterschiedlicher Threads vorgenommen wird. Etwaige Namenskollisionen von Log-Files gleich-benannter (threadId()
) Threads unterschiedlicher JVM-Prozesse werden hier nicht berücksichtigt.
Damit diese Trennung wirkungsvoll ist, muss dafür gesorgt werden, dass der PrintStream
des Loggers nicht nach der Initialisierung geändert werden kann:
@Override
public void setPrintStream(PrintStream out) {
// Threads könnten denselben PrintStream verwenden. Damit das nicht
// passiert, wird diese Methode "abgeschaltet".
throw new RuntimeException(new OperationNotSupportedException());
}
Anmerkung:
Die Verwendung von Runtime Errors statt Checked Exceptions hierbei und bei der Initialisierung des PrintStream
resultieren aus dem gemeinsamen Interface.
Aufgabe 3: Asynchrones Logging mit BlockingQueue
-Instanzen
Das eigentliche Logging soll von einer nicht threadsicheren Instanz von SimpleLoggerUnsafe
übernommen werden. Dazu wird eine Instanzvariable deklariert und (im Konstruktor - s.u.) initialisiert:
Für die Queues müssen Instanzvariablen deklariert und mit leeren LinkedBlockingQueues
initialisiert werden. Dabei ist zu beachten, dass es eine Queue für jedes Severity-Level gibt:
private final BlockingQueue<String> queueDebug;
private final BlockingQueue<String> queueInfo;
private final BlockingQueue<String> queueNotice;
private final BlockingQueue<String> queueWarning;
private final BlockingQueue<String> queueError;
private final BlockingQueue<String> queueCritical;
private final BlockingQueue<String> queueAlert;
private final BlockingQueue<String> queueEmergency;
public QueuedLogger() {
logger = new SimpleLoggerUnsafe();
queueDebug = new LinkedBlockingQueue<>();
queueInfo = new LinkedBlockingQueue<>();
queueNotice = new LinkedBlockingQueue<>();
queueWarning = new LinkedBlockingQueue<>();
queueError = new LinkedBlockingQueue<>();
queueCritical = new LinkedBlockingQueue<>();
queueAlert = new LinkedBlockingQueue<>();
queueEmergency = new LinkedBlockingQueue<>();
/* ... */
}
In der run
-Methode muss laufend geprüft werden, ob zu loggende Nachrichten in den Queues vorliegen. In Go gibt es dafür die select
-Schleife. In Java muss das nachgebildet werden. Mit poll
kann nicht-blockierend geprüft werden, ob Nachrichten in den Queues vorliegen. Etwaige Nachrichten werden über die nicht threadsichere private Instanz von SimpleLoggerUnsafe
geloggt. Da auf die Instanz von SimpleLoggerUnsafe
nur aus dieser run
-Methode zugegriffen wird, kann man sich sicher sein, dass nur aus dem QueuedLogger
-Thread - also nicht “multi-threaded” zugegriffen wird. Die “Unsafe”-Variante ist also ausreichend.
@Override
public void run() {
while (true) {
var msg = "";
if ((msg = this.queueEmergency.poll()) != null) {
logger.log(Severity.Emergency, msg);
} else if ((msg = this.queueAlert.poll()) != null) {
logger.log(Severity.Alert, msg);
} else if ((msg = this.queueCritical.poll()) != null) {
logger.log(Severity.Critical, msg);
} else if ((msg = this.queueError.poll()) != null) {
logger.log(Severity.Error, msg);
} else if ((msg = this.queueWarning.poll()) != null) {
logger.log(Severity.Warning, msg);
} else if ((msg = this.queueNotice.poll()) != null) {
logger.log(Severity.Notice, msg);
} else if ((msg = this.queueInfo.poll()) != null) {
logger.log(Severity.Info, msg);
} else if ((msg = this.queueDebug.poll()) != null) {
logger.log(Severity.Debug, msg);
}
}
}
Die Priorität der Severity-Levels wird durch die Reihenfolge der Kanäle im select
-Ausdruck abgebildet.
Die Methode log
ist in dieser Implementierung nicht dafür da, selber zu loggen, sondern schreibt zu loggende Nachrichten in die entsprechende BlockingQueue
, aus der sie dann später durch die run
-Methode abgeholt werden. Die Koordination der parallelen log
-Aufrufe übernehmen die BlockingQueue
-Implementierungen.
@Override
public void log(Severity level, String msg) {
if (logger.getSeverityLevel().ordinal() <= level.ordinal()) {
var success = false;
while (!success) { // falls die jeweilige Queue schon voll ist
// (success == false) nochmal probieren
switch (level) {
case Debug:
success = this.queueDebug.offer(msg);
case Info:
success = this.queueInfo.offer(msg);
case Notice:
success = this.queueNotice.offer(msg);
case Warning:
success = this.queueWarning.offer(msg);
case Error:
success = this.queueError.offer(msg);
case Critical:
success = this.queueCritical.offer(msg);
case Alert:
success = this.queueAlert.offer(msg);
case Emergency:
success = this.queueEmergency.offer(msg);
}
}
}
}