conc_compl.c

00001 #include "xsb_config.h"
00002 
00003 #ifdef CONC_COMPL
00004 
00005 #include <pthread.h>
00006 
00007 #include "context.h"
00008 #include "basicdefs.h"
00009 #include "macro_xsb.h"
00010 #include "cell_xsb.h"
00011 #include "choice.h"
00012 #include "conc_compl.h"
00013 #include "tables.h"
00014 #include "tr_utils.h"
00015 #include "thread_xsb.h"
00016 #include "error_xsb.h"
00017 
00018 #define GetDepSubgoal(d)        ((d)->Subgoal)
00019 #define GetDepLast(d)           ((d)->last)
00020 #define GetDepTid(d)            (subg_tid((d)->Subgoal))
00021 
00022 static ThreadDep * find_tid( ThreadDepList *TDL, int tid )
00023 {
00024         int i ;
00025         i = 0 ;
00026         while ( i < TDL->NumDeps )
00027                 if ( GetDepTid(&TDL->Deps[i]) == tid )
00028                         return &TDL->Deps[i] ;
00029                 else
00030                         i++ ;
00031         return NULL ;
00032 }
00033 
00034 /********************************************************************* 
00035    Updates thread dependency list (TDL) for a thread t (accessible
00036    through the context for t).  The TDL for t includes each thread t_1
00037    upon which some subgoal computed by t depends, along with the
00038    oldest subgoal computed by t_1 upon which there is a dependency.
00039    Returns TRUE if a new dependency is found; and FALSE if not.
00040 */
00041 
00042 static int check_ins_subg( ThreadDepList *TDL, VariantSF subg )
00043 {       int i ;
00044         i = 0 ;
00045         while ( i < TDL->NumDeps )
00046         {       if ( GetDepTid(&TDL->Deps[i]) == subg_tid(subg) )
00047                 {       if( subg_compl_stack_ptr(subg) > 
00048                             subg_compl_stack_ptr(GetDepSubgoal(&TDL->Deps[i])) )
00049                         {       GetDepSubgoal(&TDL->Deps[i]) = subg ;
00050                                 return TRUE ;
00051                         }
00052                         else
00053                                 return FALSE ;
00054                 }
00055                 else
00056                         i++ ;
00057         }
00058         if( TDL->NumDeps == MAX_TDEP_LIST - 1 )
00059                 xsb_abort( "Too many inter-thread dependencies" );
00060 
00061         GetDepSubgoal(&TDL->Deps[TDL->NumDeps++]) = subg ;
00062         return TRUE ;
00063 }
00064 
00065 int EmptyThreadDepList( ThreadDepList *TDL )
00066 {
00067         return TDL->NumDeps == 0 ;
00068 }
00069 
00070 void InitThreadDepList( ThreadDepList *TDL )
00071 {
00072         TDL->NumDeps = 0 ;
00073 }
00074 
00075 static void CopyThreadDepList( ThreadDepList *to, ThreadDepList *from )
00076 {
00077         int i ;
00078         to->NumDeps = from->NumDeps ;
00079         for( i = 0 ; i < to->NumDeps ; i++ )
00080                 to->Deps[i] = from->Deps[i] ;
00081 }
00082 
00083 ThreadDep *GetInitDep( ThreadDepList *TDL )
00084 {
00085         if( TDL->NumDeps == 0 )
00086                 return NULL ;
00087         else
00088                 return &TDL->Deps[0] ;
00089 }
00090 
00091 ThreadDep *GetNextDep( ThreadDepList *TDL, ThreadDep *dep )
00092 {
00093         int i = dep - TDL->Deps ;
00094         i++ ;
00095         if( TDL->NumDeps <= i )
00096                 return NULL ;
00097         else
00098                 return &TDL->Deps[i] ;
00099 
00100 }
00101 
00102 /********************************************************************* 
00103    Propagates dependencies from the TDL of the dep_th to the TDL of
00104    th, using check_ins_subg().  If the process finds a new leader in
00105    th, then *leader is set to TRUE; otherwise, if some other new
00106    dependency is found, *new_deps is set to TRUE.
00107  */
00108 
00109 static void PropagateDeps( th_context *th, th_context *dep_th, 
00110                         ThreadDepList *NewTDL, CPtr *leader, int *new_deps )
00111 {
00112     ThreadDep *dep ;
00113     VariantSF sgf ;
00114 
00115     dep = GetInitDep(&dep_th->TDL); 
00116     while( dep != NULL )
00117     {   sgf = GetDepSubgoal(dep) ;
00118         if( subg_tid(sgf) == th->tid )
00119         {
00120             if( subg_compl_stack_ptr(sgf) > *leader )
00121                 *leader = subg_compl_stack_ptr(sgf) ;
00122         }
00123         else
00124                 if( check_ins_subg(NewTDL, sgf) )
00125                         *new_deps = TRUE ;
00126         dep = GetNextDep(&dep_th->TDL, dep); 
00127     }
00128 }
00129 
00130 /********************************************************************* 
00131   UpdateDeps updates all of a given thread t's dependencies.  If th ->
00132   completing is TRUE, the thread is waiting on the completing mutex;
00133   if false, its either doing work or executing the completion
00134   algorithm -- but only one thread can execute the completion
00135   algorithm at one time.  In the inner loop, for each completing
00136   (blocked) thread t1 that t depends on, the dependencies are updated
00137   as is last_ans.  It turns out that by propagating dependencies from
00138   t1 to t, new dependencies may arise, which are handled by the outer
00139   loop.  If busy is set to TRUE, the thread will later resuspend.
00140  */
00141 
00142 void UpdateDeps(th_context *th, int *busy, CPtr *leader)
00143 {
00144     ThreadDepList NewTDL;
00145     int new_deps ;
00146     VariantSF sgf;
00147     ThreadDep *dep, *dep1 ;
00148     th_context * dep_th ;
00149 
00150     InitThreadDepList( &NewTDL ) ;
00151     *busy = FALSE ;
00152 
00153     do
00154     {
00155         new_deps = FALSE ;
00156         dep = GetInitDep(&th->TDL); 
00157         while( dep != NULL )
00158         {   sgf = GetDepSubgoal(dep) ;
00159             if( !is_completed(sgf) )
00160             {
00161                 check_ins_subg(&NewTDL, sgf) ;
00162                 dep1 = find_tid(&NewTDL,subg_tid(sgf));
00163                 dep_th = find_context(subg_tid(sgf)) ;
00164                 if( dep_th->completing )
00165                         GetDepLast(dep1) = dep_th->last_ans ;
00166                 else
00167                         GetDepLast(dep1) = 0 ;
00168                 if( dep_th->completing )
00169                     PropagateDeps( th, dep_th, &NewTDL, leader, &new_deps ) ;
00170                 else
00171                     *busy = TRUE ;
00172             }
00173             dep = GetNextDep(&th->TDL, dep); 
00174         }
00175         CopyThreadDepList( &th->TDL, &NewTDL ) ;
00176     }
00177     while( new_deps ) ;
00178 }
00179 
00180 /********************************************************************* 
00181    MayHaveAnswers is only called by a thread th after UpdateDeps
00182    traverses all dependencies and it is determined that each dependent
00183    thread th0 for th is "completing", i.e.  blocked on the completing
00184    mutex.  MayHaveAnswers then conducts a second pass that checks
00185    whether the last answer in th0 is the same as the last answer known
00186    to th.
00187 */
00188 
00189 int MayHaveAnswers( th_context * th )
00190 {
00191     th_context * dep_th, *th1 ;
00192     ThreadDep *dep, *dep1 ;
00193     int tid, tid1 ;
00194     int rc = FALSE ;
00195 
00196     dep = GetInitDep(&th->TDL); 
00197     while( dep != NULL )
00198     {   tid = GetDepTid(dep) ;
00199         dep_th = find_context(tid) ;
00200         dep1 = GetInitDep(&dep_th->TDL); 
00201         while( dep1 != NULL )
00202         {   tid1 = GetDepTid(dep1) ;
00203             th1 = find_context(tid1) ;
00204             if( GetDepLast(dep1) < th1->last_ans )
00205                 rc = TRUE ;
00206             dep1 = GetNextDep(&dep_th->TDL, dep1); 
00207         }
00208         dep = GetNextDep(&th->TDL, dep); 
00209     }
00210     return rc ;
00211 }
00212 
00213 /********************************************************************* 
00214   CheckForSCC() is only called by a thread th after UpdateDeps
00215   traverses all dependencies and it is determined that each dependent
00216   thread th0 for th is "completing", i.e.  blocked on the completing
00217   mutex.  Once called, it traverses through the TDL for th and for
00218   each dependent thread dep_th, ensures that dep_th's TDL contains all
00219   and only the dependencies of th's TDL.
00220 
00221   Question: is cc_leader necessary??
00222 
00223 */
00224 
00225 int CheckForSCC( th_context * th )
00226 {
00227     th_context * dep_th ;
00228     ThreadDep *dep, *dep1 ;
00229     int tid, tid1 ;
00230     VariantSF sgf;
00231 
00232     dep = GetInitDep(&th->TDL); 
00233     while( dep != NULL )
00234     {   tid = GetDepTid(dep) ;
00235         sgf = GetDepSubgoal(dep) ;
00236         dep_th = find_context(tid) ;
00237         if(dep_th->cc_leader < subg_compl_stack_ptr(sgf))
00238                 return FALSE ;
00239         dep1 = GetInitDep(&dep_th->TDL); 
00240         while( dep1 != NULL )
00241         {   tid1 = GetDepTid(dep1) ;
00242             if( tid1 != th->tid && !find_tid(&th->TDL,tid1) )
00243                 return FALSE ;
00244             dep1 = GetNextDep(&dep_th->TDL, dep1); 
00245         }
00246         dep1 = GetInitDep(&th->TDL); 
00247         while( dep1 != NULL )
00248         {   tid1 = GetDepTid(dep1) ;
00249             if( tid1 != dep_th->tid && !find_tid(&dep_th->TDL,tid1) )
00250                 return FALSE ;
00251             dep1 = GetNextDep(&th->TDL, dep1); 
00252         }
00253         dep = GetNextDep(&th->TDL, dep); 
00254     }
00255     return TRUE ;
00256 }
00257 
00258 void CompleteOtherThreads( th_context * th )
00259 {
00260     th_context * dep_th ;
00261     ThreadDep *dep ;
00262     VariantSF sgf ;
00263 
00264     dep = GetInitDep(&th->TDL); 
00265     while( dep != NULL )
00266     {   sgf = GetDepSubgoal(dep) ;
00267         dep_th = find_context(subg_tid(sgf)) ;
00268         CompleteTop(dep_th, dep_th->cc_leader) ;
00269         dep_th->completed = TRUE ;
00270         pthread_cond_signal(&dep_th->cond_var) ;
00271         dep = GetNextDep(&th->TDL, dep); 
00272     }
00273 }
00274 
00275 void WakeOtherThreads( th_context * th )
00276 {
00277     th_context * dep_th ;
00278     ThreadDep *dep ;
00279     int tid;
00280 
00281     dep = GetInitDep(&th->TDL); 
00282     while( dep != NULL )
00283     {   tid = GetDepTid(dep) ;
00284         dep_th = find_context(tid) ;
00285         pthread_cond_signal(&dep_th->cond_var) ;
00286         dep = GetNextDep(&th->TDL, dep); 
00287     }
00288 }
00289 
00290 void WakeDependentThreads( th_context * th, VariantSF subg ) 
00291 {
00292     CPtr Cons ;
00293     th_context *cons_th ;
00294         
00295     Cons = subg_asf_list_ptr(subg) ;
00296     while(Cons)
00297     {
00298         if( int_val(nlcp_tid(Cons)) != th->tid )
00299         {   cons_th = find_context(int_val(nlcp_tid(Cons))) ;
00300             pthread_cond_signal(&cons_th->cond_var) ;
00301         }
00302         Cons = nlcp_prevlookup(Cons) ;
00303     }
00304 }
00305 
00306 void CompleteTop( th_context * th, CPtr leader )
00307 {
00308   VariantSF compl_subg;
00309   CPtr ComplStkFrame = leader;
00310                                                                                 
00311   /* mark all SCC as completed and do simplification also, reclaim
00312      space for all but the leader */
00313                                                                                 
00314   while (ComplStkFrame >= openreg) {
00315     compl_subg = compl_subgoal_ptr(ComplStkFrame);
00316     if( !is_completed(compl_subg) )
00317     {
00318         mark_as_completed(compl_subg);
00319         WakeDependentThreads(th, compl_subg);
00320     }
00321     reclaim_incomplete_table_structs(compl_subg);
00322     ComplStkFrame = next_compl_frame(ComplStkFrame);
00323   } /* while */
00324   openreg = prev_compl_frame(leader);
00325   reclaim_stacks(breg);
00326   breg = tcp_prevbreg(breg);
00327 }
00328 
00329 CPtr sched_external( th_context *th, CPtr ExtCons )
00330 {
00331     CPtr sched_cons = NULL ;
00332     VariantSF ExtProd ;
00333 
00334     if ( ALN_Next(nlcp_trie_return(ExtCons)) )
00335         sched_cons = ExtCons ;
00336 
00337     ExtProd = (VariantSF)nlcp_subgoal_ptr(ExtCons) ;
00338     if( !is_completed(ExtProd) )
00339         check_ins_subg(&th->TDL, ExtProd) ;
00340 
00341     return sched_cons ;
00342 }
00343 
00344 #endif

Generated on Wed Jul 26 13:30:40 2006 for XSB by  doxygen 1.4.5