Page MenuHomec4science

generate-trace-files.py
No OneTemporary

File Metadata

Created
Tue, Feb 25, 09:52

generate-trace-files.py

#!/usr/bin/env python
import sys
import re
import json
import numpy
import copy
import random
from math import ceil
# Base start time (ms) stored in every parsed job's t_start; the drawn arrival
# time is added to it when 'job.start.ms' is written.
JOB_START_OFFSET=1000
# Value stored in every parsed job's t_stop and emitted as 'job.end.ms'.
JOB_STOP=200000
# Mean passed to the Poisson generator for inter-arrival gaps (1000 ms = 1 s).
JOB_INTERARRIVAL_TIME=1000
# Target total job counts; one output .trace file is written per value.
JOB_NR_SCALE_VALUES=[100, 250, 500]
# Upper bound for per-task memory (MB, judging by the output comments); traces
# whose largest task exceeds it are rescaled so that their maximum equals it.
MAX_MEMORY=9000
# Memory assigned to jobs that report 0, and the initial lower bound used when
# scanning a trace's memory range.
MIN_MEMORY=100
class TraceDetails:
    """Container for one named trace read from the input.

    Instances start out empty; the parsing code fills the attributes in.
    """

    def __init__(self):
        # Trace identifier and the tag appended to generated file names.
        self.name, self.suffix = '', ''
        # Job groups belonging to this trace (a fresh list per instance).
        self.jobs = list()
        # Sum of nr_jobs over all entries appended to self.jobs.
        self.nr_total_jobs = 0
class JobDetails:
    """Description of one group of identical jobs inside a trace.

    All fields default to 0 and are populated by the parsing code.
    """

    def __init__(self):
        # How many jobs of this kind exist, and how many tasks each one has.
        self.nr_jobs = self.nr_tasks = 0
        # Per-task duration and per-task memory.
        self.task_dur = self.task_mem = 0
        # Base start time and end time copied into the emitted job record.
        self.t_start = self.t_stop = 0
def getTraceMaxTaskMem(trace):
    """Return the largest task_mem found in trace.jobs.

    The result is 0 when the trace has no jobs or when no job has a
    task_mem greater than 0.
    """
    # Seeding the candidates with 0 reproduces the "start at 0" floor.
    candidates = [0] + [job.task_mem for job in trace.jobs]
    return max(candidates)
def scaleValueToRange(value, oldMin, oldMax, newMin, newMax):
    """Linearly map value from [oldMin, oldMax] onto [newMin, newMax].

    oldMin maps to newMin and oldMax maps to newMax; the result is always a
    float. When the source interval is degenerate (oldMin == oldMax) there
    is no slope to apply, so newMin is returned instead of raising
    ZeroDivisionError.
    """
    oldSpan = oldMax - oldMin
    if oldSpan == 0:
        # Every source value is identical: pin it to the bottom of the
        # target interval.
        return float(newMin)
    # float() keeps this a true division for integer inputs on Python 2.
    return ((value - oldMin) * (newMax - newMin)) / float(oldSpan) + newMin
def scaleValueWithRatio(value, ratio):
    """Return value multiplied by ratio."""
    scaled = value * ratio
    return scaled
def scaleTraceTaskMem(trace, type, newMinMem = -1, newMaxMem = -1):
    """Rescale task_mem of every job in trace, in place.

    Jobs whose task_mem is 0 are first set to MIN_MEMORY. The observed lower
    bound starts at MIN_MEMORY and the observed upper bound starts at the
    first job's original task_mem. Depending on type:

      "max-ratio"  multiply each value by newMaxMem / observed maximum
      "min-ratio"  multiply each value by newMinMem / observed minimum
      "range"      map [observed min, observed max] onto [newMinMem, newMaxMem]

    Any other type leaves the values unscaled (the zero replacement still
    happens). Raises IndexError when trace.jobs is empty.
    """
    # Read before the zero replacement so the upper-bound seed matches the
    # first job's original value.
    firstMem = trace.jobs[0].task_mem

    for job in trace.jobs:
        if job.task_mem == 0:
            job.task_mem = MIN_MEMORY

    observed = [job.task_mem for job in trace.jobs]
    minMem = min([MIN_MEMORY] + observed)
    maxMem = max([firstMem] + observed)

    if type == "max-ratio":
        ratio = newMaxMem / (maxMem * 1.0)
        for job in trace.jobs:
            job.task_mem = scaleValueWithRatio(job.task_mem, ratio)
    elif type == "min-ratio":
        ratio = newMinMem / (minMem * 1.0)
        for job in trace.jobs:
            job.task_mem = scaleValueWithRatio(job.task_mem, ratio)
    elif type == "range":
        for job in trace.jobs:
            job.task_mem = scaleValueToRange(job.task_mem, minMem, maxMem, newMinMem, newMaxMem)
def scaleTraceTaskDur(trace, type, newMinDur = -1, newMaxDur = -1):
    """Rescale task_dur of every job in trace, in place.

    The smallest and largest task_dur in the trace are determined first.
    Depending on type:

      "max-ratio"  multiply each value by newMaxDur / largest duration
      "min-ratio"  multiply each value by newMinDur / smallest duration
      "range"      map [smallest, largest] onto [newMinDur, newMaxDur]

    Any other type leaves the trace untouched. Raises IndexError when
    trace.jobs is empty.
    """
    # Accessing the first job up front keeps the IndexError on empty traces.
    firstDur = trace.jobs[0].task_dur
    observed = [job.task_dur for job in trace.jobs]
    minDur = min([firstDur] + observed)
    maxDur = max([firstDur] + observed)

    if type == "max-ratio":
        ratio = newMaxDur / (maxDur * 1.0)
        for job in trace.jobs:
            job.task_dur = scaleValueWithRatio(job.task_dur, ratio)
    elif type == "min-ratio":
        ratio = newMinDur / (minDur * 1.0)
        for job in trace.jobs:
            job.task_dur = scaleValueWithRatio(job.task_dur, ratio)
    elif type == "range":
        for job in trace.jobs:
            job.task_dur = scaleValueToRange(job.task_dur, minDur, maxDur, newMinDur, newMaxDur)
def _buildJobRecord(job, job_id, arrival_time):
    """Return the JSON-serializable dict describing one emitted job.

    job is the job-group template, job_id the 1-based sequence number inside
    the output file and arrival_time the offset added to the job's base
    start time.
    """
    task = {
        'c.host': '/default-rack/node2354434',
        'c.dur': int(round(job.task_dur)),
        'c.prio': 20,
        # Round memory UP to the next multiple of 100. Dividing by 100.0
        # forces true division: with a plain '/' an integer task_mem is
        # floor-divided on Python 2, which turns ceil() into a no-op.
        'c.mem': int(ceil(job.task_mem / 100.0) * 100),
        'c.type': 'map',
        'c.nr': job.nr_tasks,
    }
    return {
        'am.type': 'mapreduce',
        # int() converts a numpy integer into a plain int, which the json
        # module can serialize on every Python version.
        'job.start.ms': job.t_start + int(arrival_time),
        'job.end.ms': job.t_stop,
        'job.queue.name': 'sls_queue_1',
        'job.id': 'job_' + str(job_id),
        'job.user': 'default',
        'job.tasks': [task],
    }


def generateTraceOutput(trace):
    """Write one .trace file per entry of JOB_NR_SCALE_VALUES for trace.

    Each file is named "TRACE-<name>-j<target><suffix>.trace" and contains
    one indented JSON object per job, written back to back (not a JSON
    array). The number of jobs emitted for each job group is its nr_jobs
    scaled by target / trace.nr_total_jobs and rounded, but never less than
    one so that every group is represented. Because of that minimum and the
    rounding, more jobs than arrival times may be produced; the surplus jobs
    all reuse the last arrival time.

    Raises ZeroDivisionError when trace.nr_total_jobs is 0.
    """
    for max_nr_jobs in JOB_NR_SCALE_VALUES:
        file_name = "TRACE-" + trace.name + "-j" + str(max_nr_jobs) + trace.suffix + ".trace"
        # Computed before the file is opened so a failure here does not
        # leave an empty output file behind.
        job_ratio = max_nr_jobs / (trace.nr_total_jobs * 1.0)
        # Might need to tweak. TIME_BETWEEN_JOB_ARRIVAL = 1s
        arrival_times = generatePoissonArrivalTimes(JOB_INTERARRIVAL_TIME, max_nr_jobs)
        last_slot = len(arrival_times) - 1

        with open(file_name, "w") as out:
            job_id = 1
            for job in trace.jobs:
                scaled_job_nr = int(round(job.nr_jobs * job_ratio))
                if scaled_job_nr == 0:
                    scaled_job_nr = 1  # Get at least 1 sample of each type of job !
                for _ in range(scaled_job_nr):
                    # Clamp to the last slot once the arrival times run out.
                    slot = min(job_id - 1, last_slot)
                    record = _buildJobRecord(job, job_id, arrival_times[slot])
                    json.dump(record, out, sort_keys=True, indent=4)
                    out.write('\n')
                    job_id += 1
def sumListElem(list):
    """Return a deep copy of list with each element replaced by its prefix sum.

    Element i of the result is the sum of elements 0..i of the input; the
    input itself is left unmodified.
    """
    result = copy.deepcopy(list)
    if len(result) > 0:
        running = result[0]
    for pos in range(1, len(result)):
        # running holds the already-accumulated value of the previous slot.
        running = running + result[pos]
        result[pos] = running
    return result
def generatePoissonArrivalTimes(TimeBetweenJobs, NrJobs):
    """Return NrJobs arrival times in random order.

    NrJobs inter-arrival gaps are drawn from a Poisson distribution with
    mean TimeBetweenJobs and accumulated into absolute arrival times. The
    times are then shuffled, so consecutive positions in the result are not
    chronologically ordered.

    The result is a list of plain Python ints (not numpy scalars), which
    keeps the values directly usable with the json module.
    """
    gaps = numpy.random.poisson(TimeBetweenJobs, NrJobs)
    # cumsum turns the gaps into absolute times; tolist() yields native ints.
    result = numpy.cumsum(gaps).tolist()
    random.shuffle(result)
    return result
def computeArrivalTimesForTrace(trace):
    """Return one shuffled Poisson arrival time per job counted in trace.

    The number of times generated equals trace.nr_total_jobs and the mean
    gap is JOB_INTERARRIVAL_TIME.
    """
    # Might need to tweak. TIME_BETWEEN_JOB_ARRIVAL = 1s
    mean_gap = JOB_INTERARRIVAL_TIME
    total_jobs = trace.nr_total_jobs
    return generatePoissonArrivalTimes(mean_gap, total_jobs)
# Parse the job summary from stdin. A line containing "--<name>--" opens a
# new trace; the run of lines directly after it that start with two numbers
# are that trace's job groups.
data = sys.stdin.readlines()
traces = []
for line, text in enumerate(data):
    res = re.search('--[a-zA-Z0-9]+--', text)
    if res is None:
        continue

    trace = TraceDetails()
    trace.name = res.group(0)[2:-2]  # strip the surrounding dashes
    trace.suffix = "-no_scale"

    idx = line + 1
    while idx < len(data):
        if re.match('[0-9]+[ \t\n\r]*[0-9]+.*', data[idx]) is None:
            # First line that is not a job row ends this trace's section.
            break
        tokens = data[idx].split()
        job = JobDetails()
        job.nr_jobs = int(tokens[0])
        job.nr_tasks = int(tokens[1])
        job.task_dur = float(tokens[3]) * 1000  # Convert seconds to milliseconds
        job.task_mem = int(tokens[5])
        job.t_start = JOB_START_OFFSET
        job.t_stop = JOB_STOP
        trace.jobs.append(job)
        trace.nr_total_jobs += job.nr_jobs
        idx += 1

    traces.append(trace)
# Write the output files for every parsed trace.
for trace in traces:
    if trace.nr_total_jobs <= 0:
        # Scaling divides by the total job count, so an empty trace cannot
        # be processed; skip it rather than abort the whole run.
        sys.stderr.write("Skipping trace '" + trace.name + "': it contains no jobs\n")
        continue

    # Generate unmodified traces (only Poisson distributed arrival times)
    generateTraceOutput(trace)

    # NOTE: a disabled block of further experiments used to live here as a
    # string literal: "range", "max-ratio" and "min-ratio" scaling of memory
    # to [100, MAX_MEMORY] combined with durations 10000/50000 and
    # 25000/500000 ms. Its calls passed the scaling type as the last
    # positional argument, which no longer matches the
    # scaleTraceTaskMem/scaleTraceTaskDur signatures, so it has been removed.

    # Generate traces with mem scaled if MAX_MEM(trace) > MAX_MEMORY and time
    # scaled with min-ratio so that the shortest task lasts min_dur (ms).
    for min_dur in (10000, 25000):
        scaled_trace = copy.deepcopy(trace)
        scaled_trace.suffix = "-min_ratio-mem" + str(MAX_MEMORY) + "-dur" + str(min_dur)
        if getTraceMaxTaskMem(scaled_trace) > MAX_MEMORY:
            scaleTraceTaskMem(scaled_trace, "max-ratio", newMaxMem = MAX_MEMORY)
        scaleTraceTaskDur(scaled_trace, "min-ratio", newMinDur = min_dur)
        generateTraceOutput(scaled_trace)

Event Timeline