Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F63216599
Kokkos_Qthreads_Task.cpp
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, May 18, 13:23
Size
9 KB
Mime Type
text/x-c
Expires
Mon, May 20, 13:23 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
17746081
Attached To
rLAMMPS lammps
Kokkos_Qthreads_Task.cpp
View Options
/*
//@HEADER
// ************************************************************************
//
// Kokkos v. 2.0
// Copyright (2014) Sandia Corporation
//
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the Corporation nor the names of the
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Questions? Contact H. Carter Edwards (hcedwar@sandia.gov)
//
// ************************************************************************
//@HEADER
*/
#include <Kokkos_Macros.hpp>
#if defined( KOKKOS_ENABLE_QTHREADS ) && defined( KOKKOS_ENABLE_TASKPOLICY )
#include <Kokkos_Core.hpp>
#include <impl/Kokkos_TaskQueue_impl.hpp>
//----------------------------------------------------------------------------
//----------------------------------------------------------------------------
namespace
Kokkos
{
namespace
Impl
{
// Explicit instantiation of the task queue template for the Qthreads
// execution space (the template's implementation comes from
// impl/Kokkos_TaskQueue_impl.hpp, included above).
template class TaskQueue< Kokkos::Qthreads > ;
//----------------------------------------------------------------------------
// Default constructor.
//
// Produces a member that is not attached to any pool thread: both
// executor pointers are null, all barrier bookkeeping is zero, the
// group and team ranks are zero, and the team size is one.
TaskExec< Kokkos::Qthreads >::TaskExec()
  : m_self_exec( nullptr )
  , m_team_exec( nullptr )
  , m_sync_mask( 0 )
  , m_sync_value( 0 )
  , m_sync_step( 0 )
  , m_group_rank( 0 )
  , m_team_rank( 0 )
  , m_team_size( 1 )
{}
// Construct the team-aware member for the pool thread 'arg_exec'.
//
//   arg_exec       the calling thread's pool executor
//   arg_team_size  number of threads per team
//
// With r = arg_exec.pool_rank_rev():
//   group rank = r / arg_team_size
//   team rank  = r % arg_team_size
//   m_team_exec = arg_exec.pool_rev( group rank )
//
// The members of this team are
//   m_self_exec->pool_rev( team_size * group_rank )
//   ...
//   m_self_exec->pool_rev( team_size * ( group_rank + 1 ) - 1 )
TaskExec< Kokkos::Qthreads >::TaskExec
  ( Kokkos::Impl::QthreadsExec & arg_exec
  , int const                    arg_team_size
  )
  : m_self_exec( & arg_exec )
  , m_team_exec( arg_exec.pool_rev( arg_exec.pool_rank_rev() / arg_team_size ) )
  , m_sync_mask( 0 )
  , m_sync_value( 0 )
  , m_sync_step( 0 )
  , m_group_rank( arg_exec.pool_rank_rev() / arg_team_size )
  , m_team_rank( arg_exec.pool_rank_rev() % arg_team_size )
  , m_team_size( arg_team_size )
{
  // Clear the first two 64-bit words of this thread's own scratch_reduce
  // area; team_barrier() uses two such words as its alternating slots.
  int64_t volatile * const barrier_words =
    (int64_t *) m_self_exec->scratch_reduce();

  barrier_words[0] = int64_t(0) ;
  barrier_words[1] = int64_t(0) ;

  // One byte per team member: the expected barrier value starts with
  // 0x01 in every member's byte, and the mask carries 0x03 in every
  // member's byte.
  for ( int member = 0 ; member < m_team_size ; ++member ) {
    const int shift = 8 * member ;
    m_sync_value |= int64_t(1) << shift ;
    m_sync_mask  |= int64_t(3) << shift ;
  }

  Kokkos::memory_fence();
}
#if defined( KOKKOS_ACTIVE_EXECUTION_MEMORY_SPACE_HOST )
// Spin-wait barrier over the members of this task team.
//
// Does nothing for a team of one.  Otherwise every member writes the
// low two bits of m_sync_value into its own byte (offset m_team_rank)
// of a 64-bit word located in m_team_exec's scratch_reduce area, then
// spins until the whole word equals m_sync_value.  Two words are used
// alternately (selected by the parity of m_sync_step) so that one
// barrier cannot overtake the next.
//
// Calls Kokkos::abort if the scratch_reduce area holds fewer than two
// int64_t values.
void TaskExec< Kokkos::Qthreads >::team_barrier() const
{
  // Nobody to wait for.
  if ( m_team_size <= 1 ) return ;

  if ( m_team_exec->scratch_reduce_size() < int( 2 * sizeof(int64_t) ) ) {
    Kokkos::abort("TaskQueue<Qthreads> scratch_reduce memory too small");
  }

  // Team-shared word for this step: slot 0 on even steps, slot 1 on odd.
  int64_t volatile * const sync =
    ((int64_t *) m_team_exec->scratch_reduce()) + ( m_sync_step & 0x01 );

  // The single byte of that word owned by this team member.
  int8_t volatile * const my_byte = ((int8_t *) sync) + m_team_rank ;

#if 0
fprintf( stdout,
"barrier group(%d) member(%d) step(%d) wait(%lx) : before(%lx)\n",
m_group_rank,
m_team_rank,
m_sync_step,
m_sync_value,
*sync
);
fflush(stdout);
#endif

  // Signal arrival.
  *my_byte = int8_t( m_sync_value & 0x03 );

  // Wait for the rest of the team to arrive.
  while ( *sync != m_sync_value ) { /* spin */ }

#if 0
fprintf( stdout,
"barrier group(%d) member(%d) step(%d) wait(%lx) : after(%lx)\n",
m_group_rank,
m_team_rank,
m_sync_step,
m_sync_value,
*sync
);
fflush(stdout);
#endif

  ++m_sync_step ;

  // Every other step (both slots have been used once): flip the
  // expected value with the mask and keep the step counter bounded.
  if ( ( m_sync_step & 0x01 ) == 0 ) {
    m_sync_value ^= m_sync_mask ;
    if ( m_sync_step > 1000 ) {
      m_sync_step = 0 ;
    }
  }
}
#endif
//----------------------------------------------------------------------------
// Execute the tasks of 'queue' inside an OpenMP parallel region.
//
// Threads are grouped into teams of PoolExec::pool_size(2) threads; the
// barrier implementation supports at most 8 members, so a larger team
// size results in a call to Kokkos::abort.
//
// In each round the member with team rank 0 selects the team's next
// task and publishes it through a team-shared slot:
//   - null pointer : m_ready_count was not positive, the team leaves
//   - 'end' tag    : nothing could be popped this round, try again
//   - otherwise    : a task, run either by the whole team (TaskTeam)
//                    or by team rank 0 alone.
void TaskQueueSpecialization< Kokkos::Qthreads >::execute
  ( TaskQueue< Kokkos::Qthreads > * const queue )
{
  using execution_space = Kokkos::Qthreads ;
  using queue_type      = TaskQueue< execution_space > ;
  using task_root_type  = TaskBase< execution_space , void , void > ;
  using PoolExec        = Kokkos::Impl::QthreadsExec ;
  using Member          = TaskExec< execution_space > ;

  task_root_type * const end = (task_root_type *) task_root_type::EndTag ;

  // Required: team_size <= 8
  const int team_size = PoolExec::pool_size(2); // Threads per core
  // const int team_size = PoolExec::pool_size(1); // Threads per NUMA

  if ( team_size > 8 ) {
    Kokkos::abort("TaskQueue<Qthreads> unsupported team size");
  }

#pragma omp parallel
  {
    PoolExec & this_thread = *PoolExec::get_thread_omp();

    Member single_exec ;
    Member team_exec( this_thread , team_size );

    // Slot shared by the team, used to hand the selected task from
    // team rank 0 to the other members.
    task_root_type * volatile * const task_shared =
      (task_root_type **) team_exec.m_team_exec->scratch_thread();

    // Barrier across the entire thread pool to ensure initialization
#pragma omp barrier

    // Keep going until the queues are empty and no tasks are in flight.
    for (;;) {

      // Team rank 0 tries to acquire either a thread-team task or a
      // single-thread task on behalf of the team.
      if ( 0 == team_exec.team_rank() ) {

        const int ready_count = *((volatile int *) & queue->m_ready_count );

        task_root_type * next = ( 0 < ready_count ) ? end : nullptr ;

        // Search by priority first, then by task type; stop at the
        // first pop that yields something other than the end tag.
        for ( int priority = 0 ;
              priority < queue_type::NumQueue && end == next ;
              ++priority ) {
          for ( int type = 0 ; type < 2 && end == next ; ++type ) {
            next = queue_type::pop_task( & queue->m_ready[priority][type] );
          }
        }

        *task_shared = next ;

        // Fence so the store to task_shared is visible
        Kokkos::memory_fence();
      }

      // The whole team waits until every member has reached this point.
      team_exec.team_barrier();

      Kokkos::memory_fence();

      task_root_type * const task = *task_shared ;

#if 0
fprintf( stdout,
"\nexecute group(%d) member(%d) task_shared(0x%lx) task(0x%lx)\n",
team_exec.m_group_rank,
team_exec.m_team_rank,
uintptr_t(task_shared),
uintptr_t(task)
);
fflush(stdout);
#endif

      // 0 == m_ready_count : nothing left to do
      if ( nullptr == task ) break ;

      if ( end == task ) {
        // Nothing was acquired this round; synchronize before team
        // rank 0 overwrites task_shared again.
        team_exec.team_barrier();
        continue ;
      }

      if ( task_root_type::TaskTeam == task->m_task_type ) {
        // Thread Team Task: every member applies it.
        (*task->m_apply)( task , & team_exec );

        // The m_apply function performs a barrier

        if ( 0 == team_exec.team_rank() ) {
          // Team rank 0 completes the task, which may delete the task.
          queue->complete( task );
        }
      }
      else {
        // Single Thread Task: only team rank 0 applies and completes it.
        if ( 0 == team_exec.team_rank() ) {
          (*task->m_apply)( task , & single_exec );
          queue->complete( task );
        }

        // All team members wait for the whole team to reach this point.
        // Not required for completing the task, but required so that
        // task_shared is not updated before every thread has read it.
        team_exec.team_barrier();
      }
    }
  }
  // END #pragma omp parallel
}
// Drain the ready queues of 'queue' on the calling thread, but only if
// omp_get_num_threads() reports exactly one thread; otherwise return
// without doing anything.
//
// Tasks are popped by priority and then by type, applied with a
// default-constructed (single-thread) TaskExec, and completed.  The
// loop stops as soon as a full sweep over the queues yields only the
// end tag.
void TaskQueueSpecialization< Kokkos::Qthreads >::
  iff_single_thread_recursive_execute
  ( TaskQueue< Kokkos::Qthreads > * const queue )
{
  using execution_space = Kokkos::Qthreads ;
  using queue_type      = TaskQueue< execution_space > ;
  using task_root_type  = TaskBase< execution_space , void , void > ;
  using Member          = TaskExec< execution_space > ;

  if ( 1 != omp_get_num_threads() ) return ;

  task_root_type * const end = (task_root_type *) task_root_type::EndTag ;

  Member single_exec ;

  for (;;) {

    task_root_type * task = end ;

    // Loop by priority and then type
    for ( int priority = 0 ;
          priority < queue_type::NumQueue && end == task ;
          ++priority ) {
      for ( int type = 0 ; type < 2 && end == task ; ++type ) {
        task = queue_type::pop_task( & queue->m_ready[priority][type] );
      }
    }

    // Nothing could be popped from any queue.
    if ( end == task ) break ;

    (*task->m_apply)( task , & single_exec );

    queue->complete( task );
  }
}
}}
/* namespace Kokkos::Impl */
//----------------------------------------------------------------------------
#else
// Empty placeholder function defined when the Qthreads task backend is
// not enabled; going by its name, it exists to prevent a link error.
void KOKKOS_SRC_QTHREADS_TASK_PREVENT_LINK_ERROR() {}
#endif
/* #if defined( KOKKOS_ENABLE_QTHREADS ) && defined( KOKKOS_ENABLE_TASKPOLICY ) */
Event Timeline
Log In to Comment