/* NOTE: mpi_datatype set to MPI_INT; So, all messages must be arrays of int's. Could also set mpi_datatype to MPI_CHAR, MPI_FLOAT? */ /* Exports: set_num_slaves(), get_num_slaves(), par_call(), finalize(), send_msg(), recv_msg(), declare_recv_buffer(), get_last_tag(), get_last_source(), get_last_size(), probe(), is_master(), myid(), runtime(), wait_a_little(), error_if_msg_on_stack() [for MSG()], attach_new_slave() [depends on macro, LOADER, defined by make] */ /* probe() is a misnomer; really, probe() checks if slave arg is free */ #include "mas-slave.h" #include "mpi.h" /* for getrusage(): */ #include #include #include /* for NULL */ #if 0 #include /* hack for MPI_Init forgetting curr. dir. */ #include /* For printf, sprintf */ #include /* For restoring AKCL signal handler */ #include /* For setitimer */ #include /* For mygetcwd() */ #include /* For getpriority(), setpriority(), setrlimit() */ #include /* For getpid() inside nice() */ #endif /* In future version, can be set otherwise by init_master_slave() */ /* IMPORTANT: MPI_INT, etc. are not C constants; set only after MPI_Init */ static MPI_Datatype mpi_datatype; static MPI_Status last_status; int get_last_source() { return last_status.MPI_SOURCE; } int get_last_tag() { return last_status.MPI_TAG; } int get_last_size() { int count; MPI_Get_count(&last_status, MPI_BYTE, &count); return count; } /* count in units of bytes is same as size */ #if 0 static MPI_Datatype MPI_type[3]; void MPI_Init_Datatype() { /* must be set only after running MPI_Init; MPI_BYTE is var, not const */ MPI_type[0] = MPI_BYTE /* STRING-CHAR */; MPI_type[1] = MPI_INT /* FIXNUM */; MPI_type[2] = MPI_FLOAT /* */;} #endif void finalize() { MPI_Finalize(); } int myid() { int rank = 0; MPI_Comm_rank(MPI_COMM_WORLD, &rank); return(rank); } int is_master() { return myid() == 0; } int get_num_slaves() { int size; MPI_Comm_size(MPI_COMM_WORLD, &size); return(size - 1); } static void *mas_slave_stack_loc; /* Initialized by par_call() */ void error_if_msg_on_stack(msg) void *msg; { int stack_loc2; if (!is_master()) if ( (msg < (void *)&stack_loc2) && (msg > mas_slave_stack_loc) || (msg > (void *)&stack_loc2) && (msg < mas_slave_stack_loc) ) { printf("TOPC: error(do_task): MSG() returned pointer into stack.\n"); printf(" Buffer referenced by MSG() should either be static or global.\n"); exit(1); } } /* count parameter necessary for distributed memory, but not for shared mem */ static int send_buf[2]; void send_msg(msg_ptr, dest, tag) /* should have count parameter */ void *msg_ptr; int dest, tag; { if ( mas_slave_is_msg ) { #ifdef DEBUG printf("send: to dest: %d; tag: %d; mas_slave_is_msg: 1;\n count: %d; mpi_datatype: MPI_BYTE(%d); send_buf[0]: %d\n", dest, tag, mas_slave_msg_size, MPI_BYTE, *(int *)mas_slave_msg_ptr); #endif if (msg_ptr != mas_slave_msg_ptr) { printf("send_msg: get_task/do_task must return MSG() or not call it.\n"); exit(1); } MPI_Send( mas_slave_msg_ptr, mas_slave_msg_size, MPI_BYTE, dest, tag, MPI_COMM_WORLD); } else { send_buf[0] = MARKER; send_buf[1] = INT(msg_ptr); #ifdef DEBUG printf("send: to dest: %d; tag: %d; mas_slave_is_msg: 0; count: %d; mpi_datatype: MPI_INT; send_buf[0]\n", dest, tag, 2, *(int *)send_buf); #endif MPI_Send( send_buf, 2, MPI_INT, dest, tag, MPI_COMM_WORLD); } /* Initialize global flags before next call to set_task(), get_task(), do_task(); WHAT ABOUT update_environment()?? */ mas_slave_is_msg = FALSE; /* MSG() causes it to be reset to TRUE */ } #define RECV_BUF_SIZE 10000 /* This should be large enough for common appl's */ /* If int changed to char, slave stalls at MPI_Init(). Why? */ static int recv_buf[RECV_BUF_SIZE]; static int *recv_buf_ptr = recv_buf; static int recv_buf_size = RECV_BUF_SIZE; void declare_recv_buf(ptr, size) int *ptr; int size; { recv_buf_ptr = ptr; recv_buf_size = size; } void *recv_msg() { int count, error; MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &last_status); /* MAN PAGE SAYS THIS SHOULD BE last_status, NOT &last_status ? */ MPI_Get_count(&last_status, MPI_BYTE, &count); if ( count > recv_buf_size ) { printf("error: recv_msg(processor ~d): msg length ~d", myid(), count); printf(" too large for buffer.\n Call: declare_recv_buf(ptr,size)\n"); printf(" with larger size before sending such a large message.\n"); } /* recv from last_source, in case source used to be MPI_ANY_SOURCE */ /* Then rely on non-overtaking messages to insure that MPI_Recv */ /* sees the same message as MPI_Probe */ error = MPI_Get_count(&last_status, mpi_datatype, &count); if (error) { /* But this doesn't seem to trigger under standard */ printf("recv_msg: datatype %d inconsistent with count %d\n", mpi_datatype, count); exit(1); } #ifdef DEBUG printf("recv: count: %d; mpi_datatype: %d; MPI_INT: %d\n", count, mpi_datatype, MPI_INT); printf("recv(cont): from source: %d; tag: %d\n", get_last_source(), get_last_tag()); *(int *)recv_buf_ptr = -9999; /* See if overwritten */ fflush(stdout); #endif MPI_Recv(recv_buf_ptr, count, mpi_datatype, get_last_source(), MPI_ANY_TAG, MPI_COMM_WORLD, &last_status); /* HACK: MARKER in first position means int passed */ /* This hack should be replaced by using tags to indicate type of msg */ #ifdef DEBUG printf("recv(cont2): recv_buf[0]: %d at 0x%x\n", *(int *)recv_buf_ptr, recv_buf_ptr); printf("return: mas_slave_msg_is_int: %d\n", mas_slave_msg_is_int);fflush(stdout); #endif mas_slave_msg_is_int = ( recv_buf_ptr[0] == MARKER ); return ( ( recv_buf_ptr[0] == MARKER ? (void *)recv_buf_ptr[1] : (void *)recv_buf_ptr ) ); } int runtime() { struct rusage buf; /* getrusage is BSD */ if ( getrusage( RUSAGE_SELF, &buf ) ) perror("runtime"); return buf.ru_utime.tv_sec*1000 + buf.ru_utime.tv_usec/1000; } /* used only in definition of ms_barrier in mas-slave.c; ms_barrier() isn't useful for distributed memory. */ void wait_a_little() { return; } int probe() { return 1; } #ifdef LOADER int attach_new_slave() { if ( poll_new_slaves() ) { /* DEFINED BY LOADER */ MPI_Spawn2(); /* DEFINED BY MPINU */ return(1); } else return(0); } #else /* Else loader shouldn't be linked in if we don't use above symbols */ int attach_new_slave() { return(0); } #endif #if 0 ;;;;; UTILITIES -- left over from old LISP version. ;;;;; It will be converted to C, or deleted. ;;;chdir, getpwd not used?? (defglue (getcwd (&optional (string (make-fill-string 256)))) (getcwd-glue string) ((object "mygetcwd") (object string)) ("char *cwd, *getcwd(), *buf" ;;if (type_of(vs_base[0]) == t_array) { ;; vs_base[0]->a.a_dims[0] ;; } else { ;; vs_base[0]->v.v_dim } ;; "printf(\"string length (dim): %d\\n\", string->st.st_dim)" ;; "if (string->st.st_dim < 3) {printf(\"getcwd: needs string arg at least len 3\\n\"); return Cnil; }" ;; "printf(\"getcwd: %s\\n\", string->st.st_self)" ;; "fflush(stdout)" ;; "string->st.st_self[string->st.st_dim-3] = 0" ;; "buf = \" \"" "if ((cwd = getcwd(string->st.st_self, string->st.st_dim-2)) == NULL) { perror (\"pwd\"); return Cnil; }" "if (string->v.v_hasfillp) (string->st.st_fillp)=strlen(string->st.st_self)" "printf(\"getcwd: %s\\n\", string->st.st_self)" "return string") "getcwd (&optional string); returns string with pathname") (defglue (chdir (object path)) ((object "mychdir") (object path)) ("if (chdir((unsigned char *)path->st.st_self) == -1) {perror (\"chdir\"); return Cnil;}" "return Ct") "returns t if chdir was successful, else nil") (defentry getcwd (object) (object mygetcwd)) (defentry chdir (object) (object mychdir)) (defglue (nice (int prio)) ((int "nice") (int prio)) ("int oldprio, success" "oldprio = getpriority(PRIO_PROCESS,getpid())" "success = setpriority(PRIO_PROCESS,getpid(),prio)" "if (success == -1) {perror(\"nice\"); return -1000;}" "return oldprio") "Changes priority to value of -20 to 20. higher means lower priority. initial default is priority 0. Result is previous value of priority.") (defglue (limit-rss (int size)) ((int "limit_ram") (int size)) ("int oldsize, success" "struct rlimit rlp" "success = getrlimit(RLIMIT_RSS, &rlp)" "if (success == -1) {perror(\"limit-rss\"); return -1000;}" "oldsize = rlp.rlim_cur" "rlp.rlim_cur = size" "success = setrlimit(RLIMIT_RSS, &rlp)" "if (success == -1) {perror(\"limit-rss\"); return -1000;}" "return oldsize") "Changes RSS (Resident Set Size) limit (in bytes) and returns last setting. Try \"ps aux\" or \"man ps\" for description of RSS.") #endif /* mas-slave.c calls this from inside init_master_slave() */ static char *array[256]; /* Substitute argv[] if default procgroup file added */ int set_num_slaves(num_slaves, argc_ptr, argv_ptr) int num_slaves, *argc_ptr; char ***argv_ptr; { int bool, i, argc = 1; char **argv; MPI_Initialized( &bool ); if ( bool ) return -1; if ( argc_ptr == NULL ) { printf("master_slave: Warning: init_master_slave() not called.\n"); printf(" MPI had added extra command line args at end that cannot be removed.\n"); printf(" If this is a problem, add:\n"); printf(" init_master_slave(num_threads, argc_ptr, argv_ptr;\n"); printf(" int num_threads, *argc_ptr; char ***argv_ptr;\n"); printf(" before master_slave(); [num_threads used only for SMP version]\n"); exit(1); /* No easy patch; master had sent host/port on command line */ argc_ptr = &argc; argv[0] = "unknown_program"; argv[1] = NULL; argv_ptr = &argv; } #if 0 else { /* This part is specific to MPICH -- It's commented out */ argc = *argc_ptr; bool = 1; /* 1 means add default procgroup file */ for ( i = 0; i < argc; i++ ) { if ( ! strcmp( (*argv_ptr)[i], "-p4amslave" ) ) bool = 0; /* slave okay */ if ( ! strcmp( (*argv_ptr)[i], "-p4pg" ) ) bool = 0; /* procgroup okay */ } if ( bool ) { /* if master needs default procgroup file */ for ( i = 0; i < argc; i++ ) array[i] = (*argv_ptr)[i]; /* We lie by adding on these two extra command-line args */ array[argc++] = "-p4pg"; array[argc++] = "procgroup"; /* add default if "-p4pg" missing */ *argv_ptr = array; *argc_ptr = argc; } } #endif /* MPI_Init() will lie by deleting "p4" args, after it uses them */ MPI_Init(argc_ptr, argv_ptr); #ifdef LOADER if (is_master()) init_loaderctl(argc_ptr, argv_ptr); #endif mpi_datatype = MPI_BYTE; /* HACK: marker: MPI_BYTE is a large random number in early MPICH; but it varies on different processors and in different sessions; better way to do it when there's time */ MPI_Comm_size(MPI_COMM_WORLD, &i); return i - 1; } /* If MPI, slaves were forked off by init_master_slave() -> set_num_slaves() */ void par_call(routine, get_task, do_task, get_task_result, update_environment) void (*routine)(), (*get_task)(), (*do_task)(), (*get_task_result)(), (*update_environment)(); { mas_slave_stack_loc = &routine; /* Need any address on stack */ /* Used by error_if_msg_on_stack() */ (*routine)(get_task, do_task, get_task_result, update_environment); } /* finalize used inside master_slave_stats ? */