00001
00002
00003
00004
00005
00006
00007
00008 package com.declarativa.interprolog;
00009 import com.declarativa.interprolog.util.*;
00010 import java.io.*;
00011 import java.net.*;
00012 import java.util.*;
00013 import java.lang.reflect.*;
00014 import java.util.zip.*;
00015
00019 public class SubprocessEngine extends PrologEngine{
/** The Prolog subprocess started by the constructor through Runtime.exec. */
Process prolog;
/** Writer wrapped around the subprocess's standard input. */
PrintWriter prologStdin;
/** Handlers reading the subprocess's standard output and standard error. */
OutputHandler stdoutHandler, stderrHandler;
/** Server socket allocated by the constructor; the Prolog side connects to it. */
ServerSocket serverSocket;
/** Connection accepted from serverSocket; receiveObject and sendObject use its streams. */
protected Socket socket;
/** Server socket used only when needsSocketInterrupt() is true. */
ServerSocket intServerSocket=null;
/** Connection accepted from intServerSocket; doInterrupt writes the interrupt byte to it. */
Socket intSocket=null;
/** "/bin/kill -s INT pid" command line built by prepareInterrupt when sockets are not used. */
String interruptCommand=null;
/** ClientRecognizer wrappers created by addPrologOutputListener. */
Vector listeners = new Vector ();
/**
 * Set to true by the prompt/break recognizer callbacks and to false by
 * sendAndFlush and shutdown; read without locking by isAvailable().
 * Declared volatile because the recognizer callbacks are presumably invoked
 * from the OutputHandler threads started in the constructor (TODO confirm),
 * so a plain field would not guarantee that readers see the latest value.
 */
volatile boolean available;
00029
00030 static class ClientRecognizer extends Recognizer implements RecognizerListener{
00031 PrologOutputListener client;
00032 ClientRecognizer (PrologOutputListener client){
00033 this.client=client;
00034 addRecognizerListener (this);
00035 }
00036 public void recognized (Recognizer source,Object extra){
00037 client.print ((String)extra);
00038 }
00039 }
00040
00045 public synchronized void addPrologOutputListener (PrologOutputListener client){
00046 ClientRecognizer RE = new ClientRecognizer (client);
00047 listeners.addElement (RE);
00048 addPrologStdoutListener (RE);
00049 addPrologStderrListener (RE);
00050 }
00051
00052 public synchronized void removePrologOutputListener(PrologOutputListener client){
00053 for (int i=0;i<listeners.size();i++) {
00054 ClientRecognizer cr = (ClientRecognizer)(listeners.elementAt(i));
00055 if (cr.client==client) {
00056 listeners.removeElementAt(i);
00057 removePrologStdoutListener(cr);
00058 removePrologStderrListener(cr);
00059 }
00060 }
00061 }
00062
00069 public void addPrologStdoutListener(OutputListener l){
00070 stdoutHandler.addOutputListener(l);
00071 }
00072
00073 public void addPrologStderrListener(OutputListener l){
00074 stderrHandler.addOutputListener(l);
00075 }
00076
00077 public void removePrologStdoutListener(OutputListener l){
00078 stdoutHandler.removeOutputListener(l);
00079 }
00080
00081 public void removePrologStderrListener(OutputListener l){
00082 stderrHandler.removeOutputListener(l);
00083 }
00084
00085 Recognizer promptTrigger = new Recognizer("| ?-");
00086 Recognizer breakTrigger = new Recognizer(": ?-");
00087
/**
 * Launches the Prolog subprocess and wires it to this Java object.
 * In order: installs the availability listener on the prompt/break
 * recognizers, starts the subprocess and its stdout/stderr handlers, sends
 * the initial commands over standard input, opens a server socket on an
 * ephemeral port and waits for the subprocess to connect, streams the
 * bootstrap objects over that socket, starts the callback thread, waits
 * until the engine reports available, and prepares the interrupt mechanism.
 *
 * If any step fails, the subprocess, sockets and handlers created so far are
 * released before the exception propagates, so a failed construction does
 * not leave an orphaned Prolog process or open ports behind.
 *
 * @param startPrologCommand command line passed to Runtime.exec
 * @param debug forwarded unchanged to the superclass constructor
 * @throws IPException if an IOException occurs during any of the steps above
 */
public SubprocessEngine(String startPrologCommand, boolean debug) {
    super(startPrologCommand,debug);

    boolean launched = false;
    try {
        RecognizerListener availableSetter = new RecognizerListener (){
            public void recognized(Recognizer source,Object extra){
                available=true;
            }
        };
        promptTrigger.addRecognizerListener(availableSetter);
        breakTrigger.addRecognizerListener(availableSetter);

        progressMessage("Launching subprocess "+startPrologCommand);
        prolog = Runtime.getRuntime().exec(startPrologCommand);

        stdoutHandler = new OutputHandler(prolog.getInputStream());
        stderrHandler = new OutputHandler(prolog.getErrorStream());
        setDetectPromptAndBreak(true);

        stdoutHandler.start();
        stderrHandler.start();

        Thread.yield();

        prologStdin = new PrintWriter(prolog.getOutputStream());

        progressMessage("Loading initial files...");
        command("assert(library_directory('"+tempDirectory.getAbsolutePath()+"'))");

        consultFromPackage("interprolog.xwam",SubprocessEngine.class);

        String myHost="127.0.0.1";
        progressMessage ("Allocating the ServerSocket...");
        serverSocket = new ServerSocket (0); // port 0: let the OS pick a free port
        progressMessage ("server port:"+serverSocket.getLocalPort ());

        command("ipinitialize('"+myHost+"',"+
            serverSocket.getLocalPort ()+","+
            registerJavaObject (this)+")");

        progressMessage("Waiting for the socket to accept...");
        socket = serverSocket.accept();

        progressMessage("Teaching examples to XSB...");

        PrologOutputObjectStream bootobjects = new PrologOutputObjectStream(socket.getOutputStream());

        ObjectOutputStream oos = bootobjects.getObjectStream();
        teachIPobjects(oos);
        teachBasicObjects(oos);
        bootobjects.flush();

        setupCallbackServer();
        waitUntilAvailable();
        prepareInterrupt(myHost);

        launched = true;
    } catch (IOException e){
        throw new IPException("Could not launch XSB:"+e);
    } finally {
        // Covers the IOException path above as well as any unchecked exception.
        if (!launched) releaseAfterFailedLaunch();
    }
}

/**
 * Best-effort cleanup for a construction attempt that did not complete.
 * Every resource may still be null, and close failures are ignored so that
 * the exception which aborted the launch is the one the caller sees.
 */
private void releaseAfterFailedLaunch(){
    // Same flags shutdown() sets, so the callback thread (if it was started)
    // leaves its loop instead of reporting the closed socket as an error.
    shutingDown = true;
    available = false;
    if (stdoutHandler!=null) stdoutHandler.setIgnoreStreamEnd(true);
    if (stderrHandler!=null) stderrHandler.setIgnoreStreamEnd(true);
    try { if (intSocket!=null) intSocket.close(); } catch (IOException ignored) { /* best effort */ }
    try { if (intServerSocket!=null) intServerSocket.close(); } catch (IOException ignored) { /* best effort */ }
    try { if (socket!=null) socket.close(); } catch (IOException ignored) { /* best effort */ }
    try { if (serverSocket!=null) serverSocket.close(); } catch (IOException ignored) { /* best effort */ }
    if (prolog!=null) prolog.destroy();
}
00171
/**
 * Convenience constructor: same as the two-argument constructor with
 * {@code debug} set to false.
 *
 * @param startPrologCommand command line used to start the Prolog subprocess
 */
public SubprocessEngine(String startPrologCommand) {
    this(startPrologCommand, false);
}
00175
00177 public boolean isAvailable(){
00178 return available;
00179 }
00180
00181 protected void setupCallbackServer(){
00182 Thread server = new Thread (){
00183 public void run(){
00184 try{
00185 while(!shutingDown) {
00186 progressMessage("Waiting to receive object");
00187 Object x = receiveObject();
00188 progressMessage("Received object:"+x);
00189 Object y = handleCallback(x);
00190 progressMessage("Handled object and computed:"+y);
00191 if (y!=null) sendObject(y);
00192 }
00193 } catch (IOException e){
00194 if (!shutingDown)
00195 throw new IPException("Bad exception in setupCallbackServer:"+e);
00196 }
00197 }
00198 };
00199 progressMessage ("Starting up callback service...");
00200 server.setName("IP javaMessage handler");
00201 server.start();
00202 }
00203
00204 protected Object receiveObject() throws IOException{
00205 progressMessage("entering receiveObject()");
00206 Object x=null;
00207 try{
00208 ObjectInputStream ios = new ObjectInputStream(socket.getInputStream());
00209 progressMessage("got input stream");
00210 x = ios.readObject();
00211 } catch (ClassNotFoundException e){
00212 x = e;
00213 }
00214 progressMessage("exiting receiveObject():"+x);
00215 return x;
00216 }
00217
00218 protected void sendObject (Object y) throws IOException{
00219 progressMessage("entering sendObject("+y+")");
00220 PrologOutputObjectStream poos =
00221 new PrologOutputObjectStream(socket.getOutputStream());
00222 poos.writeObject(y);
00223 poos.flush();
00224 progressMessage("exiting sendObject("+y+")");
00225 }
00226
00229 public synchronized void shutdown(){
00230 shutingDown=true;
00231 available=false;
00232 stdoutHandler.setIgnoreStreamEnd(true);
00233 stderrHandler.setIgnoreStreamEnd(true);
00234
00235
00236 try{
00237 socket.close();
00238 serverSocket.close();
00239 }catch(IOException e) {}
00240
00241
00242 if(needsSocketInterrupt()){
00243 try {
00244
00245
00246 socket.close(); serverSocket.close();
00247 intSocket.close(); intServerSocket.close();
00248 }
00249 catch (IOException e) {throw new IPException("Problems closing sockets:"+e);}
00250
00251 finally{prolog.destroy();}
00252
00253 }
00254 progressMessage("shutdown performed ");
00255 }
00256
00257 protected boolean isShutingDown(){
00258 return shutingDown;
00259 }
00260
00264 protected void finalize() throws Throwable{
00265 if (prolog!=null) prolog.destroy();
00266 }
00267
00268 protected void setDetectPromptAndBreak(boolean yes){
00269 if (yes==isDetectingPromptAndBreak()) return;
00270 if(yes){
00271 stdoutHandler.addOutputListener(promptTrigger);
00272 stdoutHandler.addOutputListener(breakTrigger);
00273
00274
00275 } else{
00276 stdoutHandler.removeOutputListener(promptTrigger);
00277 stdoutHandler.removeOutputListener(breakTrigger);
00278
00279
00280 }
00281 }
00282 protected boolean isDetectingPromptAndBreak(){
00283 return stdoutHandler.hasListener(promptTrigger) &&
00284 stdoutHandler.hasListener(breakTrigger) ;
00285 }
00286
00289 public synchronized void sendAndFlush(String s){
00290 available=false;
00291 prologStdin.print(s); prologStdin.flush();
00292 }
00293
00294 public void sendAndFlushLn(String s){
00295 sendAndFlush(s+nl);
00296 }
00297
00298 protected static boolean needsSocketInterrupt(){
00299 return (System.getProperty("os.name").toLowerCase().indexOf("windows")!=-1);
00300
00301
00302 }
00303
00304 protected void prepareInterrupt(String myHost) throws IOException{
00305 if (needsSocketInterrupt()){
00306 intServerSocket = new ServerSocket(0);
00307 command("setupWindowsInterrupt('"+myHost+"',"+intServerSocket.getLocalPort()+")");
00308 intSocket = intServerSocket.accept();
00309 } else {
00310
00311 waitUntilAvailable();
00312 Object bindings[] = deterministicGoal("getPrologPID(N), ipObjectSpec('java.lang.Integer',Integer,[N],_)",
00313 "[Integer]");
00314 if (bindings!=null) interruptCommand = "/bin/kill -s INT "+bindings[0];
00315 else throw new IPException("Could not find XSB's PID");
00316 }
00317 }
00318 protected synchronized void doInterrupt(){
00319 setDetectPromptAndBreak(true);
00320 try {
00321 if(needsSocketInterrupt()){
00322
00323 byte[] ctrlc = {3};
00324 progressMessage("Attempting to interrupt XSB...");
00325 OutputStream IS = intSocket.getOutputStream();
00326 IS.write(ctrlc); IS.flush();
00327 } else{
00328
00329 progressMessage("Interrupting XSB with "+interruptCommand);
00330 Runtime.getRuntime().exec(interruptCommand);
00331 }
00332
00333 }
00334 catch(IOException e) {throw new IPException("Exception in interrupt():"+e);}
00335 waitUntilAvailable();
00336 sendAndFlushLn("abort.");
00337 }
00338
00341 protected boolean realCommand(String s){
00342 progressMessage("COMMAND:"+s+".");
00343 sendAndFlushLn(s+".");
00344 return true;
00345 }
00346
00347 public Object[] deterministicGoal(String G, String OVar, Object[] objectsP, String RVars){
00348
00349 boolean first=false;
00350 synchronized(this){
00351 if (!topGoalHasStarted){
00352 topGoalHasStarted = true;
00353 first=true;
00354 }
00355 }
00356 if (first){
00357 if (!isIdle()) throw new IPException("Inconsistency in deterministicGoal:");
00358 Object[] result = firstGoal(G, OVar, objectsP, RVars);
00359 return result;
00360 } else return super.deterministicGoal(G, OVar, objectsP, RVars);
00361 }
00362
00364 protected Object[] firstGoal(String G, String OVar, Object[] objectsP, String RVars){
00365 int mytimestamp = incGoalTimestamp();
00366 GoalFromJava GO = makeDGoalObject(G, OVar, objectsP, RVars, mytimestamp);
00367 Object[] resultToReturn=null;
00368 try{
00369 progressMessage("Schedulling (first) goal "+G+", timestamp "+mytimestamp+" in thread "+Thread.currentThread().getName());
00370 GoalToExecute goalToDo = new GoalToExecute(GO);
00371 goalToDo.setFirstGoalStatus();
00372 scheduleGoal(goalToDo);
00373 goalToDo.prologWasCalled();
00374
00375 sendObject(GO);
00376 realCommand("deterministicGoal");
00377 ResultFromProlog result = goalToDo.waitForResult();
00378
00379 progressMessage("got dG result for timestamp "+mytimestamp);
00380 if (result==null) throw new IPException("Problems in goal result");
00381 if (goalToDo.wasAborted()) throw new IPAbortedException(G+" was aborted");
00382 if (goalToDo.wasInterrupted()) throw new IPInterruptedException(G+" was interrupted");
00383 if (result.error!=null) throw new IPException (result.error);
00384 if (result.timestamp!=mytimestamp)
00385 throw new IPException ("bad timestamp in deterministicGoal, got "+result.timestamp+" instead of "+goalTimestamp);
00386 if (result.succeeded)
00387 resultToReturn = result.rVars;
00388 } catch (IPException e) {
00389 throw e;
00390 } catch (Exception e) {
00391 throw new IPException ("Problem in deterministicGoal:"+e);
00392 } finally{
00393 topGoalHasStarted = false;
00394
00395 progressMessage("Leaving firstGoal for "+G+", timestamp "+mytimestamp+" isIdle()=="+isIdle());
00396 }
00397 return resultToReturn;
00398 }
00399
00400 protected Object doSomething(){
00401 if (onlyFirstGoalSchedulled()) return null;
00402 else return super.doSomething();
00403 }
00404
00405 protected synchronized boolean onlyFirstGoalSchedulled(){
00406 return isIdle() || (messagesExecuting.size()==0 && goalsToExecute.size()==1 &&
00407 ((GoalToExecute)goalsToExecute.elementAt(0)).isFirstGoal());
00408 }
00409
00410
00411
00412 protected void setupErrorHandling(){
00413 setDetectPromptAndBreak(false);
00414 stderrHandler.addOutputListener(errorTrigger);
00415 abortMessage = "";
00416 final Thread current = Thread.currentThread();
00417
00418 errorHandler = new RecognizerListener(){
00419 public void recognized(Recognizer source,Object extra){
00420 abortMessage = (String)extra;
00421 current.interrupt();
00422 }
00423 };
00424 errorTrigger.addRecognizerListener(errorHandler);
00425 }
00426
00427 protected void removeErrorHandling(){
00428 errorTrigger.removeRecognizerListener(errorHandler);
00429 stderrHandler.removeOutputListener(errorTrigger);
00430 errorHandler=null;
00431 setDetectPromptAndBreak(true);
00432 }
00433 private RecognizerListener errorHandler=null;
00434 Recognizer errorTrigger = new Recognizer("++Error",true);
00435 private String abortMessage;
00436
00437 }
00438
00439