Mandelbrot_single_thread_master_worker.c

/*********************************************************************************
* Copyright (c) 2018 *
* Ahmed Eleliemy <ahmed.eleliemy@unibas.ch> *
* University of Basel, Switzerland *
* All rights reserved. *
* This code is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *
* This program is free software; you can redistribute it and/or modify it *
* under the terms of the license (GNU LGPL) which comes with this package. *
*********************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <math.h>
#include <complex.h>
#include "dls_core.h"
#include "SpinDataStructures.h"
#include "SpinCalculator.h"
#include "ObjectLoader.h"
#define MIN(a,b) (((a)<(b))?(a):(b))
#define MASTER 0
//#define DATAOWNER 0
//#define x_max -0.26
//#define x_min -1.26
//#define y_max 1
//#define y_min 0
#define x_max 0.5
#define x_min -0.5
#define y_max 0.5
#define y_min -0.5
#ifndef CHECK_PERIOD
#define CHECK_PERIOD 1
#endif
#define REQ_WORK 1000
#define ASSI_WORK 1001
#define TERMINATE 1002
double * weights;
//-------- function headers -------
void parallel_single_thread_master_test(int argc, char** argv);
void single_thread_master(int argc, char ** argv, int number_of_workers, int id);
void worker(int argc, char ** argv, int number_of_workers, int id);
void perform_static_assignment(struct Scheduling_params * params, double * total_calc_time, int * executed_at_id, int * images, int image_width,int iterations);
void perform_dynamic_assignment(struct Scheduling_params * params, double * total_calc_time, int * executed_at_id, int id, int * images, int image_width, int iterations);
void request_static_assignment (int id, int number_of_tasks, double *total_calc_time,int * images,int image_width,int iterations);
void request_dynamic_assignment(int number_of_tasks, double * total_request_time, double * total_calc_time, int * images, int image_width, int iterations);
void Mandelbrot(int start, int end, int * computed_data,int image_width, int iterations);
//--------- main -------
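// Example invocation (launcher, rank count, and argument values are illustrative):
//   mpirun -np 4 ./program 512 1000 3
// argv[1]=image_width, argv[2]=iterations, argv[3]=scheduling method (see usage message)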
int main (int argc, char** argv)
{
parallel_single_thread_master_test(argc,argv);
return 0;
}
//------- master-worker functions ---------------
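// Runs on the MASTER rank: parses the command-line arguments, fills the
// Scheduling_params structure, dispatches the tasks either statically or
// dynamically (computing a share itself, i.e. a non-dedicated master), then
// reduces the partial images and gathers the per-rank calculation times to
// print the timing summary.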
void single_thread_master(int argc, char ** argv, int number_of_workers, int id)
{
// variables for scheduling
int flag=0;
MPI_Status status;
int terminated=0;
int master_is_terminated=0;
int my_start_index=0;
int my_end_index=0;
struct Scheduling_params params;
char dummy='\0';
int indices [2];
if(argc<4)
{
printf("Usgae: ./program image_width iterations iDLSMethods(0-STATIC 1-SS 2-FSC 3-GSS 4-FAC 5-TSS)\n");
MPI_Abort(MPI_COMM_WORLD, -1);
}
int number_of_tasks=atoi(argv[1])*atoi(argv[1]);
int method=atoi(argv[3]);
int iterations=atoi(argv[2]);
printf("-------------------\n");
printf("Experiment type single-thread non-dedicated master\n");
printf("Number of workers %d \n", number_of_workers);
printf("Number of tasks %d \n", number_of_tasks);
printf("Scheduling method %d \n", method);
printf("------------------- \n");
// variables for the PSIA
int * images = NULL;
int * results=NULL;
params.scheduled_tasks=0;
params.total_tasks=number_of_tasks;
params.total_workers=number_of_workers;
params.step=0;
params.method=method;
params.last_chunk=0;
if(method == 2 && argc == 6)
{
double h = atof(argv[4]);
double sigma = atof(argv[5]);
params.constant=helper_FSC(number_of_workers,number_of_tasks,h,sigma);
}
else if(method ==5 && argc == 6)
{
int F = atoi(argv[4]);
int L = atoi(argv[5]);
int D = helper_TSS(number_of_workers,number_of_tasks, F, L);
params.last_chunk = F + D;
params.constant = D;
params.minimum_chunk=L;
}
else if(method ==5 || method ==2 )
{
printf("Usgae: ./program image_width iterations iDLSMethods(0-STATIC 1-SS 2-FSC 3-GSS 4-FAC 5-TSS) (h,sigma) or (F,L) \n");
MPI_Abort(MPI_COMM_WORLD, -1);
}
else if(method==6)
{
char processor_name[30];
int name_len;
MPI_Get_processor_name(processor_name, &name_len);
params.knl_count= atoi(argv[4]);
params.my_weight= WF_helper(params.knl_count, processor_name, number_of_workers);
//printf("%s my weight %lf\n",processor_name,params.my_weight );
}
double start_time= MPI_Wtime();
images= (int *) calloc(number_of_tasks*3, sizeof(int));
results = (int *) calloc(number_of_tasks*3, sizeof(int));
double start_calc_time=0;
double end_calc_time=0;
double total_calc_time=0;
double total_request_time=0;
double max_loop_time=0;
double min_loop_time=0;
int check_request=0;
int to_execute=0;
int * executed_at_id = (int *) calloc(params.total_workers,sizeof(int));
double execution_times[number_of_workers];
for (int i = 0; i < number_of_workers; i++)
{
executed_at_id[i]=0;
}
if(params.method==0)
{
perform_static_assignment(&params, &total_calc_time,executed_at_id,images,sqrt(number_of_tasks),iterations);
}
else
{
perform_dynamic_assignment(&params,&total_calc_time,executed_at_id,id,images,sqrt(number_of_tasks),iterations);
}
// get data from workers
// printf("master %d spent %f in loop\n",id ,total_calc_time);
MPI_Reduce(images,results,number_of_tasks*3,MPI_INT,MPI_SUM,MASTER,MPI_COMM_WORLD);
double end_time= MPI_Wtime();
MPI_Gather(&total_calc_time,1,MPI_DOUBLE,&execution_times,1,MPI_DOUBLE,MASTER,MPI_COMM_WORLD);
max_loop_time = execution_times[0];
min_loop_time= execution_times[0];
printf("----- ranks calculation times -------\n");
printf("id time(s)\n");
printf("%d %lf\n",id, execution_times[0]);
for (int i = 1; i < number_of_workers; i++)
{
if(max_loop_time < execution_times[i] )
max_loop_time = execution_times[i];
else if(min_loop_time > execution_times[i])
min_loop_time = execution_times[i];
printf("%d %lf\n",i, execution_times[i]);
}
printf("Program execution time %lf\n", end_time-start_time);
printf("Paralell max calculation time %lf\n", max_loop_time);
//printf("Paralell min calculation time %lf\n", min_loop_time);
//for(int i=0;i<number_of_tasks*3;i=i+3)
{
// printf("%d %d %d\n",results[i],results[i+1], results[i+2]);
}
free(executed_at_id);
//printf("average waiting time %lf\n", end_time-start_time);
// free resources
if(results!=NULL)
free(results);
if(images!=NULL)
free(images);
}
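// Runs on every non-master rank: requests work from the master (a single
// chunk in the static case, repeated chunks until TERMINATE in the dynamic
// case) and then joins the reduce/gather collectives with its results.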
void worker (int argc, char ** argv,int number_of_workers, int id)
{
// variables scheduling
char dummy='\0';
int indices [2];
int flag=0;
MPI_Status status;
int number_of_tasks=atoi(argv[1])*atoi(argv[1]);
int method=atoi(argv[3]);
int iterations=atoi(argv[2]);
int * images= (int *) calloc(number_of_tasks*3, sizeof(int));
double start_request_time=0;
double end_request_time =0;
double start_calc_time=0;
double end_calc_time=0;
double total_calc_time=0;
double total_request_time=0;
// printf("worker %d check which method\n",id);
if(method==0)
{
// printf("worker %d will request static assignmnet\n",id);
request_static_assignment(id, number_of_tasks, &total_calc_time,images,sqrt(number_of_tasks),iterations);
}
else
{
request_dynamic_assignment(number_of_tasks, &total_request_time,&total_calc_time,images,sqrt(number_of_tasks),iterations);
}
MPI_Reduce(images,NULL,number_of_tasks*3,MPI_INT,MPI_SUM,MASTER,MPI_COMM_WORLD);
MPI_Gather(&total_calc_time,1,MPI_DOUBLE,NULL,1,MPI_DOUBLE,MASTER,MPI_COMM_WORLD);
if(images!=NULL)
free(images);
}
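// Master side of the static (STATIC) scheme: answers exactly one work
// request per worker, then computes any tasks that remain unassigned itself.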
void perform_static_assignment(struct Scheduling_params * params, double * total_calc_time, int executed_at_id [], int * images,int image_width,int iterations)
{
int terminated=0;
int indices[2];
double start_calc_time =0.0;
double end_calc_time =0.0;
*total_calc_time = 0.0;
char dummy='\0';
MPI_Status status;
while(terminated < params[0].total_workers-1)
{
MPI_Recv(&dummy,1,MPI_CHAR,MPI_ANY_SOURCE,REQ_WORK,MPI_COMM_WORLD,&status);
calculate_next_chunk_stateless(params[0], &indices[0],&indices[1]);
MPI_Send(&indices,2,MPI_INT,status.MPI_SOURCE,ASSI_WORK,MPI_COMM_WORLD);
params[0].scheduled_tasks += indices[1] - indices[0];
params[0].step++;
params[0].last_chunk= indices[1] - indices[0];
terminated++;
executed_at_id[status.MPI_SOURCE]+=indices[1] - indices[0];
//printf("received request from %d assign it %d treminated %d\n",status.MPI_SOURCE,indices[1] - indices[0], terminated );
}
//printf("all workers has been assigned work remaining_tasks %d\n", params[0].total_tasks - params[0].scheduled_tasks );
if(params[0].scheduled_tasks < params[0].total_tasks)
{
calculate_next_chunk_stateless(params[0], &indices[0],&indices[1]);
//printf("master will work on %d\n", indices[1] - indices[0]);
start_calc_time = MPI_Wtime();
Mandelbrot(indices[0], indices[1],images,image_width,iterations);
end_calc_time = MPI_Wtime();
*total_calc_time = end_calc_time - start_calc_time;
executed_at_id[0] += indices[1] - indices[0];
}
//printf("master %d finished its part in %f \n",0, *total_calc_time);
}
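// Master side of the dynamic (self-scheduling) schemes: alternates between
// executing up to CHECK_PERIOD tasks of its own current chunk and probing
// for worker requests, assigning new chunks or TERMINATE messages until all
// tasks are scheduled and every worker has been released.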
void perform_dynamic_assignment(struct Scheduling_params * params, double * total_calc_time, int * executed_at_id, int id, int * images, int image_width,int iterations)
{
int terminated=0;
int master_is_terminated=0;
int my_start_index=0;
int my_end_index=0;
double start_calc_time=0.0;
double end_calc_time=0.0;
*total_calc_time=0.0;
int check_request=0;
int to_execute=0;
int flag=0;
char dummy='\0';
MPI_Status status;
int indices[2];
start_calc_time = MPI_Wtime();
while(terminated < params[0].total_workers-1 || !master_is_terminated)
{
if( !check_request && !master_is_terminated)
{
if(my_start_index < my_end_index) // this condition is never true if CHECK_PERIOD = chunk_size
{
to_execute = my_end_index - my_start_index;
//start_calc_time = MPI_Wtime();
//printf("master part from %d to %d\n", my_start_index, my_start_index+MIN(CHECK_PERIOD,to_execute));
Mandelbrot(my_start_index, my_start_index+MIN(CHECK_PERIOD,to_execute),images,image_width,iterations);
//end_calc_time = MPI_Wtime();
//*total_calc_time += end_calc_time - start_calc_time;
my_start_index += MIN(CHECK_PERIOD,to_execute);
}
else if(params[0].scheduled_tasks < params[0].total_tasks)
{
params[0].my_weight= weights[MASTER];
//printf("id %d weight %lf method %d\n",MASTER, weights[MASTER],params[0].method);
calculate_next_chunk_stateless(params[0], &indices[0],&indices[1]);
params[0].scheduled_tasks+= indices[1] - indices[0];
params[0].step++;
params[0].last_chunk= indices[1] - indices[0];
my_start_index=indices[0];
my_end_index=indices[1];
to_execute = my_end_index - my_start_index;
executed_at_id[0] += to_execute;
//printf("my block %d\n",to_execute);
//printf("from %d to %d\n", my_start_index, my_start_index+MIN(CHECK_PERIOD,to_execute));
Mandelbrot(my_start_index, my_start_index+MIN(CHECK_PERIOD,to_execute),images,image_width,iterations);
my_start_index += MIN(CHECK_PERIOD,to_execute);
//printf("chunk of %d scheduled at master %d \n",indices[1] - indices[0],MASTER);
}
else if(!master_is_terminated)
{
master_is_terminated=1;
// printf("Master terminated itself executed %d \n", executed_at_id[id] );
}
check_request =1;
}
else
{
flag=0;
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status);
while(flag)
{
flag=0;
if(status.MPI_TAG == REQ_WORK)
{
MPI_Recv(&dummy,1,MPI_CHAR,status.MPI_SOURCE,REQ_WORK,MPI_COMM_WORLD,&status);
if(params[0].scheduled_tasks < params[0].total_tasks)
{
if(executed_at_id[status.MPI_SOURCE]!=0 && params[0].method==0)
{
MPI_Send(&dummy,1,MPI_CHAR,status.MPI_SOURCE,TERMINATE,MPI_COMM_WORLD);
terminated++;
}
else
{
params[0].my_weight= weights[status.MPI_SOURCE];
//printf("id %d weight %lf\n",status.MPI_SOURCE, weights[status.MPI_SOURCE]);
calculate_next_chunk_stateless(params[0], &indices[0],&indices[1]);
params[0].scheduled_tasks+= indices[1] - indices[0];
params[0].step++;
params[0].last_chunk = indices[1] - indices[0];
MPI_Send(&indices,2,MPI_INT,status.MPI_SOURCE,ASSI_WORK,MPI_COMM_WORLD);
executed_at_id[status.MPI_SOURCE]+=indices[1] - indices[0];
// printf("chunk of %d scheduled at worker %d \n",indices[1] - indices[0],status.MPI_SOURCE);
}
}
else
{
// printf("Master terminated the following worker %d executed %d \n",status.MPI_SOURCE, executed_at_id[status.MPI_SOURCE]);
MPI_Send(&dummy,1,MPI_CHAR,status.MPI_SOURCE,TERMINATE,MPI_COMM_WORLD);
terminated++;
}
}
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status);
}
check_request = 0;
}
}
end_calc_time = MPI_Wtime();
*total_calc_time += end_calc_time - start_calc_time;
}
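// Worker side of the static scheme: sends one REQ_WORK message, receives a
// single [start,end) chunk, and computes it.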
void request_static_assignment (int id, int number_of_tasks, double *total_calc_time,int * images,int image_width,int iterations)
{
int flag=0;
int indices[2];
double start_calc_time = 0.0;
double end_calc_time = 0.0;
MPI_Status status;
*total_calc_time=0.0;
char dummy='\0';
//printf("before sending\n");
start_calc_time=MPI_Wtime();
MPI_Send(&dummy,1,MPI_CHAR,MASTER,REQ_WORK,MPI_COMM_WORLD);
//printf("worker %d request a chunk\n",id);
MPI_Recv(&indices,2,MPI_INT,MASTER,ASSI_WORK,MPI_COMM_WORLD,&status);
//printf("worker %d received %d \n", id, indices[1]-indices[0] );
Mandelbrot(indices[0], indices[1],images,image_width,iterations);
end_calc_time=MPI_Wtime();
*total_calc_time += end_calc_time - start_calc_time;
//printf("worker %d finished its part in %f \n", id, *total_calc_time);
}
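// Worker side of the dynamic schemes: keeps requesting chunks and computing
// them until the master answers with a TERMINATE tag.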
void request_dynamic_assignment(int number_of_tasks, double * total_request_time, double * total_calc_time, int * images, int image_width, int iterations)
{
int terminated=0;
double start_request_time=0.0;
double end_request_time=0.0;
double start_calc_time=0.0;
double end_calc_time=0.0;
*total_request_time=0.0;
*total_calc_time=0.0;
char dummy='\0';
int flag=0;
MPI_Status status;
int indices[2];
start_calc_time=MPI_Wtime();
while(!terminated)
{
MPI_Send(&dummy,1,MPI_CHAR,MASTER,REQ_WORK,MPI_COMM_WORLD);
flag=0;
while(!flag)
{
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status);
}
if(status.MPI_TAG==ASSI_WORK)
{
MPI_Recv(&indices,2,MPI_INT,MASTER,ASSI_WORK,MPI_COMM_WORLD,&status);
Mandelbrot(indices[0], indices[1],images,image_width,iterations);
}
else
{
terminated=1;
}
}
end_calc_time=MPI_Wtime();
*total_calc_time += end_calc_time - start_calc_time;
}
//------- testing functions ----------
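// Driver: initializes MPI, gathers per-rank weights when the weighted
// method (6) is selected, then runs the master on rank 0 and a worker on
// every other rank before cleaning up.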
void parallel_single_thread_master_test(int argc, char ** argv)
{
int id, number_of_workers;
MPI_Init(NULL,NULL);
MPI_Comm_size(MPI_COMM_WORLD, &number_of_workers);
MPI_Comm_rank(MPI_COMM_WORLD,&id);
weights=(double *) calloc(number_of_workers,sizeof(double));
//printf("allocated %d\n",id);
if(argc>=5)
{
if(atoi(argv[3])==6)
{
char processor_name[30];
int name_len;
MPI_Get_processor_name(processor_name, &name_len);
int knl_count= atoi(argv[4]);
double my_weight= WF_helper(knl_count, processor_name, number_of_workers);
MPI_Gather(&my_weight,1,MPI_DOUBLE,weights,1,MPI_DOUBLE,MASTER,MPI_COMM_WORLD);
}
}
if(id==MASTER)
single_thread_master(argc, argv, number_of_workers,id);
else
worker(argc,argv,number_of_workers,id);
free(weights);
MPI_Finalize();
}
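// Computes the pixels for task indices [start,end): maps each linear index
// to a pixel (i,j), iterates z = z^4 + c over the window defined by
// x_min/x_max/y_min/y_max, and stores one RGB triple per pixel in
// computed_data.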
void Mandelbrot(int start, int end, int * computed_data,int image_width, int iterations)
{
int i ,j;
//#pragma omp parallel for private(i,j) schedule(runtime)
for(int counter=start;counter<end;++counter)
{
i = counter / image_width ;
j = counter % image_width;
double complex c= (x_min + i/(image_width-1.0)*(x_max-x_min)) + (y_min + j/(image_width-1.0)*(y_max-y_min)) * I;
double complex z = 0+(0 * I);
int k=0;
for (k = 0; k < iterations && cabs(z) < 2.0; ++k)
{
z = cpow(z,4) + c;
}
int R,G,B;
if (k==iterations)
{
R=0;
G=0;
B=0;
}
else if (k<16)
{
R=k*8;
G=k*8;
B=128+k*4;
}
else if (k>=16 && k<64)
{
R=128+k-16;
G=128+k-16;
B=192+k-16;
}
else if (k>=64)
{
R=iterations-k;
G=128+(iterations-k)/2;
B=iterations-k;
}
double temp = 0.0;
//printf("at %lf (%d,%d) (%f + i%f) %d,%d,%d\n",i,j,cabs(z),creal(c),cimag(c), R, G, B);
int index= 3*counter;
computed_data[index]= R;
computed_data[index+1]= G;
computed_data[index+2]= B;
}
}
