import java.io.*;

// NOTE: this listing concatenates several compilation units. Each public
// type below has to live in its own source file named after the type.

/**
 * A typed message passed from a publisher to subscribers.
 *
 * The type string is what subscriptions are matched against; the payload
 * is opaque to the event system and only has to be serializable.
 */
public class Event implements Serializable {

    // A string for containing the event type.
    String type = "";

    // An event has data that it passes from one place to another.
    // We don't know what it is, except that it should be
    // serializable
    Serializable data;

    /**
     * Create an event.
     * @param type The event type.
     * @param data The payload carried by the event.
     */
    public Event(String type, Serializable data) {
        this.setType(type);
        this.setData(data);
    }

    /**
     * Get the Data value.
     * @return the Data value.
     */
    public Serializable getData() {
        return data;
    }

    /**
     * Set the Data value.
     * @param newData The new Data value.
     */
    public void setData(Serializable newData) {
        this.data = newData;
    }

    /**
     * Get the Type value.
     * @return the Type value.
     */
    public String getType() {
        return type;
    }

    /**
     * Set the Type value.
     * @param newType The new Type value.
     */
    public void setType(String newType) {
        this.type = newType;
    }

    /**
     * Render the event as its type, a colon and newline, then the payload.
     * @return the textual form of the event.
     */
    public String toString() {
        return this.type + ":\n" + data;
    }
}

import java.rmi.*;
import java.util.regex.*;

/**
 * Remote interface of the publish/subscribe server.
 */
public interface EventServer extends Remote {

    // To ease the pain of creating a name
    public static String rmiName = "PubSubServer";

    /**
     * Register a subscription.
     * @param regexp The regular expression, passed as a string.
     * @param client The remote callback interface to notify.
     * @return an integer label for referring to this particular
     *         subscription thereafter.
     * @throws RemoteException on RMI failure.
     * @throws PatternSyntaxException if regexp cannot be compiled.
     */
    public int subscribe(String regexp, EventClient client)
        throws RemoteException, PatternSyntaxException;

    /**
     * Get rid of the subscription.
     * @param subscription The label returned by subscribe.
     * @throws RemoteException on RMI failure.
     */
    public void unsubscribe(int subscription) throws RemoteException;

    /**
     * Get rid of all subscriptions matching this client.
     * @param client The client whose subscriptions are dropped.
     * @throws RemoteException on RMI failure.
     */
    public void unsubscribe(EventClient client) throws RemoteException;

    /**
     * Allow an event generator to generate an event.
     * @param e The event to publish.
     * @throws RemoteException on RMI failure.
     */
    public void publish(Event e) throws RemoteException;
}

import java.rmi.*;

/**
 * Remote callback interface implemented by subscribers.
 */
public interface EventClient extends Remote {

    /**
     * A callback from the server, letting us know what subscription
     * generated what event.
     * @param subscription The label of the matching subscription.
     * @param e The published event.
     * @throws RemoteException on RMI failure.
     */
    public void notify(int subscription, Event e) throws RemoteException;
}

import java.rmi.*;
import java.rmi.server.*;
import java.net.*;
import java.util.regex.*;
import java.util.*;

/**
 * Server side of the publish/subscribe service.
 *
 * Holds the list of subscriptions and forwards each published event to
 * every subscriber whose pattern matches the whole event type.
 *
 * Remote calls may be dispatched concurrently, so every read or write of
 * the subscription list and of the id counter is done while holding the
 * list's monitor.
 */
public class EventServerImpl extends UnicastRemoteObject
    implements EventServer {

    // Current subscriptions; also the lock guarding subNum.
    Vector regexps = new Vector();

    // Label handed to the next subscription.
    int subNum = 1;

    // Private inner class for wrapping up the subscriptions
    class Subscription {
        Pattern p;
        EventClient c;
        int subId;
    }

    /**
     * Create and export the server object.
     * @throws RemoteException if the object cannot be exported.
     */
    public EventServerImpl() throws RemoteException {
        // Call the super class to ensure UnicastRemoteObject things
        // are done
        super();
    }

    /**
     * Compile the pattern and record a new subscription for the client.
     * @param regexp The regular expression to match event types against.
     * @param client The callback to notify on a match.
     * @return the label of the new subscription.
     * @throws RemoteException declared by the remote interface.
     * @throws PatternSyntaxException if regexp cannot be compiled.
     */
    public int subscribe(String regexp, EventClient client)
        throws RemoteException, PatternSyntaxException {
        // Compile outside the lock: a bad pattern must not consume an id.
        Pattern p = Pattern.compile(regexp);
        Subscription s = new Subscription();
        s.p = p;
        s.c = client;
        synchronized (regexps) {
            // Allocate the id and publish the entry as one step so that
            // concurrent subscribers never share a label.
            s.subId = subNum++;
            regexps.add(s);
        }
        return s.subId;
    }

    /**
     * Remove the subscription carrying the given label, if any.
     * @param subscription The label returned by subscribe.
     * @throws RemoteException declared by the remote interface.
     */
    public void unsubscribe(int subscription) throws RemoteException {
        synchronized (regexps) {
            Iterator it = regexps.iterator();
            while (it.hasNext()) {
                Subscription s = (Subscription) it.next();
                if (s.subId == subscription) {
                    it.remove();
                }
            }
        }
    }

    /**
     * Remove every subscription registered for the given client.
     * @param client The client whose subscriptions are dropped.
     * @throws RemoteException declared by the remote interface.
     */
    public void unsubscribe(EventClient client) throws RemoteException {
        synchronized (regexps) {
            Iterator it = regexps.iterator();
            while (it.hasNext()) {
                Subscription s = (Subscription) it.next();
                // Compare with equals, not ==: a client reference that
                // arrives in a remote call is a newly unmarshalled stub,
                // so it is never the same object as the stored one.
                boolean sameClient =
                    (client == null) ? (s.c == null) : client.equals(s.c);
                if (sameClient) {
                    it.remove();
                }
            }
        }
    }

    /**
     * Deliver the event to every subscription whose pattern matches the
     * event type. A subscription whose callback fails with a
     * RemoteException is dropped.
     * @param e The event to publish.
     * @throws RemoteException declared by the remote interface.
     */
    public void publish(Event e) throws RemoteException {
        System.out.println("publishing " + e);
        // Work on a snapshot so the remote callbacks run without holding
        // the lock and concurrent (un)subscribes cannot break iteration.
        Object[] snapshot;
        synchronized (regexps) {
            snapshot = regexps.toArray();
        }
        for (int i = 0; i < snapshot.length; i++) {
            Subscription s = (Subscription) snapshot[i];
            if (s.p.matcher(e.getType()).matches()) {
                try {
                    s.c.notify(s.subId, e);
                } catch (RemoteException re) {
                    // The callback failed: drop this subscription.
                    regexps.remove(s);
                }
            }
        }
    }

    /**
     * Install a security manager if none is set, create the server and
     * bind it in the RMI registry under the local host name.
     * @param args Unused.
     */
    public static void main(String args[]) {
        System.out.println("Installing security manager");
        if (System.getSecurityManager() == null) {
            System.setSecurityManager(new RMISecurityManager());
        }
        try {
            System.out.println("starting here...");
            EventServerImpl server = new EventServerImpl();
            String hostName = InetAddress.getLocalHost().getHostName();
            System.out.println("Binding as //" + hostName + "/" +
                               EventServer.rmiName);
            Naming.rebind("//" + hostName + "/" + EventServer.rmiName,
                          server);
            System.out.println("EventServerImpl bound");
        } catch (Exception e) {
            System.err.println("EventServerImpl exception " +
                               e.getMessage());
            e.printStackTrace();
        }
    }
}

import java.rmi.*;
import java.rmi.server.*;
import java.net.*;

/**
 * A subscriber: looks up the server, exports itself so the server can
 * call back, and prints every event it is notified of.
 */
public class EventClientImpl implements EventClient {

    EventServer server;

    /**
     * Look up the server and export this object for callbacks.
     * @param rmiUrl The RMI URL the server is bound under.
     * @throws NotBoundException if nothing is bound at rmiUrl.
     * @throws MalformedURLException if rmiUrl is not a valid RMI URL.
     * @throws RemoteException on RMI failure.
     */
    public EventClientImpl(String rmiUrl)
        throws NotBoundException, MalformedURLException, RemoteException {
        this.server = (EventServer) Naming.lookup(rmiUrl);
        // The one-argument exportObject is deprecated because it only
        // supports pre-generated static stubs; port 0 still selects an
        // anonymous port.
        UnicastRemoteObject.exportObject(this, 0);
    }

    /**
     * Subscribe this client to event types matching the expression.
     * @param s The regular expression.
     * @return the label of the new subscription.
     * @throws RemoteException on RMI failure.
     */
    public int addSubscription(String s) throws RemoteException {
        return server.subscribe(s, this);
    }

    /**
     * Cancel a subscription.
     * @param key The label returned by addSubscription.
     * @throws RemoteException on RMI failure.
     */
    public void removeSubscription(int key) throws RemoteException {
        server.unsubscribe(key);
    }

    /**
     * Callback from the server: print the subscription label and event.
     * @param subscription The label of the matching subscription.
     * @param e The published event.
     * @throws RemoteException declared by the remote interface.
     */
    public void notify(int subscription, Event e) throws RemoteException {
        // We got an event
        System.out.println("Event received on subscription " +
                           subscription);
        System.out.println(e);
        // Do something more interesting with the event?
    }

    /**
     * A very simple client for testing.
     * @param args The server's RMI URL, then the regular expression to
     *             subscribe with.
     */
    public static void main(String args[]) {
        if (args.length < 2) {
            System.err.println(
                "Usage: java EventClientImpl <rmiUrl> <regexp>");
            return;
        }
        String url = args[0];
        String re = args[1];
        try {
            EventClientImpl client = new EventClientImpl(url);
            client.addSubscription(re);
        } catch (Exception e) {
            System.err.println(e);
        }
    }
}

import java.rmi.*;
import java.rmi.server.*;
import java.net.*;

/**
 * A test publisher: publishes a numbered event of a fixed type, then
 * sleeps for five seconds, and repeats until something fails.
 */
public class EventGenerator implements Runnable {

    EventServer server;
    String type;

    /**
     * Look up the server to publish to.
     * @param rmiUrl The RMI URL the server is bound under.
     * @param type The type given to every generated event.
     * @throws NotBoundException if nothing is bound at rmiUrl.
     * @throws MalformedURLException if rmiUrl is not a valid RMI URL.
     * @throws RemoteException on RMI failure.
     */
    public EventGenerator(String rmiUrl, String type)
        throws NotBoundException, MalformedURLException, RemoteException {
        this.type = type;
        this.server = (EventServer) Naming.lookup(rmiUrl);
    }

    /**
     * Publish "Event 0", "Event 1", ... with a five second pause after
     * each, stopping at the first exception.
     */
    public void run() {
        try {
            int count = 0;
            while (true) {
                server.publish(new Event(type, "Event " + count++));
                Thread.sleep(5000);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            System.err.println(e);
        } catch (Exception e) {
            System.err.println(e);
        }
    }

    /**
     * Start a generator on its own thread.
     * @param args The server's RMI URL, then the event type to publish.
     */
    public static void main(String args[]) {
        if (args.length < 2) {
            System.err.println(
                "Usage: java EventGenerator <rmiUrl> <type>");
            return;
        }
        String url = args[0];
        String type = args[1];
        try {
            EventGenerator generator = new EventGenerator(url, type);
            Thread t = new Thread(generator);
            t.start();
        } catch (Exception e) {
            System.err.println(e);
        }
    }
}
// Ian Wakeman 2005-02-22