Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F102908060
generate-trace-files.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Tue, Feb 25, 09:52
Size
9 KB
Mime Type
text/x-python
Expires
Thu, Feb 27, 09:52 (2 d)
Engine
blob
Format
Raw Data
Handle
24454099
Attached To
R3704 elastic-yarn
generate-trace-files.py
View Options
#!/usr/bin/env python
import
sys
import
re
import
json
import
numpy
import
copy
import
random
from
math
import
ceil
# Offset (ms) assigned to every job's t_start; the generated arrival time is
# added on top of it when 'job.start.ms' is emitted.
JOB_START_OFFSET = 1000
# Value assigned to every job's t_stop and emitted as 'job.end.ms'.
JOB_STOP = 200000
# Mean handed to numpy.random.poisson for the gap between consecutive jobs
# (1000 ms, i.e. 1 s).
JOB_INTERARRIVAL_TIME = 1000
# Target job counts; one output file is written per value for every trace.
JOB_NR_SCALE_VALUES = [100, 250, 500]
# Task-memory cap (MB): a trace whose largest task exceeds it gets its memory
# rescaled before output.
MAX_MEMORY = 9000
# Replacement for a task memory of 0, and the starting minimum used when
# task memory is rescaled.
MIN_MEMORY = 100
class TraceDetails:
    """Mutable record describing one named trace read from the input."""

    def __init__(self):
        # Trace name and the suffix that becomes part of the output file name.
        self.name, self.suffix = '', ''
        # JobDetails entries belonging to this trace (fresh list per instance).
        self.jobs = list()
        # Sum of nr_jobs over all entries appended to `jobs`.
        self.nr_total_jobs = 0
class JobDetails:
    """Mutable record for one job entry: counts, task duration/memory, times."""

    def __init__(self):
        # Every field starts at 0; the input parser fills in the real values.
        for field in ('nr_jobs', 'nr_tasks', 'task_dur',
                      'task_mem', 't_start', 't_stop'):
            setattr(self, field, 0)
def getTraceMaxTaskMem(trace):
    """Return the largest task_mem among trace.jobs.

    The search is seeded with 0, so 0 is returned for a trace without jobs
    or when no job has a task_mem above 0.
    """
    return max([0] + [job.task_mem for job in trace.jobs])
def scaleValueToRange(value, oldMin, oldMax, newMin, newMax):
    """Linearly map value from [oldMin, oldMax] onto [newMin, newMax].

    Args:
        value: number to convert.
        oldMin, oldMax: bounds of the source range.
        newMin, newMax: bounds of the target range.

    Returns:
        The mapped value as a float. When the source range is degenerate
        (oldMax == oldMin) there is no slope to apply, so newMin is
        returned instead of raising ZeroDivisionError.
    """
    # "* 1.0" forces true division even for integer arguments on Python 2.
    oldSpan = (oldMax - oldMin) * 1.0
    if oldSpan == 0:
        # Every value of a zero-width source range sits at oldMin, which
        # corresponds to newMin in the target range.
        return newMin * 1.0
    return ((value - oldMin) * (newMax - newMin)) / oldSpan + newMin
def scaleValueWithRatio(value, ratio):
    """Return value multiplied by ratio."""
    scaled = value * ratio
    return scaled
def scaleTraceTaskMem(trace, type, newMinMem=-1, newMaxMem=-1):
    """Rescale task_mem of every job in trace.jobs, in place.

    A task_mem of 0 is first replaced by MIN_MEMORY. The values are then
    transformed according to `type`:

      "max-ratio": multiply by newMaxMem / <largest task_mem>
      "min-ratio": multiply by newMinMem / <smallest task_mem>
      "range":     map linearly from [smallest, largest] onto
                   [newMinMem, newMaxMem]

    Any other `type` leaves the values untouched (apart from the zero
    replacement). A trace without jobs is left alone instead of raising
    IndexError.

    Args:
        trace: object exposing a `jobs` list whose items have `task_mem`.
        type: scaling mode, see above (name shadows the builtin; kept so
            existing keyword callers continue to work).
        newMinMem: target minimum, used by "min-ratio" and "range".
        newMaxMem: target maximum, used by "max-ratio" and "range".

    Returns:
        None.
    """
    jobs = trace.jobs
    if not jobs:
        return

    # NOTE(review): the minimum search starts from MIN_MEMORY rather than
    # from the first job, so the detected minimum never exceeds MIN_MEMORY
    # -- confirm this floor is intended.
    minMem = MIN_MEMORY
    maxMem = jobs[0].task_mem
    for job in jobs:
        if job.task_mem == 0:
            job.task_mem = MIN_MEMORY
        if job.task_mem < minMem:
            minMem = job.task_mem
        if job.task_mem > maxMem:
            maxMem = job.task_mem

    # The mode and the ratios do not depend on the job, so decide once.
    if type == "max-ratio":
        ratio = newMaxMem / (maxMem * 1.0)
        for job in jobs:
            job.task_mem = scaleValueWithRatio(job.task_mem, ratio)
    elif type == "min-ratio":
        ratio = newMinMem / (minMem * 1.0)
        for job in jobs:
            job.task_mem = scaleValueWithRatio(job.task_mem, ratio)
    elif type == "range":
        for job in jobs:
            job.task_mem = scaleValueToRange(job.task_mem, minMem, maxMem,
                                             newMinMem, newMaxMem)
def scaleTraceTaskDur(trace, type, newMinDur=-1, newMaxDur=-1):
    """Rescale task_dur of every job in trace.jobs, in place.

    The smallest and largest task_dur of the trace are determined first;
    the values are then transformed according to `type`:

      "max-ratio": multiply by newMaxDur / <largest task_dur>
      "min-ratio": multiply by newMinDur / <smallest task_dur>
      "range":     map linearly from [smallest, largest] onto
                   [newMinDur, newMaxDur]

    Any other `type` leaves the values untouched. A trace without jobs is
    left alone instead of raising IndexError.

    Args:
        trace: object exposing a `jobs` list whose items have `task_dur`.
        type: scaling mode, see above (name shadows the builtin; kept so
            existing keyword callers continue to work).
        newMinDur: target minimum, used by "min-ratio" and "range".
        newMaxDur: target maximum, used by "max-ratio" and "range".

    Returns:
        None.

    Raises:
        ZeroDivisionError: in a ratio mode when the reference duration
            (smallest or largest task_dur) is 0.
    """
    jobs = trace.jobs
    if not jobs:
        return

    minDur = jobs[0].task_dur
    maxDur = jobs[0].task_dur
    for job in jobs:
        if job.task_dur < minDur:
            minDur = job.task_dur
        if job.task_dur > maxDur:
            maxDur = job.task_dur

    # The mode and the ratios do not depend on the job, so decide once.
    if type == "max-ratio":
        ratio = newMaxDur / (maxDur * 1.0)
        for job in jobs:
            job.task_dur = scaleValueWithRatio(job.task_dur, ratio)
    elif type == "min-ratio":
        ratio = newMinDur / (minDur * 1.0)
        for job in jobs:
            job.task_dur = scaleValueWithRatio(job.task_dur, ratio)
    elif type == "range":
        for job in jobs:
            job.task_dur = scaleValueToRange(job.task_dur, minDur, maxDur,
                                             newMinDur, newMaxDur)
def _buildJobRecord(job, job_id, arrival_offset):
    """Return the JSON-serialisable dict emitted for one generated job."""
    task = {
        'c.host': '/default-rack/node2354434',
        'c.dur': int(round(job.task_dur)),
        'c.prio': 20,
        # Make sure memory is in 100MB increments, rounded UP. Dividing by
        # 100.0 prevents Python 2 from floor-dividing an integer task_mem
        # before ceil() sees it (which silently rounded down).
        'c.mem': int(ceil(job.task_mem / 100.0) * 100),
        'c.type': 'map',
        'c.nr': job.nr_tasks,
    }
    return {
        'am.type': 'mapreduce',
        # int() converts numpy integers to plain ints, which json can always
        # serialise.
        'job.start.ms': int(job.t_start + arrival_offset),
        'job.end.ms': job.t_stop,
        'job.queue.name': 'sls_queue_1',
        'job.id': 'job_' + str(job_id),
        'job.user': 'default',
        'job.tasks': [task],
    }


def generateTraceOutput(trace):
    """Write one "TRACE-<name>-j<N><suffix>.trace" file per target job count.

    For every N in JOB_NR_SCALE_VALUES the per-entry job counts are scaled
    by N / trace.nr_total_jobs (each entry yields at least one job), fresh
    arrival times are drawn, and every resulting job is written to the file
    as an indented JSON object followed by a newline.

    A trace without jobs produces no files (previously an output file was
    created and the ratio computation then divided by zero).

    Args:
        trace: object with `name`, `suffix`, `jobs` and `nr_total_jobs`.

    Returns:
        None.
    """
    if not trace.jobs:
        return

    for max_nr_jobs in JOB_NR_SCALE_VALUES:
        file_name = ("TRACE-" + trace.name + "-j" + str(max_nr_jobs)
                     + trace.suffix + ".trace")
        job_ratio = max_nr_jobs / (trace.nr_total_jobs * 1.0)
        # Might need to tweak. TIME_BETWEEN_JOB_ARRIVAL = 1s
        arrival_times = generatePoissonArrivalTimes(JOB_INTERARRIVAL_TIME,
                                                    max_nr_jobs)
        nr_slots = len(arrival_times)

        with open(file_name, "w") as out:
            job_id = 1
            for job in trace.jobs:
                scaled_job_nr = int(round(job.nr_jobs * job_ratio))
                if scaled_job_nr == 0:
                    # Get at least 1 sample of each type of job !
                    scaled_job_nr = 1
                for _ in range(scaled_job_nr):
                    # Rounding and the at-least-one rule can produce more
                    # jobs than arrival slots; the surplus reuses the last.
                    slot = min(job_id, nr_slots) - 1
                    job_d = _buildJobRecord(job, job_id, arrival_times[slot])
                    json.dump(job_d, out, sort_keys=True, indent=4)
                    out.write('\n')
                    job_id += 1
def sumListElem(list):
    """Return a copy of the sequence holding its running totals.

    Element i of the result is the sum of input elements 0..i. The input is
    deep-copied first, so it is not modified and the result has the same
    container type as the input.

    Args:
        list: indexable, sized sequence of addable values (the name shadows
            the builtin; kept so existing keyword callers continue to work).

    Returns:
        The deep copy with every element replaced by its cumulative sum.
    """
    result = copy.deepcopy(list)
    # range() instead of xrange() so the helper runs on Python 2 and 3.
    for i in range(1, len(result)):
        result[i] = result[i - 1] + result[i]
    return result
def generatePoissonArrivalTimes(TimeBetweenJobs, NrJobs):
    """Return NrJobs arrival times in shuffled order.

    NrJobs gaps are drawn with numpy.random.poisson(TimeBetweenJobs, NrJobs),
    accumulated into arrival times with sumListElem, and the resulting
    sequence is then shuffled in place with random.shuffle.
    """
    gaps = numpy.random.poisson(TimeBetweenJobs, NrJobs)
    arrivals = sumListElem(gaps)
    random.shuffle(arrivals)
    return arrivals
def computeArrivalTimesForTrace(trace):
    """Return shuffled arrival times, one per job counted in the trace."""
    # Might need to tweak. TIME_BETWEEN_JOB_ARRIVAL = 1s
    arrivals = generatePoissonArrivalTimes(JOB_INTERARRIVAL_TIME,
                                           trace.nr_total_jobs)
    return arrivals
# Read the whole input up front: a trace header line may be followed by any
# number of job lines that belong to it.
data = sys.stdin.readlines()
traces = []

# Compiled once instead of on every line.
# A trace starts on a line containing "--<alphanumeric name>--".
TRACE_HEADER_RE = re.compile('--[a-zA-Z0-9]+--')
# A job line starts with digits, optional whitespace, then more digits.
JOB_LINE_RE = re.compile('[0-9]+[ \t\n\r]*[0-9]+.*')

for line_no, text in enumerate(data):
    header = TRACE_HEADER_RE.search(text)
    if header is None:
        continue

    trace = TraceDetails()
    trace.name = header.group(0)[2:-2]  # drop the surrounding "--"
    trace.suffix = "-no_scale"

    # Consume the job lines that directly follow the header.
    idx = line_no + 1
    while idx < len(data) and JOB_LINE_RE.match(data[idx]) is not None:
        tokens = data[idx].split()
        if len(tokens) < 6:
            # Columns 0, 1, 3 and 5 are read below; fail with the offending
            # line number rather than a bare IndexError.
            raise ValueError("input line %d: expected at least 6 columns, got %d"
                             % (idx + 1, len(tokens)))
        job = JobDetails()
        job.nr_jobs = int(tokens[0])
        job.nr_tasks = int(tokens[1])
        job.task_dur = float(tokens[3]) * 1000  # Convert seconds to milliseconds
        job.task_mem = int(tokens[5])
        job.t_start = JOB_START_OFFSET
        job.t_stop = JOB_STOP
        # print str(job.nr_jobs) + " " + str(job.nr_tasks) + " " + str(job.task_dur) + " " + str(job.task_mem)
        trace.jobs.append(job)
        trace.nr_total_jobs += job.nr_jobs
        idx += 1
    traces.append(trace)
for trace in traces:
    # Generate unmodified traces (only Poisson distributed arrival times)
    generateTraceOutput(trace)

    ### RANGE SCALED TRACES ###
    # NOTE: the block below is disabled (it is a bare string literal). Its
    # calls pass the scaling mode as the last positional argument, which does
    # not match the current scaleTraceTaskMem/scaleTraceTaskDur signatures
    # (mode is the second parameter); adjust before re-enabling.
    """
    # Generate traces with mem scaled to [100, MAX_MEMORY] (MB) and time scaled to [10000, 50000] (ms)
    scaled_trace = copy.deepcopy(trace)
    scaled_trace.suffix = "-range-mem100_" + str(MAX_MEMORY) + "-dur10000_50000"
    scaleTraceTaskMem(scaled_trace, 100, MAX_MEMORY, "range")
    scaleTraceTaskDur(scaled_trace, 10000, 50000, "range")
    generateTraceOutput(scaled_trace)
    # Generate traces with mem scaled to [100, MAX_MEMORY] (MB) and time scaled to [25000, 500000] (ms)
    scaled_trace = copy.deepcopy(trace)
    scaled_trace.suffix = "-range-mem100_" + str(MAX_MEMORY) + "-dur25000_500000"
    scaleTraceTaskMem(scaled_trace, 100, MAX_MEMORY, "range")
    scaleTraceTaskDur(scaled_trace, 25000, 500000, "range")
    generateTraceOutput(scaled_trace)
    ### MAX-RATIO SCALED TRACES ###
    # Generate traces with mem scaled to 10000 (MB) and time scaled to 50000 (ms)
    scaled_trace = copy.deepcopy(trace)
    scaled_trace.suffix = "-max_ratio-mem" + str(MAX_MEMORY) + "-dur50000"
    scaleTraceTaskMem(scaled_trace, 100, MAX_MEMORY, "max-ratio")
    scaleTraceTaskDur(scaled_trace, 10000, 50000, "max-ratio")
    generateTraceOutput(scaled_trace)
    # Generate traces with mem scaled to [100, MAX_MEMORY] (MB) and time scaled to 500000 (ms)
    scaled_trace = copy.deepcopy(trace)
    scaled_trace.suffix= "-max_ratio-mem" + str(MAX_MEMORY) + "-dur500000"
    scaleTraceTaskMem(scaled_trace, 100, MAX_MEMORY, "max-ratio")
    scaleTraceTaskDur(scaled_trace, 25000, 500000, "max-ratio")
    generateTraceOutput(scaled_trace)
    ### MIN-RATIO SCALED TRACES ###
    # Generate traces with mem scaled to 10000 (MB) and time scaled to 50000 (ms)
    scaled_trace = copy.deepcopy(trace)
    scaled_trace.suffix = "-min_ratio-mem" + str(MAX_MEMORY) + "-dur50000"
    scaleTraceTaskMem(scaled_trace, 100, MAX_MEMORY, "min-ratio")
    scaleTraceTaskDur(scaled_trace, 10000, 50000, "min-ratio")
    generateTraceOutput(scaled_trace)
    # Generate traces with mem scaled to [100, MAX_MEMORY] (MB) and time scaled to 500000 (ms)
    scaled_trace = copy.deepcopy(trace)
    scaled_trace.suffix = "-min_ratio-mem" + str(MAX_MEMORY) + "-dur500000"
    scaleTraceTaskMem(scaled_trace, 100, MAX_MEMORY, "min-ratio")
    scaleTraceTaskDur(scaled_trace, 25000, 500000, "min-ratio")
    generateTraceOutput(scaled_trace)
    """

    # For each target minimum duration (ms): work on a private copy, scale
    # memory down with max-ratio only if MAX_MEM(trace) > MAX_MEMORY, scale
    # durations with min-ratio, then write the trace files.
    for min_dur in (10000, 25000):
        scaled_trace = copy.deepcopy(trace)
        scaled_trace.suffix = "-min_ratio-mem" + str(MAX_MEMORY) + "-dur" + str(min_dur)
        if getTraceMaxTaskMem(scaled_trace) > MAX_MEMORY:
            scaleTraceTaskMem(scaled_trace, "max-ratio", newMaxMem=MAX_MEMORY)
        scaleTraceTaskDur(scaled_trace, "min-ratio", newMinDur=min_dur)
        generateTraceOutput(scaled_trace)
Event Timeline
Log In to Comment