6.1.6 A model answer

import java.io.*;

/**
 * A typed message carrying an arbitrary serializable payload.
 *
 * The class itself is Serializable and so is its payload, so an Event
 * can be written to an object stream as a whole.
 */
public class Event implements Serializable {
    // A string containing the event type.
    String type = "";

    // An event has data that it passes from one place to another.
    // We don't know what it is, except that it should be
    // serializable.
    Serializable data;

    /**
     * Create an event of the given type carrying the given payload.
     *
     * The fields are assigned directly rather than through the
     * setters: the setters can be overridden, and an override invoked
     * from here would run before the subclass had been initialised.
     *
     * @param type The event type.
     * @param data The payload carried by the event.
     */
    public Event(String type, Serializable data) {
        this.type = type;
        this.data = data;
    }

    /**
     * Get the Data value.
     * @return the Data value.
     */
    public Serializable getData() {
        return data;
    }

    /**
     * Set the Data value.
     * @param newData The new Data value.
     */
    public void setData(Serializable newData) {
        this.data = newData;
    }

    /**
     * Get the Type value.
     * @return the Type value.
     */
    public String getType() {
        return type;
    }

    /**
     * Set the Type value.
     * @param newType The new Type value.
     */
    public void setType(String newType) {
        this.type = newType;
    }

    /**
     * Render the event as its type, a colon and a newline, followed by
     * the string form of the payload.
     * @return the textual form of this event.
     */
    public String toString() {
        return this.type + ":\n" + data;
    }
}

import java.rmi.*;
import java.util.regex.*;

/**
 * Remote interface of the publish/subscribe event server.
 */
public interface EventServer extends Remote {
    /** Well-known name, so nobody has to invent one for the server. */
    String rmiName = "PubSubServer";

    /**
     * Register interest in events.
     *
     * @param regexp The regular expression, passed as a string.
     * @param client A reference to the subscriber's remote interface.
     * @return an integer label for referring to this particular
     *         subscription thereafter.
     * @throws RemoteException on an RMI failure.
     * @throws PatternSyntaxException if the expression is malformed.
     */
    int subscribe(String regexp, EventClient client)
        throws RemoteException, PatternSyntaxException;

    /**
     * Get rid of a single subscription.
     *
     * @param subscription The label returned by subscribe.
     * @throws RemoteException on an RMI failure.
     */
    void unsubscribe(int subscription) throws RemoteException;

    /**
     * Get rid of all subscriptions matching this client.
     *
     * @param client The client whose subscriptions are removed.
     * @throws RemoteException on an RMI failure.
     */
    void unsubscribe(EventClient client) throws RemoteException;

    /**
     * Allow an event generator to generate an event.
     *
     * @param e The event to publish.
     * @throws RemoteException on an RMI failure.
     */
    void publish(Event e) throws RemoteException;
}

import java.rmi.*;

/**
 * Remote callback interface implemented by event subscribers.
 */
public interface EventClient extends Remote {
    /**
     * A callback from the server, letting the client know which
     * subscription generated which event.
     *
     * @param subscription The label of the subscription concerned.
     * @param e            The event being delivered.
     * @throws RemoteException on an RMI failure.
     */
    void notify(int subscription, Event e) throws RemoteException;
}

import java.rmi.*;
import java.rmi.server.*;
import java.net.*;
import java.util.regex.*;
import java.util.*;
 
/**
 * Publish/subscribe server: clients register regular expressions and
 * are called back for every published event whose type matches.
 *
 * RMI may dispatch remote calls on several threads at once, so every
 * change to the subscription list is made while holding the list's
 * own monitor, and publish() works from a snapshot of it.
 */
public class EventServerImpl extends UnicastRemoteObject
    implements EventServer {

    // The current subscriptions (Subscription objects), in the order
    // in which they were registered.
    Vector regexps = new Vector();
    // The label handed to the next subscription; only touched while
    // holding the monitor of regexps.
    int subNum = 1;

    // Inner class for wrapping up the subscriptions
    class Subscription {
        Pattern p;      // compiled form of the subscriber's regexp
        EventClient c;  // who to call back
        int subId;      // label returned to the subscriber
    }

    /**
     * Create the server object.
     * @throws RemoteException if the object cannot be exported.
     */
    public EventServerImpl() throws RemoteException {
        // Call the super class to ensure UnicastRemoteObject things
        // are done
        super();
    }

    /**
     * Register a subscription.
     *
     * @param regexp The regular expression event types must match.
     * @param client The client to call back.
     * @return the label of the new subscription.
     * @throws RemoteException declared by the remote interface.
     * @throws PatternSyntaxException if regexp does not compile.
     * @throws IllegalArgumentException if client is null.
     */
    public int subscribe(String regexp, EventClient client)
        throws RemoteException, PatternSyntaxException {
        // A null client would make publish() fail on every later
        // matching event, so refuse it here.
        if (client == null) {
            throw new IllegalArgumentException("client must not be null");
        }
        Subscription s = new Subscription();
        // Compile the pattern before taking the lock
        s.p = Pattern.compile(regexp);
        s.c = client;
        // Allocate the label and store the subscription in one step so
        // concurrent subscribers can never be given the same label.
        synchronized (regexps) {
            s.subId = subNum++;
            regexps.add(s);
        }
        return s.subId;
    }

    /**
     * Remove the subscription carrying the given label, if any.
     *
     * @param subscription The label returned by subscribe.
     * @throws RemoteException declared by the remote interface.
     */
    public void unsubscribe(int subscription)
        throws RemoteException {
        synchronized (regexps) {
            Iterator it = regexps.iterator();
            while (it.hasNext()) {
                Subscription s = (Subscription) it.next();
                if (s.subId == subscription) {
                    it.remove();
                }
            }
        }
    }

    /**
     * Remove every subscription belonging to the given client.
     *
     * @param client The client whose subscriptions are removed.
     * @throws RemoteException declared by the remote interface.
     */
    public void unsubscribe(EventClient client) throws RemoteException {
        synchronized (regexps) {
            Iterator it = regexps.iterator();
            while (it.hasNext()) {
                Subscription s = (Subscription) it.next();
                // Every remote call unmarshals a fresh stub object, so
                // the reference passed here is never identical to the
                // one stored by subscribe(); compare with equals().
                if (s.c == client || (s.c != null && s.c.equals(client))) {
                    it.remove();
                }
            }
        }
    }

    /**
     * Deliver an event to every subscription whose pattern matches the
     * whole of the event's type. A subscription whose callback fails
     * with a RemoteException is dropped.
     *
     * @param e The event to deliver.
     * @throws RemoteException declared by the remote interface.
     */
    public void publish(Event e) throws RemoteException {
        System.out.println("publishing " + e);
        // Work on a snapshot: the callbacks are remote calls and must
        // not run while the list is locked, otherwise a client that
        // calls back into the server from notify() would block. A
        // subscription removed after the snapshot was taken may
        // therefore still receive this one event.
        Object[] snapshot = regexps.toArray();
        for (int i = 0; i < snapshot.length; i++) {
            Subscription s = (Subscription) snapshot[i];
            if (s.p.matcher(e.getType()).matches()) {
                try {
                    s.c.notify(s.subId, e);
                } catch (RemoteException re) {
                    // The client could not be reached; forget it.
                    System.err.println("Dropping subscription " + s.subId
                                       + ": " + re);
                    regexps.remove(s);
                }
            }
        }
    }

    /**
     * Start a server and bind it in the RMI registry of the local
     * host under EventServer.rmiName.
     *
     * @param args Not used.
     */
    public static void main(String args[]) {
        System.out.println("Installing security manager");
        if (System.getSecurityManager() == null) {
            System.setSecurityManager(new RMISecurityManager());
        }
        try {
            System.out.println("starting here...");
            EventServerImpl server = new EventServerImpl();
            String hostName = InetAddress.getLocalHost().getHostName();
            String url = "//" + hostName + "/" + EventServer.rmiName;
            System.out.println("Binding as " + url);
            Naming.rebind(url, server);
            System.out.println("EventServerImpl bound");
        } catch (Exception e) {
            System.err.println("EventServerImpl exception " + e.getMessage());
            e.printStackTrace();
        }
    }
}

import java.rmi.*;
import java.rmi.server.*;
import java.net.*;

public class EventClientImpl implements EventClient {
    
    EventServer server;
    
    public EventClientImpl(String rmiUrl) throws NotBoundException,
						 MalformedURLException,
						 RemoteException {
	this.server = (EventServer)Naming.lookup(rmiUrl);
	UnicastRemoteObject.exportObject(this);
    }
    
    public int addSubscription(String s) throws RemoteException {
	return server.subscribe(s, this);
    }

    public void removeSubscription(int key) throws RemoteException {
	server.unsubscribe(key);
    }
    
    public void notify(int subscription, Event e) 
	throws RemoteException {
	// We got an event
	System.out.println("Event received on subscription " + subscription);
	System.out.println(e);
	// Do something more interesting with the event?
    }

    public static void main(String args[]) {
	String url = args[0];
	String re = args[1];
	try {
	    // A very simple client for testing...
	    EventClientImpl client = new EventClientImpl(url);
	    client.addSubscription(re);
	} catch(Exception e) {
	    System.err.println(e);
	}
    }
}

import java.rmi.*;
import java.rmi.server.*;
import java.net.*;

/**
 * A test publisher: publishes a numbered event of a fixed type to an
 * EventServer every five seconds.
 */
public class EventGenerator implements Runnable {

    // The server events are published to.
    EventServer server;
    // The type given to every generated event.
    String type;

    /**
     * Look up the server to publish to.
     *
     * @param rmiUrl The RMI URL the server is bound under.
     * @param type   The type given to every generated event.
     * @throws NotBoundException if nothing is bound under rmiUrl.
     * @throws MalformedURLException if rmiUrl is not a valid RMI URL.
     * @throws RemoteException if the lookup fails.
     */
    public EventGenerator(String rmiUrl, String type)
        throws NotBoundException,
               MalformedURLException,
               RemoteException {
        this.type = type;
        this.server = (EventServer) Naming.lookup(rmiUrl);
    }

    /**
     * Publish "Event 0", "Event 1", ... five seconds apart until a
     * publish fails or the thread is interrupted.
     */
    public void run() {
        int count = 0;
        try {
            while (true) {
                server.publish(new Event(type, "Event " + count++));
                Thread.sleep(5000);
            }
        } catch (InterruptedException ie) {
            System.err.println(ie);
            // Put the interrupt status back so whoever owns this
            // thread can still see that it was asked to stop.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            System.err.println(e);
        }
    }

    /**
     * Start a generator on its own thread.
     *
     * @param args args[0] is the RMI URL of the server, args[1] the
     *             type of the events to generate.
     */
    public static void main(String args[]) {
        // Fail with a usage message rather than an array index error.
        if (args.length < 2) {
            System.err.println("Usage: java EventGenerator <rmi-url> <event-type>");
            System.exit(1);
        }
        String url = args[0];
        String type = args[1];
        try {
            EventGenerator generator = new EventGenerator(url, type);
            Thread t = new Thread(generator);
            t.start();
        } catch (Exception e) {
            System.err.println(e);
        }
    }
}

Ian Wakeman 2005-02-22