/* Shared-memory (threads) implementation of a master/slave message-passing
   layer.  The master is logical process 0; slaves are 1..num_slaves.

   Exports: set_num_slaves(), par_call(), finalize(), send_msg(), recv_msg(),
            declare_recv_buf(), get_last_tag(), get_last_source(),
            get_last_size(), probe(), is_master(), myid(), runtime(),
            wait_a_little(), attach_new_slave() [ always returns 0 here;
            for LOADER/MPI ] */
/* Imports (SGI):      task.h, ulocks.h, nanosleep(), m_set_procs(),
                       m_get_numprocs(), m_fork(), m_get_myid() */
/* Imports (sunos):    lwp/{lwp.h,stackdep.h,lwpmachdep.h}, lwp_sleep(),
                       lwp_create(); could use lwp_suspend(SELF),
                       lwp_resume() ?? */
/* Imports (solaris):  <header name missing in this copy>, nanosleep(),
                       thr_create(); could use thr_suspend(thr_self()),
                       thr_continue() ?? */
/* Sketch of the two thread-creation interfaces:
     lwp_create( &tid, lwp_funcall, pod_getmaxpri(), LWPSERVER,
                 STKTOP(lwp_stack), 1, hdCall);
     lwp_resume(tid);

     #include   (header name missing in this copy)
     int thr_create(void *stack_base, size_t stack_size,
                    void *(*start_routine)(void *), void *arg,
                    long flags, thread_t *new_thread); */

/* Platform selection: on Sun hosts the solaris (thr_*()) code is used.
   Change solaris to sunos below to use the lwp_*() code instead. */
#ifdef sun
#define solaris
#endif

#ifdef __convex__
/* NOTE(review): the header names of the bare #include lines in this copy
   are missing; restore them before building. */
#include /* for stderr */
#include /* for cnx_thread_create() */
#include /* for cnx_thread_create() */
#include /* for getrusage() */
#include /* for getrusage() */
#define thread_t cnx_tid_t
#define stkalign_t double
#ifndef MINSTACKSZ
#define MINSTACKSZ 1280
#endif
/* Map the thread calls used by the sunos/solaris code onto cnx_thread_*(). */
#define thr_self cnx_thread_self          /* used by the solaris-style code */
#define lwp_self cnx_thread_self          /* used by the sunos-style code */
#define lwp_resume(arg)                   /* sunos-style; a no-op here */
#define thr_continue(tid) cnx_thread_unblock(tid, NULL, NULL) /* solaris-style */
#endif

#ifdef sgi
/* NOTE(review): header names missing in this copy; presumably task.h and
   ulocks.h per the Imports note above -- confirm. */
#include
#include
#endif

#if defined(sun)
/* NOTE(review): header names missing in this copy; restore before building. */
#include
#include
#include
#include
/* #include #include #include */
#endif

#if defined(sun) || defined(__convex__)
static int num_slaves;          /* written by set_num_slaves() */

#if defined(sunos) || defined(__convex__)
/* Size of one slave stack, in stkalign_t units.  Parenthesized so that
   MAXNUMPROC*STACKSZ reserves MAXNUMPROC whole stacks (unparenthesized,
   the "+ 10000" was added only once, leaving the array far too small). */
#define STACKSZ (MINSTACKSZ * sizeof (stkalign_t) + 10000)
/* Slave stacks are carved out of a static array: at most MAXNUMPROC
   slave stacks are available. */
#define MAXNUMPROC 32
static stkalign_t lwp_stack[MAXNUMPROC*STACKSZ];
static stkalign_t *stack = lwp_stack;   /* next unused slave stack */
#endif

#if defined(sunos)
/* But this SunOS version doesn't work until after m_fork() (no m_set_procs()) */
#define MAX_NUM_SLAVES 10000
thread_t myid_vec[MAX_NUM_SLAVES];
/* WOULD HAVE USED (BUT PROBLEM AS SLAVES EXIT): lwp_enumerate(NULL,0); */

/* Number of logical processes: the master plus num_slaves slaves. */
int m_get_numprocs() {return num_slaves+1;}

/* Return the calling thread's index in myid_vec (0 == master); exits if
   the thread is not found.
   NOTE(review): nothing in this sunos path stores thread ids into
   myid_vec, so the lookup cannot succeed as written -- confirm. */
int m_get_myid()
{
  thread_t myid;
  int i;

  lwp_self(&myid);
  for ( i = 0; i < MAX_NUM_SLAVES ; i++ )
    if (SAMETHREAD(myid_vec[i], myid))
      return(i);
  /* NOTE(review): %d may not match the thread_t type here -- confirm. */
  printf("error in myid_vec: %d not found\n", myid );
  exit(1);
}

/* Create num_slaves lightweight processes, each running
   routine(get_task, do_task, get_task_result, update_environment) on its
   own slice of lwp_stack.  Returns 0. */
int m_fork(routine, get_task, do_task, get_task_result, update_environment)
     void (*routine)(), (*get_task)(), (*do_task)(), (*get_task_result)(),
       (*update_environment)();
{
  static int i = 0; /* static in case m_fork() called a second time */
  thread_t tid;

  for ( ; i < num_slaves; i++ ) {
    if (stack >= lwp_stack + MAXNUMPROC*STACKSZ) {
      printf("m_fork: no stack left for more than %d slaves\n", MAXNUMPROC);
      exit(1);
    }
    lwp_create( &tid, routine, pod_getmaxpri(), LWPSERVER, STKTOP(stack), 4,
                get_task, do_task, get_task_result, update_environment);
    if (tid == (thread_t) -1)
      perror("thr_create");
    lwp_resume(tid);
    stack = stack + STACKSZ;
  }
  return 0;
}
#endif

#if defined(solaris) || defined(__convex__)
#define MAX_NUM_SLAVES 10000
/* myid_vec[k] holds the thread id of logical process k (0 == master). */
thread_t myid_vec[MAX_NUM_SLAVES];
/* WOULD HAVE USED (BUT PROBLEM AS SLAVES EXIT): lwp_enumerate(NULL,0); */
int numprocs;                   /* master + slaves; set by m_fork() */

int m_get_numprocs() {return numprocs;}

/* Return the calling thread's index in myid_vec (0 == master); exits if
   the thread is not found. */
int m_get_myid()
{
  thread_t myid;
  int i;

  myid = thr_self();
  /* Scan only the MAX_NUM_SLAVES entries that exist. */
  for ( i = 0; i < MAX_NUM_SLAVES ; i++ )
    if (myid_vec[i] == myid)
      return(i);
  printf("error in myid_vec: %d not found\n", thr_self() );
  exit(1);
}

static void *args[6]; /* On convex, passed in globally */

/* Start num_slaves slave threads, each running
   routine(get_task, do_task, get_task_result, update_environment) through
   call_args(), then run the same routine in the calling (master) thread.
   Slave k (1..num_slaves) is recorded in myid_vec[k] before it is allowed
   to run.  Returns 0 if routine returns. */
int m_fork(routine, get_task, do_task, get_task_result, update_environment)
     void (*routine)(), (*get_task)(), (*do_task)(), (*get_task_result)(),
       (*update_environment)();
{
  thread_t tid;
  int i = 1;
  void call_args();

  args[0] = (void *)routine;
  args[1] = (void *)get_task;
  args[2] = (void *)do_task;
  args[3] = (void *)get_task_result;
  args[4] = (void *)update_environment;
  args[5] = (void *)NULL; /* not used */
  numprocs = 1;
  for ( ; i <= num_slaves; i++ ) {
    numprocs++;
#if defined(solaris)
    /* Created suspended so that its id is in myid_vec before it runs. */
    if (thr_create(NULL, 0, (void *(*)(void *))call_args, (void *)args,
                   THR_NEW_LWP | THR_SUSPENDED, &tid)) {
      perror("thr_create");
      exit(1);
    }
#elif defined(__convex__)
    if (stack >= lwp_stack + MAXNUMPROC*STACKSZ) {
      printf("m_fork: no stack left for more than %d slaves\n", MAXNUMPROC);
      exit(1);
    }
    tid = cnx_thread_create(CNX_ANY_NODE, stack,
                            (void *(*)(void *))call_args, CNX_USER_THREAD);
    /* errno = 11, no more processes */
    if (tid == (thread_t) -1) {
      perror("thr_create");
      exit(1);
    }
    else
      stack = stack + STACKSZ;
#endif
    myid_vec[i] = tid; /* wait until his tid is set */
    if (0 != thr_continue( tid ))
      perror("thr_continue(tid)");
  }
  /* We get a core dump if this pause isn't here. WHY? */
  {void wait_a_little(); wait_a_little();}
  myid_vec[0] = thr_self();
  (*routine)(get_task, do_task, get_task_result, update_environment);
  return 0;
}

/* Probably should use convex technique for solaris, too. */
#if defined(solaris)
/* Thread entry point: args[0] is the routine and args[1..4] are its
   arguments (the array filled in by m_fork()). */
void call_args(args)
     void *args[5];
{
  (*(void (*)())(args[0]))(args[1],args[2],args[3],args[4]);
}
#elif defined(__convex__)
/* Thread entry point: block until m_fork() unblocks this thread (after
   recording its id), run the routine from the global args array, then exit. */
void call_args(tid)
     thread_t tid;
{
  if (-1 == cnx_thread_block( 0, NULL))
    perror("cnx_thread_block");
  (*(void (*)())(args[0]))(args[1],args[2],args[3],args[4]);
  cnx_thread_exit();
}
#endif
#endif /* solaris || __convex__ */
#endif /* sun || __convex__ */

/* This version depends on the fact that master and slave take turns
   communicating.  An ACK would be needed for more general communication.
*/ #ifdef sgi #include #include #include #endif /* #define DEBUG */ #ifdef DEBUG #define TRACE #else #define TRACE if (0) #endif /* Courtesy of .h files in gcl */ #ifdef sgi #define ON_C_STACK(x) ((((int)x) << 1) < 0) /* irix */ #endif #ifdef ibm #define ON_C_STACK(x) (((unsigned int) x) >= 0x2f000000 ) /* rios */ #endif #ifdef NeXT_m68k #define ON_C_STACK(x) (((unsigned int) x) >= 0x3f00000 ) /* NeXT_m68k */ #endif #ifdef NeXT_i386 #define ON_C_STACK(x) (((unsigned int) x) >= 0xbff00000 ) /* NeXT_i386 */ #endif #ifdef alpha #define ON_C_STACK(x) ((long)x < TBEGIN) #endif #ifndef ON_C_STACK #define ON_C_STACK(x) ((int)x < 0) #endif void wait_a_little() /* solaris can use nanosleep */ #if defined(sgi) || defined(sun) { struct timespec rqtp; rqtp.tv_sec = 0; rqtp.tv_nsec = 10000; nanosleep(&rqtp, NULL); } #elif defined(__convex__) { int dummy; /* In my experiments, the constant use of system calls below results in unacceptably low use of CPU time. Hence, busy waiting is preferred. */ /* cnx_thread_block(10, &dummy); */ } #elif defined(sunos) { struct timeval tval; tval.tv_usec = 10; tval.tv_sec = 0; lwp_sleep( &tval ); } #else { sleep(1); } /*1 sec too coarse, of course*/ #endif /* This measures time on a single thread. 
*/ #ifdef sun #include #include #endif int runtime() /* return time in ms */ #ifdef sun { struct lwpinfo buf; _lwp_info(&buf); return 1000 * buf.lwp_utime.tv_sec + buf.lwp_utime.tv_nsec/1000000; } #else { struct rusage buf; /* getrusage is BSD */ if ( getrusage( RUSAGE_SELF, &buf ) ) perror("runtime"); return buf.ru_utime.tv_sec*1000 + buf.ru_utime.tv_usec/1000; } #endif static void error(string) char *string; { fprintf( stderr, "threads.c: "); fprintf( stderr, string); fprintf( stderr, "\n"); exit(1); } int is_master() { return m_get_myid() == 0;} int myid() { return m_get_myid();} int set_num_slaves(number) int number; #ifdef sgi { if (number > 0) m_set_procs(number+1); return m_get_numprocs() - 1; } #else { if (number <= 0) number = 4; /* 4 is arbitrary */ #ifdef solaris thr_setconcurrency(number); /* Advise desired number of active threads */ #endif num_slaves = number; return number; } #endif #define MSG_TBL_SIZE 100 #define MASTER 0 #define SLAVE 1 #define NOBODY -1 #define ACK -2 /* sender must be entered only after other parts of msg are ready */ typedef struct { int sender; /* Must be MASTER, SLAVE, or NOBODY (ACK not used now) */ int tag; void *ptr; } msg; static msg msg_tbl[MSG_TBL_SIZE]; #if 0 void check_tbl () {printf("msg_tbl[1..2].sender: msg_tbl[1].sender: %d, msg_tbl[2].sender: %d\n", msg_tbl[1].sender, msg_tbl[2].sender); } #endif /* BUG: These are shared among all slaves */ int mas_last_tag; int mas_last_source; int get_last_tag() { int myid = m_get_myid(); if (myid == 0) return mas_last_tag; else return msg_tbl[myid].tag; } int get_last_source() { int myid = m_get_myid(); if (myid == 0) return mas_last_source; else return 0; /* 0 == master */ } int get_last_size() {return -1;} /* STILL NOT DEFINED */ static int curr_tag = 0; /* set only by master */ /* IMPORTANT: msg_ptr must point to global, and not to master's stack*/ void send_msg(msg_ptr, dest, tag) void *msg_ptr; int dest, tag; { int me = MASTER; TRACE printf("IN: dest: %d, m_get_myid(): 
%d, me: %d, MASTER: %d, msg_tbl[dest].sender: %d\n", dest, m_get_myid(), me, MASTER, msg_tbl[dest].sender); if (dest == 0) /* if I'm the slave, sending to master */ { dest = m_get_myid(); /* use my msg buf to talk to master */ me = SLAVE; } TRACE printf("dest: %d, m_get_myid(): %d, me: %d, MASTER: %d, msg_tbl[dest].sender: %d\n", dest, m_get_myid(), me, MASTER, msg_tbl[dest].sender); if (msg_tbl[dest].sender == me) { printf("sender: %d; me: %d\n", msg_tbl[dest].sender, me); /* printf("dest: %d, m_get_myid(): %d; me: %d\n", dest, m_get_myid(), me); */ wait_a_little(); printf("sender: %d; me: %d\n", msg_tbl[dest].sender, me); abort(); error("send_msg: previous message still pending\n"); } if (ON_C_STACK(msg_ptr)) error("send_msg: data on C stack\n"); msg_tbl[dest].tag = tag; msg_tbl[dest].ptr = msg_ptr; msg_tbl[dest].sender = me; } void declare_recv_buf(pointer,size) char *pointer; int size; { ;} /* On shared memory, recv_buf will be same as send_buf */ void *recv_msg() { int i, size = m_get_numprocs(), myid = m_get_myid(); TRACE printf("RECV_REPLY begin\n"); if (myid == 0) /* if we're master, receiving from slave */ while (1) { for( i=1; i < size; i++ ) if (msg_tbl[i].sender == SLAVE) { mas_last_tag = msg_tbl[i].tag; mas_last_source = i; msg_tbl[i].sender = NOBODY; TRACE printf("RECV_REPLY done in myid %d\n", i); return msg_tbl[i].ptr; } TRACE printf("RECV_REPLY for 1: sender: %d\n", msg_tbl[1].sender); wait_a_little(); } else /* else we're the slave, receiving from master */ { while (msg_tbl[myid].sender != MASTER) wait_a_little(); return msg_tbl[myid].ptr; } } int probe(slave) int slave; { /* printf("probe(%d): %d", slave, msg_tbl[slave].sender); */ return (msg_tbl[slave].sender == SLAVE || msg_tbl[slave].sender == NOBODY); } #if 0 #define DIETAG -1 void finalize() { int i, size = m_get_numprocs(); for (i=1; i < size; i++) { while(msg_tbl[i].sender == MASTER) wait_a_little(); /* prev. msg. 
pending */ TRACE printf("FINALIZE: id: %d, sender: %d, tag: %d\n", i, msg_tbl[i].sender, msg_tbl[i].tag); msg_tbl[i].tag = DIETAG; msg_tbl[i].sender = MASTER; } } #endif void finalize() { return;} void par_call(); static int in_par_call = 0; void par_call(routine, get_task, do_task, get_task_result, update_environment) void (*routine)(), (*get_task)(), (*do_task)(), (*get_task_result)(), (*update_environment)(); { if (in_par_call) error("recursive call to par_call"); {int i; for (i=0; i