SubprocessEngine.java

00001 /* 
00002 ** Author(s): Miguel Calejo
00003 ** Contact:   interprolog@declarativa.com, http://www.declarativa.com
00004 ** Copyright (C) Declarativa, Portugal, 2000-2002
00005 ** Use and distribution, without any warranties, under the terms of the 
00006 ** GNU Library General Public License, readable in http://www.fsf.org/copyleft/lgpl.html
00007 */
00008 package com.declarativa.interprolog;
00009 import com.declarativa.interprolog.util.*;
00010 import java.io.*;
00011 import java.net.*;
00012 import java.util.*;
00013 import java.lang.reflect.*;
00014 import java.util.zip.*;
00015 
00019 public class SubprocessEngine extends PrologEngine{
00020     Process prolog;
00021     PrintWriter prologStdin;
00022     OutputHandler stdoutHandler, stderrHandler;
00023     ServerSocket serverSocket;
00024     protected Socket socket;
00025     ServerSocket intServerSocket=null; Socket intSocket=null; // Used only for Windows
00026     String interruptCommand=null; // Used only for UNIX
00027     Vector listeners = new Vector ();
00028     boolean available;
00029     
00030     static class ClientRecognizer extends Recognizer implements RecognizerListener{
00031         PrologOutputListener client;
00032         ClientRecognizer (PrologOutputListener client){
00033             this.client=client;
00034             addRecognizerListener (this);
00035         }
00036         public void recognized (Recognizer source,Object extra){
00037             client.print ((String)extra);
00038         }
00039     }
00040     
00045     public synchronized void addPrologOutputListener (PrologOutputListener client){
00046         ClientRecognizer RE = new ClientRecognizer (client);
00047         listeners.addElement (RE);
00048         addPrologStdoutListener (RE);
00049         addPrologStderrListener (RE);
00050     }
00051         
00052         public synchronized void removePrologOutputListener(PrologOutputListener client){
00053                 for (int i=0;i<listeners.size();i++) {
00054                         ClientRecognizer cr = (ClientRecognizer)(listeners.elementAt(i));
00055                         if (cr.client==client) {
00056                                 listeners.removeElementAt(i);
00057                                 removePrologStdoutListener(cr);
00058                                 removePrologStderrListener(cr);
00059                         }
00060                 }
00061         }
00062         
00069         public void addPrologStdoutListener(OutputListener l){
00070                 stdoutHandler.addOutputListener(l);
00071         }
00072         
00073         public void addPrologStderrListener(OutputListener l){
00074                 stderrHandler.addOutputListener(l);
00075         }
00076         
00077         public void removePrologStdoutListener(OutputListener l){
00078                 stdoutHandler.removeOutputListener(l);
00079         }
00080         
00081         public void removePrologStderrListener(OutputListener l){
00082                 stderrHandler.removeOutputListener(l);
00083         }
00084         
00085         Recognizer promptTrigger = new Recognizer("| ?-");
00086         Recognizer breakTrigger = new Recognizer(": ?-");
00087             
00096         public SubprocessEngine(String startPrologCommand, boolean debug) {
00097             super(startPrologCommand,debug);
00098             // Let's make sure PrologEngines get their finalize() message when we exit:
00099             // Now (JDK 1.2) considered unsafe, so make sure to message shutdown():  System.runFinalizersOnExit(true);
00100             try {
00101                 RecognizerListener availableSetter = new RecognizerListener (){
00102                     public void recognized(Recognizer source,Object extra){
00103                         available=true;
00104                     }
00105                 };
00106                 promptTrigger.addRecognizerListener(availableSetter);
00107                 breakTrigger.addRecognizerListener(availableSetter);
00108                 
00109                 progressMessage("Launching subprocess "+startPrologCommand);
00110                 prolog = Runtime.getRuntime().exec(startPrologCommand);
00111 
00112                 // prolog -> java.lang.UNIXProcess@f9dc36 
00113 
00114                 // No explicit buffering, because it's already being done by our Process's streams
00115                 // If not, OutputHandler will handle the issue
00116                 stdoutHandler = new OutputHandler(prolog.getInputStream());
00117                 stderrHandler = new OutputHandler(prolog.getErrorStream());
00118                 setDetectPromptAndBreak(true);
00119                 
00120                 stdoutHandler.start();
00121                 stderrHandler.start();
00122                 
00123                 Thread.yield(); // let's try to catch Prolog output ASAP
00124                 
00125                 prologStdin = new PrintWriter(prolog.getOutputStream());
00126                 
00127                 progressMessage("Loading initial files...");
00128                 command("assert(library_directory('"+tempDirectory.getAbsolutePath()+"'))");
00129                 //consultFromPackage("interprolog.O",SubprocessEngine.class);
00130                 consultFromPackage("interprolog.xwam",SubprocessEngine.class);
00131                 
00132                 String myHost="127.0.0.1"; // to avoid annoying Windows dialup attempt
00133                 progressMessage ("Allocating the ServerSocket...");
00134                 serverSocket = new ServerSocket (0); // let the system pick a port
00135                 progressMessage ("server port:"+serverSocket.getLocalPort ());
00136                                 
00137                 command("ipinitialize('"+myHost+"',"+
00138                         serverSocket.getLocalPort ()+","+
00139                         registerJavaObject (this)+")");
00140                 
00141                 progressMessage("Waiting for the socket to accept...");
00142                 socket = serverSocket.accept();
00143                 
00144                 progressMessage("Teaching examples to XSB...");
00145                 
00146                 PrologOutputObjectStream bootobjects = new PrologOutputObjectStream(socket.getOutputStream());
00147                 
00148                 // slightly more tortuous to ease reuse of these methods
00149                 ObjectOutputStream oos = bootobjects.getObjectStream(); 
00150                 teachIPobjects(oos);
00151                 teachBasicObjects(oos);         
00152                 bootobjects.flush();    
00153                 // Now for OS-dependent Prolog interrupt generation
00154                 //prepareInterrupt(myHost);
00155                 
00156 
00157                 // Added later to setup the call back server first
00158                         
00159                   setupCallbackServer();
00160                   waitUntilAvailable();
00161                   prepareInterrupt(myHost);
00162                   
00163             } catch (IOException e){
00164                     throw new IPException("Could not launch XSB:"+e);
00165             }
00166             
00167            // setupCallbackServer();
00168            // waitUntilAvailable();
00169             
00170         }
00171         
00172         public SubprocessEngine(String startPrologCommand){
00173                 this(startPrologCommand,false);
00174         }
00175         
00177         public boolean isAvailable(){
00178                 return available;
00179         }
00180         
00181         protected void setupCallbackServer(){
00182                 Thread server = new Thread (){
00183                         public void run(){
00184                                 try{
00185                                         while(!shutingDown) {
00186                                                 progressMessage("Waiting to receive object");
00187                                                 Object x = receiveObject();
00188                                                 progressMessage("Received object:"+x);
00189                                                 Object y = handleCallback(x);
00190                                                 progressMessage("Handled object and computed:"+y);
00191                                                 if (y!=null) sendObject(y);
00192                                         }
00193                                 } catch (IOException e){
00194                                         if (!shutingDown) 
00195                                         throw new IPException("Bad exception in setupCallbackServer:"+e);
00196                                 }
00197                         }
00198                 };
00199                 progressMessage ("Starting up callback service...");
00200                 server.setName("IP javaMessage handler");
00201                 server.start();
00202         }
00203         
00204         protected Object receiveObject() throws IOException{
00205         progressMessage("entering receiveObject()");
00206                 Object x=null;
00207         try{
00208                         ObjectInputStream ios = new ObjectInputStream(socket.getInputStream());
00209                         progressMessage("got input stream");
00210                         x = ios.readObject();
00211                 } catch (ClassNotFoundException e){
00212                         x = e;
00213                 }
00214         progressMessage("exiting receiveObject():"+x);
00215                 return x;
00216         }
00217         
00218         protected void sendObject (Object y) throws IOException{
00219         progressMessage("entering sendObject("+y+")");
00220                 PrologOutputObjectStream poos = 
00221                     new PrologOutputObjectStream(socket.getOutputStream());
00222                 poos.writeObject(y);
00223                 poos.flush(); // this actually writes to the socket stream
00224         progressMessage("exiting sendObject("+y+")");
00225         }
00226         
00229         public synchronized void shutdown(){
00230                 shutingDown=true;
00231                 available=false;
00232                 stdoutHandler.setIgnoreStreamEnd(true);
00233                 stderrHandler.setIgnoreStreamEnd(true);
00234                 
00235                 // Miti: Added for testit to work properly on Linux
00236                 try{
00237                     socket.close();
00238                     serverSocket.close();
00239                 }catch(IOException e) {}
00240                     
00241                 // Miti: Added the check so that the intSocket, intServerSocket are closed only for Windows
00242                 if(needsSocketInterrupt()){
00243                     try {
00244                         // closing sockets will stop them, no need to deprecate:
00245                         // stdoutHandler.stop(); stderrHandler.stop(); cbhandler.stop();
00246                         socket.close(); serverSocket.close();
00247                         intSocket.close(); intServerSocket.close();
00248                     }
00249                     catch (IOException e) {throw new IPException("Problems closing sockets:"+e);}
00250                     
00251                     finally{prolog.destroy();}
00252                 
00253                 }              
00254                 progressMessage("shutdown performed ");
00255         }
00256         
00257         protected boolean isShutingDown(){
00258                 return shutingDown;
00259         }
00260         
00264         protected void finalize() throws Throwable{
00265                 if (prolog!=null) prolog.destroy();
00266         }
00267         
00268         protected void setDetectPromptAndBreak(boolean yes){
00269                 if (yes==isDetectingPromptAndBreak()) return;
00270                 if(yes){
00271                         stdoutHandler.addOutputListener(promptTrigger);
00272                         stdoutHandler.addOutputListener(breakTrigger);
00273                         //stderrHandler.addOutputListener(promptTrigger);
00274                         //stderrHandler.addOutputListener(breakTrigger);
00275                 } else{
00276                         stdoutHandler.removeOutputListener(promptTrigger);
00277                         stdoutHandler.removeOutputListener(breakTrigger);
00278                         //stderrHandler.removeOutputListener(promptTrigger);
00279                         //stderrHandler.removeOutputListener(breakTrigger);
00280                 }
00281         }
00282         protected boolean isDetectingPromptAndBreak(){
00283                 return stdoutHandler.hasListener(promptTrigger) /*&& stderrHandler.hasListener(promptTrigger)*/ &&
00284                         stdoutHandler.hasListener(breakTrigger) /*&& stderrHandler.hasListener(breakTrigger)*/;
00285         }
00286         
00289         public synchronized void sendAndFlush(String s){
00290                 available=false;
00291                 prologStdin.print(s); prologStdin.flush();
00292         }
00293         
00294         public void sendAndFlushLn(String s){
00295                 sendAndFlush(s+nl);
00296         }
00297         
00298         protected static boolean needsSocketInterrupt(){ // if we're under Windows
00299                     return (System.getProperty("os.name").toLowerCase().indexOf("windows")!=-1);
00300             //return true;
00301           
00302         }
00303         
00304         protected void prepareInterrupt(String myHost) throws IOException{ // requires successful startup steps
00305                 if (needsSocketInterrupt()){ 
00306                         intServerSocket = new ServerSocket(0);
00307                         command("setupWindowsInterrupt('"+myHost+"',"+intServerSocket.getLocalPort()+")");
00308                         intSocket = intServerSocket.accept();
00309                 } else {
00310                         //available=true; // sort of a hack... but when will the state of 'available' become valid ?
00311                         waitUntilAvailable();
00312                         Object bindings[] = deterministicGoal("getPrologPID(N), ipObjectSpec('java.lang.Integer',Integer,[N],_)",
00313                                 "[Integer]");           
00314                         if (bindings!=null) interruptCommand = "/bin/kill -s INT "+bindings[0];
00315                         else throw new IPException("Could not find XSB's PID");
00316                 }
00317         }
00318         protected synchronized void doInterrupt(){
00319             setDetectPromptAndBreak(true);
00320             try {
00321                 if(needsSocketInterrupt()){
00322                                 // Windows
00323                     byte[] ctrlc = {3};
00324                     progressMessage("Attempting to interrupt XSB...");
00325                     OutputStream IS = intSocket.getOutputStream();
00326                     IS.write(ctrlc); IS.flush();
00327                 } else{
00328                                 // Probably Solaris: we'll just use a standard UNIX signal
00329                     progressMessage("Interrupting XSB with "+interruptCommand);
00330                     Runtime.getRuntime().exec(interruptCommand);
00331                 }
00332                         
00333             } 
00334             catch(IOException e) {throw new IPException("Exception in interrupt():"+e);}
00335             waitUntilAvailable();
00336             sendAndFlushLn("abort."); // leave break mode
00337         }
00338         
00341         protected boolean realCommand(String s){
00342                 progressMessage("COMMAND:"+s+".");
00343                 sendAndFlushLn(s+".");
00344                 return true; // we do not really know
00345         }
00346         
00347         public Object[] deterministicGoal(String G, String OVar, Object[] objectsP, String RVars){
00348            
00349                 boolean first=false;
00350                 synchronized(this){
00351                 if (!topGoalHasStarted){
00352                         topGoalHasStarted = true;
00353                                 first=true;
00354                         }
00355                 }
00356                 if (first){
00357                         if (!isIdle()) throw new IPException("Inconsistency in deterministicGoal:");            
00358                         Object[] result = firstGoal(G, OVar, objectsP, RVars);
00359                         return result;
00360                 } else return super.deterministicGoal(G, OVar, objectsP, RVars);
00361         }
00362 
00364         protected Object[] firstGoal(String G, String OVar, Object[] objectsP, String RVars){
00365                 int mytimestamp = incGoalTimestamp();
00366         GoalFromJava GO = makeDGoalObject(G, OVar, objectsP, RVars, mytimestamp);
00367         Object[] resultToReturn=null;
00368         try{
00369             progressMessage("Schedulling (first) goal "+G+", timestamp "+mytimestamp+" in thread "+Thread.currentThread().getName());
00370             GoalToExecute goalToDo = new GoalToExecute(GO);
00371             goalToDo.setFirstGoalStatus();
00372             scheduleGoal(goalToDo);
00373                         goalToDo.prologWasCalled();
00374             //setupErrorHandling();
00375             sendObject(GO);
00376             realCommand("deterministicGoal"); // assynchronous
00377             ResultFromProlog result = goalToDo.waitForResult();
00378             // goalToDo is forgotten by handleCallback
00379             progressMessage("got dG result for timestamp "+mytimestamp);         
00380             if (result==null) throw new IPException("Problems in goal result");
00381             if (goalToDo.wasAborted()) throw new IPAbortedException(G+" was aborted");
00382             if (goalToDo.wasInterrupted()) throw new IPInterruptedException(G+" was interrupted");
00383             if (result.error!=null) throw new IPException (result.error);
00384             if (result.timestamp!=mytimestamp)
00385                 throw new IPException ("bad timestamp in deterministicGoal, got "+result.timestamp+" instead of "+goalTimestamp);
00386             if (result.succeeded)
00387                 resultToReturn = result.rVars;
00388         } catch (IPException e) {
00389             throw e;
00390         } catch (Exception e) {
00391             throw new IPException ("Problem in deterministicGoal:"+e);
00392         } finally{
00393                         topGoalHasStarted = false; // is this OK? this assumes no initiative from the Prolog side, which is probably correct
00394                         //removeErrorHandling();
00395                         progressMessage("Leaving firstGoal for "+G+", timestamp "+mytimestamp+" isIdle()=="+isIdle());
00396         }
00397         return resultToReturn;
00398     }
00399 
00400         protected Object doSomething(){
00401                 if (onlyFirstGoalSchedulled()) return null;
00402                 else return super.doSomething();
00403         }
00404         
00405         protected synchronized boolean onlyFirstGoalSchedulled(){
00406                 return isIdle() || (messagesExecuting.size()==0 && goalsToExecute.size()==1 && 
00407                         ((GoalToExecute)goalsToExecute.elementAt(0)).isFirstGoal());
00408         }
00409         
00410         // deterministicGoal helpers
00411         
00412     protected void setupErrorHandling(){
00413                 setDetectPromptAndBreak(false);
00414                 stderrHandler.addOutputListener(errorTrigger); // no need to listen to stdout
00415                 abortMessage = "";
00416                 final Thread current = Thread.currentThread();
00417                 // We could dispense creating this every time:
00418                 errorHandler = new RecognizerListener(){
00419                         public void recognized(Recognizer source,Object extra){
00420                             abortMessage = (String)extra;
00421                             current.interrupt(); 
00422                         }
00423                     };
00424                 errorTrigger.addRecognizerListener(errorHandler);
00425     }
00426     
00427     protected void removeErrorHandling(){
00428         errorTrigger.removeRecognizerListener(errorHandler);
00429         stderrHandler.removeOutputListener(errorTrigger);
00430         errorHandler=null;
00431                 setDetectPromptAndBreak(true);
00432     }
00433     private RecognizerListener errorHandler=null;
00434     Recognizer errorTrigger = new Recognizer("++Error",true); // was "++Error: " for XSB 2.4
00435     private String abortMessage;
00436 
00437 }
00438 
00439 

Generated on Wed Jul 26 13:30:44 2006 for XSB by  doxygen 1.4.5