00001 #include "xsb_config.h"
00002
00003 #ifdef CONC_COMPL
00004
00005 #include <pthread.h>
00006
00007 #include "context.h"
00008 #include "basicdefs.h"
00009 #include "macro_xsb.h"
00010 #include "cell_xsb.h"
00011 #include "choice.h"
00012 #include "conc_compl.h"
00013 #include "tables.h"
00014 #include "tr_utils.h"
00015 #include "thread_xsb.h"
00016 #include "error_xsb.h"
00017
/*
 * Accessors for one ThreadDep entry; `dep` is a pointer to the entry.
 * GetDepSubgoal and GetDepLast expand to plain member accesses, so they
 * can be used on the left-hand side of an assignment.
 */
#define GetDepSubgoal(dep)  ((dep)->Subgoal)                 /* subgoal stored in the entry   */
#define GetDepLast(dep)     ((dep)->last)                    /* the entry's `last` field      */
#define GetDepTid(dep)      (subg_tid(GetDepSubgoal(dep)))   /* subg_tid() of stored subgoal  */
00021
00022 static ThreadDep * find_tid( ThreadDepList *TDL, int tid )
00023 {
00024 int i ;
00025 i = 0 ;
00026 while ( i < TDL->NumDeps )
00027 if ( GetDepTid(&TDL->Deps[i]) == tid )
00028 return &TDL->Deps[i] ;
00029 else
00030 i++ ;
00031 return NULL ;
00032 }
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 static int check_ins_subg( ThreadDepList *TDL, VariantSF subg )
00043 { int i ;
00044 i = 0 ;
00045 while ( i < TDL->NumDeps )
00046 { if ( GetDepTid(&TDL->Deps[i]) == subg_tid(subg) )
00047 { if( subg_compl_stack_ptr(subg) >
00048 subg_compl_stack_ptr(GetDepSubgoal(&TDL->Deps[i])) )
00049 { GetDepSubgoal(&TDL->Deps[i]) = subg ;
00050 return TRUE ;
00051 }
00052 else
00053 return FALSE ;
00054 }
00055 else
00056 i++ ;
00057 }
00058 if( TDL->NumDeps == MAX_TDEP_LIST - 1 )
00059 xsb_abort( "Too many inter-thread dependencies" );
00060
00061 GetDepSubgoal(&TDL->Deps[TDL->NumDeps++]) = subg ;
00062 return TRUE ;
00063 }
00064
00065 int EmptyThreadDepList( ThreadDepList *TDL )
00066 {
00067 return TDL->NumDeps == 0 ;
00068 }
00069
00070 void InitThreadDepList( ThreadDepList *TDL )
00071 {
00072 TDL->NumDeps = 0 ;
00073 }
00074
00075 static void CopyThreadDepList( ThreadDepList *to, ThreadDepList *from )
00076 {
00077 int i ;
00078 to->NumDeps = from->NumDeps ;
00079 for( i = 0 ; i < to->NumDeps ; i++ )
00080 to->Deps[i] = from->Deps[i] ;
00081 }
00082
00083 ThreadDep *GetInitDep( ThreadDepList *TDL )
00084 {
00085 if( TDL->NumDeps == 0 )
00086 return NULL ;
00087 else
00088 return &TDL->Deps[0] ;
00089 }
00090
00091 ThreadDep *GetNextDep( ThreadDepList *TDL, ThreadDep *dep )
00092 {
00093 int i = dep - TDL->Deps ;
00094 i++ ;
00095 if( TDL->NumDeps <= i )
00096 return NULL ;
00097 else
00098 return &TDL->Deps[i] ;
00099
00100 }
00101
00102
00103
00104
00105
00106
00107
00108
00109 static void PropagateDeps( th_context *th, th_context *dep_th,
00110 ThreadDepList *NewTDL, CPtr *leader, int *new_deps )
00111 {
00112 ThreadDep *dep ;
00113 VariantSF sgf ;
00114
00115 dep = GetInitDep(&dep_th->TDL);
00116 while( dep != NULL )
00117 { sgf = GetDepSubgoal(dep) ;
00118 if( subg_tid(sgf) == th->tid )
00119 {
00120 if( subg_compl_stack_ptr(sgf) > *leader )
00121 *leader = subg_compl_stack_ptr(sgf) ;
00122 }
00123 else
00124 if( check_ins_subg(NewTDL, sgf) )
00125 *new_deps = TRUE ;
00126 dep = GetNextDep(&dep_th->TDL, dep);
00127 }
00128 }
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142 void UpdateDeps(th_context *th, int *busy, CPtr *leader)
00143 {
00144 ThreadDepList NewTDL;
00145 int new_deps ;
00146 VariantSF sgf;
00147 ThreadDep *dep, *dep1 ;
00148 th_context * dep_th ;
00149
00150 InitThreadDepList( &NewTDL ) ;
00151 *busy = FALSE ;
00152
00153 do
00154 {
00155 new_deps = FALSE ;
00156 dep = GetInitDep(&th->TDL);
00157 while( dep != NULL )
00158 { sgf = GetDepSubgoal(dep) ;
00159 if( !is_completed(sgf) )
00160 {
00161 check_ins_subg(&NewTDL, sgf) ;
00162 dep1 = find_tid(&NewTDL,subg_tid(sgf));
00163 dep_th = find_context(subg_tid(sgf)) ;
00164 if( dep_th->completing )
00165 GetDepLast(dep1) = dep_th->last_ans ;
00166 else
00167 GetDepLast(dep1) = 0 ;
00168 if( dep_th->completing )
00169 PropagateDeps( th, dep_th, &NewTDL, leader, &new_deps ) ;
00170 else
00171 *busy = TRUE ;
00172 }
00173 dep = GetNextDep(&th->TDL, dep);
00174 }
00175 CopyThreadDepList( &th->TDL, &NewTDL ) ;
00176 }
00177 while( new_deps ) ;
00178 }
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189 int MayHaveAnswers( th_context * th )
00190 {
00191 th_context * dep_th, *th1 ;
00192 ThreadDep *dep, *dep1 ;
00193 int tid, tid1 ;
00194 int rc = FALSE ;
00195
00196 dep = GetInitDep(&th->TDL);
00197 while( dep != NULL )
00198 { tid = GetDepTid(dep) ;
00199 dep_th = find_context(tid) ;
00200 dep1 = GetInitDep(&dep_th->TDL);
00201 while( dep1 != NULL )
00202 { tid1 = GetDepTid(dep1) ;
00203 th1 = find_context(tid1) ;
00204 if( GetDepLast(dep1) < th1->last_ans )
00205 rc = TRUE ;
00206 dep1 = GetNextDep(&dep_th->TDL, dep1);
00207 }
00208 dep = GetNextDep(&th->TDL, dep);
00209 }
00210 return rc ;
00211 }
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225 int CheckForSCC( th_context * th )
00226 {
00227 th_context * dep_th ;
00228 ThreadDep *dep, *dep1 ;
00229 int tid, tid1 ;
00230 VariantSF sgf;
00231
00232 dep = GetInitDep(&th->TDL);
00233 while( dep != NULL )
00234 { tid = GetDepTid(dep) ;
00235 sgf = GetDepSubgoal(dep) ;
00236 dep_th = find_context(tid) ;
00237 if(dep_th->cc_leader < subg_compl_stack_ptr(sgf))
00238 return FALSE ;
00239 dep1 = GetInitDep(&dep_th->TDL);
00240 while( dep1 != NULL )
00241 { tid1 = GetDepTid(dep1) ;
00242 if( tid1 != th->tid && !find_tid(&th->TDL,tid1) )
00243 return FALSE ;
00244 dep1 = GetNextDep(&dep_th->TDL, dep1);
00245 }
00246 dep1 = GetInitDep(&th->TDL);
00247 while( dep1 != NULL )
00248 { tid1 = GetDepTid(dep1) ;
00249 if( tid1 != dep_th->tid && !find_tid(&dep_th->TDL,tid1) )
00250 return FALSE ;
00251 dep1 = GetNextDep(&th->TDL, dep1);
00252 }
00253 dep = GetNextDep(&th->TDL, dep);
00254 }
00255 return TRUE ;
00256 }
00257
00258 void CompleteOtherThreads( th_context * th )
00259 {
00260 th_context * dep_th ;
00261 ThreadDep *dep ;
00262 VariantSF sgf ;
00263
00264 dep = GetInitDep(&th->TDL);
00265 while( dep != NULL )
00266 { sgf = GetDepSubgoal(dep) ;
00267 dep_th = find_context(subg_tid(sgf)) ;
00268 CompleteTop(dep_th, dep_th->cc_leader) ;
00269 dep_th->completed = TRUE ;
00270 pthread_cond_signal(&dep_th->cond_var) ;
00271 dep = GetNextDep(&th->TDL, dep);
00272 }
00273 }
00274
00275 void WakeOtherThreads( th_context * th )
00276 {
00277 th_context * dep_th ;
00278 ThreadDep *dep ;
00279 int tid;
00280
00281 dep = GetInitDep(&th->TDL);
00282 while( dep != NULL )
00283 { tid = GetDepTid(dep) ;
00284 dep_th = find_context(tid) ;
00285 pthread_cond_signal(&dep_th->cond_var) ;
00286 dep = GetNextDep(&th->TDL, dep);
00287 }
00288 }
00289
00290 void WakeDependentThreads( th_context * th, VariantSF subg )
00291 {
00292 CPtr Cons ;
00293 th_context *cons_th ;
00294
00295 Cons = subg_asf_list_ptr(subg) ;
00296 while(Cons)
00297 {
00298 if( int_val(nlcp_tid(Cons)) != th->tid )
00299 { cons_th = find_context(int_val(nlcp_tid(Cons))) ;
00300 pthread_cond_signal(&cons_th->cond_var) ;
00301 }
00302 Cons = nlcp_prevlookup(Cons) ;
00303 }
00304 }
00305
00306 void CompleteTop( th_context * th, CPtr leader )
00307 {
00308 VariantSF compl_subg;
00309 CPtr ComplStkFrame = leader;
00310
00311
00312
00313
00314 while (ComplStkFrame >= openreg) {
00315 compl_subg = compl_subgoal_ptr(ComplStkFrame);
00316 if( !is_completed(compl_subg) )
00317 {
00318 mark_as_completed(compl_subg);
00319 WakeDependentThreads(th, compl_subg);
00320 }
00321 reclaim_incomplete_table_structs(compl_subg);
00322 ComplStkFrame = next_compl_frame(ComplStkFrame);
00323 }
00324 openreg = prev_compl_frame(leader);
00325 reclaim_stacks(breg);
00326 breg = tcp_prevbreg(breg);
00327 }
00328
00329 CPtr sched_external( th_context *th, CPtr ExtCons )
00330 {
00331 CPtr sched_cons = NULL ;
00332 VariantSF ExtProd ;
00333
00334 if ( ALN_Next(nlcp_trie_return(ExtCons)) )
00335 sched_cons = ExtCons ;
00336
00337 ExtProd = (VariantSF)nlcp_subgoal_ptr(ExtCons) ;
00338 if( !is_completed(ExtProd) )
00339 check_ins_subg(&th->TDL, ExtProd) ;
00340
00341 return sched_cons ;
00342 }
00343
00344 #endif