Musterlösungen

Laboraufgabe “Paralleles Suchen/Finden eines Suchstrings in einigen wenigen Textdateien”

neue Klasse ListenerActor

  • Die Signatur der Methode ist in der Aufgabe spezifiziert:
public class ListenerActor extends AbstractActor
  • Dies bedingt, dass die folgende Methode implementiert wird:
@Override
public Receive createReceive()
  • In dieser Methode muss ein “Match” für Objekte des Typs ResultMsg vorgesehen werden:
    .match(ResultMsg.class, /* ... */
  • Falls solch ein Objekt empfangen wird, muss sein Inhalt ausgegeben werden und das Akka-System terminiert werden (als Lambda-Ausdruck formuliert):
    (msg) -> {
        msg.result().forEach(System.out::println);
        getContext().getSystem().terminate();
    }

Konstruktor von MasterActor

  • Ein neuer Actor wird mit der Methode actorOf erzeugt. Er wird in die Hierarchie der Aktoren dadurch einsortiert, dass diese Methode im übergeordneten Kontext erzeugt wird. Das ist hier der Kontext der MasterActor-Instanz. Dieser Kontext wird durch getContext() von this (einem MasterActor) geholt.
this.listener = getContext().actorOf( /* ... */ );
  • Die actorOf-Methode bekomt hier zwei Parameter: Erstens eine Spezifikation des neuen Actors. Hier soll ein neuer ListenerActor erzeugt werden. Das komplexe Props-Framework ist ebenso wie Akka eine Entwicklung der Firma Lightbend. Nehmen Sie es hier einfach als idomatischen Gebrauch hin. Eine genauere Erklärung würde hier zu weit gehen:
Props.create(ListenerActor.class)
  • Der zweite Parameter für die actorOf-Methode ist ein symbolischer Name für den Actor, der ggf. auch zur Identifikation (anstatt des ActorRef-Handles) herangezogen werden kann. Er wird als String übergeben:
"listener"

FindMsg mit MasterActor verarbeiten

  • Dazu muss die folgende Methode ausformuliert werden. Sie wird aufgrund des ReceiveBuilders aufgerufen, wenn eine FindMsg eintrifft:
private void handleFindMsg(FindMsg msg)
  • Dazu wird der Inhalt der FindMsg analysiert:
var filenames = msg.filenames();
var searchword = msg.searchword();
  • Für jedes Element in filenames wird eine neue Nachricht erzeugt, die von Workern bearbeitet wird. Es ergibt sich für jede dieser Anfragen eine Antwort. Damit im laufenden Betrieb die Anzahl der jeweils noch zu erwartenden Antworten mitprotokolliert werden kann, wird die anfängliche Anzahl in der Instanzvariablen numOfChild gespeichert:
this.numOfChild = msg.filenames().size();
  • Dann wird für jedes Element in filenames eine neue Nachricht erzeugt, die das zu lösende Teilproblem enthält:
for (var filename : filenames) {
    var job = new WorkMsg(filename, searchword);
  • Jede dieser so erzeugten Nachrichten wird an einen jeweils neu und nur für diese Nachricht zuständigen WorkerActor gesendet (mit tell), der erst direkt zuvor erzeugt wird:
    var findActor = getContext().actorOf(Props.create(WorkerActor.class));
    findActor.tell(job, getSelf());
}

WorkerActor

  • Die Methode handleWorkMsg muss vervollständigt werden. Das Ergebnis liegt in der lokalen Variablen resultvor. Es muss in eine neue ResultMsg getan werden:
new ResultMsg(result)
  • Diese ResultMsg wird mit der Methode tell an den Absender der hier gerade verarbeiteten WorkMsg zurückgesendet. Dies ist der MasterActor. Dessen ActorRef kann mit der Methode getSender ermittelt werden. Das Akka-Framework sorgt dafür, dass dabei der Absender der Nachricht, die gerade verarbeitet wird, ermittelt wird.
getSender().tell( /* ... */ );
  • Als Absender dieser Nachricht wird der gerade arbeitende WorkerActor eingetragen (2. Parameter von tell), der mit der folgenden Methode ermittelt wird:
getSelf()

Laboraufgabe “Paralleles Suchen/Finden eines Suchstrings in sehr vielen Textdateien mit einem Router”

neue Instanzvariablen für MasterActor

  • Die folgenden nicht veränderliches (final) privaten Instanzvariablen werden in MasterActor neu eingeführt:
private final List<Routee> routees;
private final Router router;
  • Die Anzahl der zu erzeugenden Routee-Instanzen wird durch eine Konstante spezifiziert. Im Beispiel erhält Sie den Wert 5:
private static final int WORKER_NUM = 5;

Routees erzeugen

  • Die Anzahl von WORKER-NUM WorkerActor-Instanzen wird im Konstruktor von MasterActor mit einer Schleife erzeugt. Sie sollen als ArrayList in der neuen Instanzvariablen routee gespeichert werden:
this.routees = new ArrayList<>();
for (var i = 0; i < MasterActor.WORKER_NUM; i++) {
    var r = getContext().actorOf(Props.create(WorkerActor.class));
  • Da der MasterArctor diese Routees erzeugt, soll er sie auch (in seinem Kontext) im Sinne des Error Kernel Design Patterns überwachen:
    getContext().watch(r);
  • Die WorkerActor-Instanzen müssen über einen Wrapper vom Typ ActorRefRoutee zu Routees aufgewertet werden:
    this.routees.add(new ActorRefRoutee(r));
}

Router

  • Zum Schluss wird im Konstruktor von MasterActor ein Router erzeugt und in der neuen Instanzvariable router gespeichert:
this.router = new Router( /* ... */ );
  • Die Parameter sind eine Spezifikation des Routing-Verhaltens (hier Round-Robin) und die Liste der vom Router zu bedienenden Routees:
new RoundRobinRoutingLogic(), this.routees

Einbinden des Routers

  • Statt wie bisher beim Eintreffen von FindMsg alle Teilaufgaben (hier in der lokalen Laufvariablen job) zu extrahieren, für jede einen neue Worker zu erzeugen und ihm den job mit tell zu schicken …
var findActor = getContext().actorOf(Props.create(WorkerActor.class));
findActor.tell(job, getSelf());
  • … wird der job mit der Methode route an den Router übermittelt (dessen Routees ja schon im Konstruktor des MasterActor erzeugt und dem Router mitgeteilt wurden:
this.router.route(job, getSelf());
  • Da die Worker nun nicht mehr nach ihrer (Erst-) Benutzung gelöscht werden dürfen (sie bleiben ja als Routee im Verantwortungsbereich des Routers), muss handleResultMsg noch entsprechend angepasst werden. Bei jedem Eintreffen einer ResultMsg beim MasterActor muss die Anzahl der noch erwarteten Nachrichten um 1 heruntergezählt werden:
this.numOfChild--;
  • Das empfangene Teilergebnis muss nach wie vor in das Gesamtergebnis integriert werden:
this.result.addAll(msg.result());
  • Erst wenn keine Antworten mehr zu erwarten sind (numOfChild bis 0 heruntergezählt), werden alle Worker durch jeweils eine neue PleaseCleanupAndStop-Message heruntergefahren. Da die Worker als Routees in der Instanzvariablen routees zu finden sind, kann darüber iteriert werden. Da es sich hier nicht um ActorRef-Instanzen handelt (diese waren für die Speicherung in routees in ActorRefRoutee-Instanzen gewrappt worden, wird zum Senden send und nicht tell benutzt:
if (this.numOfChild == 0) {
    for (var routee : this.routees) {
        routee.send(new PleaseCleanupAndStop(), getSelf());
    }
  • Am Ende wird noch der listener wie bisher auch heruntergefahren:
    this.listener.tell(new ResultMsg(this.result), getSelf());
    getContext().stop(getSelf());
}

Ändern der Aufgabe

  • Dazu kann in Maindas lokale String-Array geändert werden:
String[] files = { "test1.txt", "test2.txt", "test3.txt", "test4.txt", 
                   "test5.txt", "test6.txt", "test7.txt", "test8.txt" };
  • Die neuen Dateien müssen in das Wurzelverzeichnis des Projekts kopiert werden, denn das ist in diesem Projekt das Arbeitsverzeichnis.

  • Im Beispiel hier ist WORKER_NUM 5 und die Anzahl der zu durchsuchenden Dateien 8. Mindestens 3 WorkMsg-Nachrichten müssen also von Routees verarbeitet werden, die mehr als eine Nachricht bekommen haben.