Algorithmen Inhaltsverzeichnis Hypertext-Version Standardpakete © APSIS GmbH

12. Nebenläufigkeit

Seite 269

12.1. Prozesse

Seite 269

Prozess 1 Prozess 2

Abb. 12.1: Nebenläufige Prozesse

12.1.1. Disjunkte und voneinander abhängige Prozesse

Seite 269

x1 = max(a1, b1); // vier Werte // (12.1)
y1 = max (c1, d1);
z1 = x1 + y1;
x2 = max(a2, b2); // vier andere Werte
y2 = max(c2, d2);
z2 = x2 - y2;
x1 = max(a1, b1);
y1 = max(c1, d1);
z1 = x1 + y1;
x2 = max(a2, b2);
y2 = max(c2, d2);
z2 = x2 - y2;

Abb. 12.2: Disjunkte Prozesse

x1 = max (a1, b1);
y1 = max (c1, 1);
z1 = x1 + y1;
x2 = max (a1, b1);
y2 = max (c1, d1);
z2 = x2 - y2;

Abb. 12.3: Unabhängige Prozesse

a = a * a;
meldung(a);
a = a / 2;
meldung(a);

Abb. 12.4: Abhängige Prozesse

Beobachter Berichterstatter
while (true)
  if (ereignis)
    zaehler ++;
while (true) {
  meldung(zaehler);
  zaehler = 0;
}

Abb. 12.5: Synchronisierung

12.1.2. Kritische Abschnitte und gegenseiter Ausschluss

Seite 270

12.1.3. Synchronisierung

Seite 271

			boolean s = true;
while (true)
  while (s) {
    kritischerAbschnitt1();
    s = false;
    unkritischerAbschnitt1();
  };
while (true)
  while (!s) {
    kritischerAbschnitt2();
    s = true;
    unkritischerAbschnitt2();
  };

Abb. 12.6: Synchronisierung über eine logische Variable

12.1.4. Semaphore

Seite 271

public class Semaphor { // (12.2)
	private int wert;
	public Semaphor(int wert) { // Anzahl zugelassener Prozesse, typischerweise 1
		this.wert = wert;
	}
	public void p() {
		if (wert > 0)
			wert --;
		else { // wert == 0
			sichInDieWarteschlangeEinreihen(); // Prozess wird gestoppt ¬
		}
	}
	public void v() {
		wert ++;
		if (!warteschlangeLeer())
			warteschlangeVerlassen(); // Prozess läuft weiter ¬
	}
}
Semaphor s = new Semaphor(1);
while (true) {
  s.p();
  kritischerAbschnitt1();
  s.v();
  unkritischerAbschnitt1();
}
while (true) {
  s.p();
  kritischerAbschnitt2();
  s.v();
  unkritischerAbschnitt2();
}

Abb. 12.7: Semaphor

		Semaphor pufferZugriff = new Semaphor(1);
while (true) {
  erzeugeNachricht();
  pufferZugriff.p();
  nachrichtInDenPuffer();
  pufferZugriff.v();
}
while (true) {
  pufferZigriff.p();
  nachrichtAusDemPuffer();
  pufferZugriff.v();
  verarbeiteNachricht();
}

Abb. 12.8: Geschützter Puffer

		Semaphor pufferLeer = new Semaphor(0), 
			pufferZugriff = new Semaphor(1);
while (true) {
  erzeugeNachricht();
  pufferZugriff.p();
  nachrichtInDenPuffer();
  pufferZugriff.v();
  pufferLeer.v();
}
while (true) {
  pufferLeer.p();
  pufferZugriff.p();
  nachrichtAusDemPuffer();
  pufferZugriff.v();
  verarbeiteNachricht();
}

Abb. 12.9: Wartebedingung

12.1.5. Nachrichtenaustausch

Seite 273

			Nachricht[] puffer = new Nachricht[groesse];
			int juengstes = 0, aeltestes = groesse-1;
			int zaehler = 0;
while (true) {
  Nachricht nachricht = new Nachricht();
  nachricht.erzeugen();
  if (zaehler <= groesse) {
    puffer[juengstes] = nachricht;
    juengstes = (juengstes+1) % groesse;
    zaehler ++;
  }
}
while (true) {
  if (zaehler != 0) {
    Nachricht nachricht = puffer[aeltestes];
    aeltestes = (aeltestes+1) % groesse;
    zaehler --;
    nachricht.verarbeiten();
  }

Abb. 12.10: Nachrichtenaustausch

12.1.6. Monitore

Seite 274

public class Ringpuffer { // (12.3)
	private Object[] puffer;
	private Semaphor zugriff, pufferLeer, pufferVoll; // binäre Semaphore
	private int juengstes, aeltestes;
	public Ringpuffer(int groesse) {
		puffer = new String[groesse];
		juengstes = puffer.length - 1;
		aeltestes = 0;
		zugriff = new Semaphor(1);
		pufferLeer = new Semaphor(groesse);
		pufferVoll = new Semaphor(groesse);
	}
	// jetzt können verschiedene Prozesse den Puffer beschreiben und lesen:
	public void eintragen(Object nachricht) {
		pufferVoll.p(); // bei vollem Puffer warten
		zugriff.p(); // bei Zugriff durch andere warten
		puffer[juengstes] = nachricht; // kritischer Abschnitt ¬
		juengstes = (juengstes+1) % puffer.length; // kritischer Abschnitt ¬
		zugriff.v(); // kritischen Abschnitt freigeben
		pufferLeer.v(); // Wartende erlösen
	}
	public Object entnehmen() { // unsauber: Mutator mit Ergebnis
		pufferLeer.p(); // bei leerem Puffer warten
		zugriff.p(); // bei Zugriff durch andere warten
		Object nachricht = puffer[aeltestes]; // kritischer Abschnitt ¬
		aeltestes = (aeltestes+1) % puffer.length; // kritischer Abschnitt ¬
		zugriff.v(); // kritischen Abschnitt freigeben
		pufferVoll.v(); // Wartende erlösen
		return nachricht;
	}
}

12.2. Nebenläufige Prozesse in Java

Seite 274

12.2.1. Prozesse und Monitore

Seite 274

class Erzeuger extends Thread { // (12.4)
	private String spezialitaet; // was der Prozess produziert
	private Ringpuffer lager; // wohin er produziert
	public Erzeuger (String produkt, Ringpuffer ziel) {
		spezialitaet = produkt;
		lager = ziel;
	}
	public void run() { // überschreiben (hier wird nebenläufig produziert)
		while (true) {
			System.out.println(spezialitaet + " produziert");
			lager.eintragen(spezialitaet);
		}
	}
}
class Verbraucher extends Thread {
	private Ringpuffer lager;
	public Verbraucher (Ringpuffer quelle) {
		lager = quelle;
	}
	public void run() { // überschreiben
		while (true) {
			String speise = (String)lager.entnehmen();
			System.out.println(speise + " verzehrt");
		}
	}
}
public class MacDonalds {
	private static Ringpuffer vorrat = new Ringpuffer(100);
	public static void main(String[] args) {
		new Erzeuger("Eis", vorrat).start(); // drei anonyme Prozesse
		new Erzeuger("Hamburger", vorrat).start();
		new Erzeuger("Pommes", vorrat).start();
		Verbraucher kunde1 = new Verbraucher(vorrat); // zwei benannte Prozesse
		kunde1.start();
		Verbraucher kunde2 = new Verbraucher(vorrat);
		kunde2.start();
		new Erzeuger("Pommes", vorrat).start();
	}
}
public class RingpufferMonitor { // (12.5)
	private Object[] puffer;
	private int juengstes, aeltestes;
	public RingpufferMonitor(int groesse) { ... // Konstruktor wie im Programm (12.3)
	public synchronized void eintragen (Object nachricht) { // ¬
		puffer[juengstes] = nachricht;
		juengstes = (juengstes+1) % puffer.length;
	}
	public synchronized void entnehmen(Object nachricht) { // ¬
		nachricht = puffer[aeltestes];
		aeltestes = (aeltestes+1) % puffer.length;
	}
}
class VerbraucherRunnable implements Runnable { // (12.6)
	... // Konstruktor wie oben
	public void run() { ... // wie oben
}
new Thread(new ErzeugerRunnable("Ketschap", vorrat)).start();
Thread Kasse3 = new Thread(new VerbraucherRunnable("Zwiebelringe", vorrat));
Kasse3.start();

Übung 12.1: Schreiben Sie die Klasse MacDonalds als Applet um, und testen Sie sie. Zum Beenden der Prozesse müssen Sie das Applet beenden.

12.2.2. Synchronisierung auf Grund von Zusatzbedingungen

Seite 276

while (! bedingung) wait(); // warten auf notify, dann erneut prüfen
public synchronized void eintragen(Object nachricht) { // (12.7)
	try {
		while (fuellstand == puffer.length) {
			System.out.println("Warten auf Eintragen");
			wait(); // warten auf notify, dann erneut prüfen ¬
	}
	catch (InterruptedException ausnahme) { return; } // verschlucken
	puffer[juengstes] = nachricht;
	fuellstand ++;
	juengstes = (juengstes + 1) % laenge;
	notifyAll(); // wartende Verbraucher können aufwachen ¬
}
public synchronized Object entnehmen() {
	try {
		while (fuellstand == 0)
			System.out.println("Warten auf Entnehmen");
			wait(); // warten auf notify, dann erneut prüfen ¬
	}
	catch (InterruptedException ausnahme) { return null; } // kann nicht auftreten
	Object nachricht = puffer[aeltestes];
	aeltestes = (aeltestes + 1) % puffer.length;
	notifyAll(); // wartende Verbraucher können aufwachen ¬
	return nachricht;
}

12.2.3. Unterbrechungen

Seite 277

prozess.interrupt();

Übung 12.2: Verändern Sie Ihr Programm aus der Übung 12.1 auf Seite 276 so, dass die Ausnahme InterruptedException aus dem Ringpuffer den jeweils rufenden Prozess beendet. Dazu müssen Sie die Klasse Ringpuffer im Sinne des Programms (12.7) auf Seite 276 verändern und dabei die Ausnahme InterruptedException nach außen reichen. In den Klassen Erzeuger und Verbraucher müssen diese Ausnahmen aufgefangen und behandelt werden.

Damit der unterbrechbare Wartezustand oft genug auftritt, sollten Sie die Größe des Ringpuffers klein wählen und vor notify jeweils eine Zeitverzögerung (sleep) einbauen.

Programmieren Sie nun einen Störprozess, der den MacDonalds-Prozessen Unterbrechungen schickt, und starten Sie ihn als letzten Prozess. Dazu ist es notwendig, die Prozessreferenzen global zu vereinbaren, statt lokal oder anonym:

public class MacDonalds { ...
	private static Erzeuger lieferant1;
		...
	private static Verbraucher kunde1;
		...

Entfernen Sie die Erzeuger- und Verbrauchermeldungen und geben Sie stattdessen für jedes wait und jede InterruptedException eine meldung aus.

12.2.4. Weitere Synchronisierungsoperationen

Seite 278


Algorithmen Inhaltsverzeichnis Hypertext-Version Standardpakete © APSIS GmbH