Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F71846917
Kokkos_TaskQueue.hpp
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Jul 13, 10:03
Size
17 KB
Mime Type
text/x-c++
Expires
Mon, Jul 15, 10:03 (2 d)
Engine
blob
Format
Raw Data
Handle
19001983
Attached To
rLAMMPS lammps
Kokkos_TaskQueue.hpp
View Options
/*
//@HEADER
// ************************************************************************
//
// Kokkos v. 2.0
// Copyright (2014) Sandia Corporation
//
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the Corporation nor the names of the
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Questions? Contact H. Carter Edwards (hcedwar@sandia.gov)
//
// ************************************************************************
//@HEADER
*/
// Experimental unified task-data parallel manycore LDRD
#ifndef KOKKOS_IMPL_TASKQUEUE_HPP
#define KOKKOS_IMPL_TASKQUEUE_HPP
#include <Kokkos_Macros.hpp>
#if defined( KOKKOS_ENABLE_TASKDAG )
#include <string>
#include <typeinfo>
#include <stdexcept>
//----------------------------------------------------------------------------
//----------------------------------------------------------------------------
namespace Kokkos {
namespace Impl {
template< class Space , typename ResultType , class FunctorType >
class TaskBase ;
template< typename Space >
class TaskQueue ;
template< typename Space >
class TaskQueueSpecialization ;
} /* namespace Impl */
} /* namespace Kokkos */
//----------------------------------------------------------------------------
//----------------------------------------------------------------------------
namespace Kokkos {
namespace Impl {
/** \brief Base class for task management, access, and execution.
*
* Inheritance structure to allow static_cast from the task root type
* and a task's FunctorType.
*
* // Enable a functor to access the base class
* // and provide memory for result value.
* TaskBase< Space , ResultType , FunctorType >
* : TaskBase< void , void , void >
* , FunctorType
* { ... };
* Followed by memory allocated for result value.
*
*
* States of a task:
*
* Constructing State, NOT IN a linked list
* m_wait == 0
* m_next == 0
*
* Scheduling transition : Constructing -> Waiting
* before:
* m_wait == 0
* m_next == this task's initial dependence, 0 if none
* after:
* m_wait == EndTag
* m_next == EndTag
*
* Waiting State, IN a linked list
* m_apply != 0
* m_queue != 0
* m_ref_count > 0
* m_wait == head of linked list of tasks waiting on this task
* m_next == next of linked list of tasks
*
* transition : Waiting -> Executing
* before:
* m_next == EndTag
* after::
* m_next == LockTag
*
* Executing State, NOT IN a linked list
* m_apply != 0
* m_queue != 0
* m_ref_count > 0
* m_wait == head of linked list of tasks waiting on this task
* m_next == LockTag
*
* Respawn transition : Executing -> Executing-Respawn
* before:
* m_next == LockTag
* after:
* m_next == this task's updated dependence, 0 if none
*
* Executing-Respawn State, NOT IN a linked list
* m_apply != 0
* m_queue != 0
* m_ref_count > 0
* m_wait == head of linked list of tasks waiting on this task
* m_next == this task's updated dependence, 0 if none
*
* transition : Executing -> Complete
* before:
* m_wait == head of linked list
* after:
* m_wait == LockTag
*
* Complete State, NOT IN a linked list
* m_wait == LockTag: cannot add dependence (<=> complete)
* m_next == LockTag: not a member of a wait queue
*
*/
template<>
class TaskBase< void , void , void >
{
public:
enum : int16_t { TaskTeam = 0 , TaskSingle = 1 , Aggregate = 2 };
enum : uintptr_t { LockTag = ~uintptr_t(0) , EndTag = ~uintptr_t(1) };
template< typename > friend class Kokkos::TaskScheduler ;
typedef TaskQueue< void > queue_type ;
typedef void (* function_type) ( TaskBase * , void * );
// sizeof(TaskBase) == 48
function_type m_apply ; ///< Apply function pointer
queue_type * m_queue ; ///< Pointer to queue
TaskBase * m_wait ; ///< Linked list of tasks waiting on this
TaskBase * m_next ; ///< Waiting linked-list next
int32_t m_ref_count ; ///< Reference count
int32_t m_alloc_size ; ///< Allocation size
int32_t m_dep_count ; ///< Aggregate's number of dependences
int16_t m_task_type ; ///< Type of task
int16_t m_priority ; ///< Priority of runnable task
TaskBase( TaskBase && ) = delete ;
TaskBase( const TaskBase & ) = delete ;
TaskBase & operator = ( TaskBase && ) = delete ;
TaskBase & operator = ( const TaskBase & ) = delete ;
KOKKOS_INLINE_FUNCTION ~TaskBase() = default ;
KOKKOS_INLINE_FUNCTION constexpr
TaskBase()
: m_apply( 0 )
, m_queue( 0 )
, m_wait( 0 )
, m_next( 0 )
, m_ref_count( 0 )
, m_alloc_size( 0 )
, m_dep_count( 0 )
, m_task_type( 0 )
, m_priority( 0 )
{}
//----------------------------------------
KOKKOS_INLINE_FUNCTION
TaskBase * volatile * aggregate_dependences() volatile
{ return reinterpret_cast<TaskBase*volatile*>( this + 1 ); }
KOKKOS_INLINE_FUNCTION
bool requested_respawn()
{
// This should only be called when a task has finished executing and is
// in the transition to either the complete or executing-respawn state.
TaskBase * const lock = reinterpret_cast< TaskBase * >( LockTag );
return lock != m_next;
}
KOKKOS_INLINE_FUNCTION
void add_dependence( TaskBase* dep )
{
// Precondition: lock == m_next
TaskBase * const lock = (TaskBase *) LockTag ;
// Assign dependence to m_next. It will be processed in the subsequent
// call to schedule. Error if the dependence is reset.
if ( lock != Kokkos::atomic_exchange( & m_next, dep ) ) {
Kokkos::abort("TaskScheduler ERROR: resetting task dependence");
}
if ( 0 != dep ) {
// The future may be destroyed upon returning from this call
// so increment reference count to track this assignment.
Kokkos::atomic_increment( &(dep->m_ref_count) );
}
}
//----------------------------------------
KOKKOS_INLINE_FUNCTION
int32_t reference_count() const
{ return *((int32_t volatile *)( & m_ref_count )); }
};
static_assert( sizeof(TaskBase<void,void,void>) == 48
, "Verifying expected sizeof(TaskBase<void,void,void>)" );
//----------------------------------------------------------------------------
//----------------------------------------------------------------------------
template< typename ResultType >
struct TaskResult {
enum : int32_t { size = sizeof(ResultType) };
using reference_type = ResultType & ;
KOKKOS_INLINE_FUNCTION static
ResultType * ptr( TaskBase<void,void,void> * task )
{
return reinterpret_cast< ResultType * >
( reinterpret_cast< char * >(task) + task->m_alloc_size - sizeof(ResultType) );
}
KOKKOS_INLINE_FUNCTION static
reference_type get( TaskBase<void,void,void> * task )
{ return *ptr( task ); }
};
template<>
struct TaskResult< void > {
enum : int32_t { size = 0 };
using reference_type = void ;
KOKKOS_INLINE_FUNCTION static
void * ptr( TaskBase<void,void,void> * ) { return (void*) 0 ; }
KOKKOS_INLINE_FUNCTION static
reference_type get( TaskBase<void,void,void> * ) {}
};
} /* namespace Impl */
} /* namespace Kokkos */
//----------------------------------------------------------------------------
//----------------------------------------------------------------------------
namespace Kokkos {
namespace Impl {
template<>
class TaskQueue< void > {};
/** \brief Manage task allocation, deallocation, and scheduling.
*
* Task execution is deferred to the TaskQueueSpecialization.
* All other aspects of task management have shared implementation.
*/
template< typename ExecSpace >
class TaskQueue : public TaskQueue<void> {
private:
friend class TaskQueueSpecialization< ExecSpace > ;
friend class Kokkos::TaskScheduler< ExecSpace > ;
using execution_space = ExecSpace ;
using specialization = TaskQueueSpecialization< execution_space > ;
using memory_space = typename specialization::memory_space ;
using device_type = Kokkos::Device< execution_space , memory_space > ;
using memory_pool = Kokkos::MemoryPool< device_type > ;
using task_root_type = Kokkos::Impl::TaskBase<void,void,void> ;
struct Destroy {
TaskQueue * m_queue ;
void destroy_shared_allocation();
};
//----------------------------------------
enum : int { NumQueue = 3 };
// Queue is organized as [ priority ][ type ]
memory_pool m_memory ;
task_root_type * volatile m_ready[ NumQueue ][ 2 ];
long m_accum_alloc ; // Accumulated number of allocations
int m_count_alloc ; // Current number of allocations
int m_max_alloc ; // Maximum number of allocations
int m_ready_count ; // Number of ready or executing
//----------------------------------------
~TaskQueue();
TaskQueue() = delete ;
TaskQueue( TaskQueue && ) = delete ;
TaskQueue( TaskQueue const & ) = delete ;
TaskQueue & operator = ( TaskQueue && ) = delete ;
TaskQueue & operator = ( TaskQueue const & ) = delete ;
TaskQueue( const memory_pool & arg_memory_pool );
// Schedule a task
// Precondition:
// task is not executing
// task->m_next is the dependence or zero
// Postcondition:
// task->m_next is linked list membership
KOKKOS_FUNCTION void schedule_runnable( task_root_type * const );
KOKKOS_FUNCTION void schedule_aggregate( task_root_type * const );
// Reschedule a task
// Precondition:
// task is in Executing state
// task->m_next == LockTag
// Postcondition:
// task is in Executing-Respawn state
// task->m_next == 0 (no dependence)
KOKKOS_FUNCTION
void reschedule( task_root_type * );
// Complete a task
// Precondition:
// task is not executing
// task->m_next == LockTag => task is complete
// task->m_next != LockTag => task is respawn
// Postcondition:
// task->m_wait == LockTag => task is complete
// task->m_wait != LockTag => task is waiting
KOKKOS_FUNCTION
void complete( task_root_type * );
KOKKOS_FUNCTION
static bool push_task( task_root_type * volatile * const
, task_root_type * const );
KOKKOS_FUNCTION
static task_root_type * pop_ready_task( task_root_type * volatile * const );
KOKKOS_FUNCTION static
void decrement( task_root_type * task );
public:
// If and only if the execution space is a single thread
// then execute ready tasks.
KOKKOS_INLINE_FUNCTION
void iff_single_thread_recursive_execute()
{
#if defined( KOKKOS_ACTIVE_EXECUTION_MEMORY_SPACE_HOST )
specialization::iff_single_thread_recursive_execute( this );
#endif
}
void execute() { specialization::execute( this ); }
template< typename FunctorType >
void proc_set_apply( typename task_root_type::function_type * ptr )
{
specialization::template proc_set_apply< FunctorType >( ptr );
}
// Assign task pointer with reference counting of assigned tasks
KOKKOS_FUNCTION static
void assign( task_root_type ** const lhs
, task_root_type * const rhs )
{
#if 0
{
printf( "assign( 0x%lx { 0x%lx %d %d } , 0x%lx { 0x%lx %d %d } )\n"
, uintptr_t( lhs ? *lhs : 0 )
, uintptr_t( lhs && *lhs ? (*lhs)->m_next : 0 )
, int( lhs && *lhs ? (*lhs)->m_task_type : 0 )
, int( lhs && *lhs ? (*lhs)->m_ref_count : 0 )
, uintptr_t(rhs)
, uintptr_t( rhs ? rhs->m_next : 0 )
, int( rhs ? rhs->m_task_type : 0 )
, int( rhs ? rhs->m_ref_count : 0 )
);
fflush( stdout );
}
#endif
if ( *lhs ) decrement( *lhs );
if ( rhs ) { Kokkos::atomic_increment( &(rhs->m_ref_count) ); }
// Force write of *lhs
*static_cast< task_root_type * volatile * >(lhs) = rhs ;
Kokkos::memory_fence();
}
KOKKOS_FUNCTION
size_t allocate_block_size( size_t n ); ///< Actual block size allocated
KOKKOS_FUNCTION
void * allocate( size_t n ); ///< Allocate from the memory pool
KOKKOS_FUNCTION
void deallocate( void * p , size_t n ); ///< Deallocate to the memory pool
//----------------------------------------
/**\brief Allocation size for a spawned task */
template< typename FunctorType >
KOKKOS_FUNCTION
size_t spawn_allocation_size() const
{
using value_type = typename FunctorType::value_type ;
using task_type = Impl::TaskBase< execution_space
, value_type
, FunctorType > ;
enum : size_t { align = ( 1 << 4 ) , align_mask = align - 1 };
enum : size_t { task_size = sizeof(task_type) };
enum : size_t { result_size = Impl::TaskResult< value_type >::size };
enum : size_t { alloc_size =
( ( task_size + align_mask ) & ~align_mask ) +
( ( result_size + align_mask ) & ~align_mask ) };
return m_memory.allocate_block_size( task_size );
}
/**\brief Allocation size for a when_all aggregate */
KOKKOS_FUNCTION
size_t when_all_allocation_size( int narg ) const
{
return m_memory.allocate_block_size( sizeof(task_root_type) + narg * sizeof(task_root_type*) );
}
};
} /* namespace Impl */
} /* namespace Kokkos */
//----------------------------------------------------------------------------
//----------------------------------------------------------------------------
namespace Kokkos {
namespace Impl {
template< class ExecSpace , typename ResultType , class FunctorType >
class TaskBase
: public TaskBase< void , void , void >
, public FunctorType
{
private:
TaskBase() = delete ;
TaskBase( TaskBase && ) = delete ;
TaskBase( const TaskBase & ) = delete ;
TaskBase & operator = ( TaskBase && ) = delete ;
TaskBase & operator = ( const TaskBase & ) = delete ;
public:
using root_type = TaskBase< void , void , void > ;
using functor_type = FunctorType ;
using result_type = ResultType ;
using specialization = TaskQueueSpecialization< ExecSpace > ;
using member_type = typename specialization::member_type ;
KOKKOS_INLINE_FUNCTION
void apply_functor( member_type * const member , void * )
{ functor_type::operator()( *member ); }
template< typename T >
KOKKOS_INLINE_FUNCTION
void apply_functor( member_type * const member
, T * const result )
{ functor_type::operator()( *member , *result ); }
KOKKOS_FUNCTION static
void apply( root_type * root , void * exec )
{
TaskBase * const task = static_cast< TaskBase * >( root );
member_type * const member = reinterpret_cast< member_type * >( exec );
result_type * const result = TaskResult< result_type >::ptr( task );
task->apply_functor( member , result );
// Task may be serial or team.
// If team then must synchronize before querying if respawn was requested.
// If team then only one thread calls destructor.
member->team_barrier();
if ( 0 == member->team_rank() && !(task->requested_respawn()) ) {
// Did not respawn, destroy the functor to free memory.
static_cast<functor_type*>(task)->~functor_type();
// Cannot destroy and deallocate the task until its dependences
// have been processed.
}
}
// Constructor for runnable task
KOKKOS_INLINE_FUNCTION constexpr
TaskBase( FunctorType && arg_functor )
: root_type() , functor_type( std::move(arg_functor) ) {}
KOKKOS_INLINE_FUNCTION
~TaskBase() {}
};
} /* namespace Impl */
} /* namespace Kokkos */
//----------------------------------------------------------------------------
//----------------------------------------------------------------------------
#endif /* #if defined( KOKKOS_ENABLE_TASKDAG ) */
#endif /* #ifndef KOKKOS_IMPL_TASKQUEUE_HPP */
Event Timeline
Log In to Comment