Skip to content
Snippets Groups Projects
Commit ce6960b4 authored by Stéphane Poirier's avatar Stéphane Poirier
Browse files

[async] implementation now use a pool of threads for better performances

parent 492f11da
No related branches found
No related tags found
No related merge requests found
......@@ -44,13 +44,14 @@
// DEPENDENCIES
// ----------------------------------------------------------------------------
#include <yat/any/GenericContainer.h>
#include <yat/threading/Thread.h>
#include <yat/threading/Task.h>
#include <yat/threading/Condition.h>
#include <yat/threading/Utilities.h>
#include <yat/threading/Atomic.h>
#include <yat/utils/ArgPack.h>
#include <yat/utils/Callback.h>
#include <yat/utils/Logging.h>
#include <yat/utils/Singleton.h>
#include <yat/time/Time.h>
namespace yat {
......@@ -65,6 +66,27 @@ namespace yat {
#define YAT_FUTURE_LOG(s, ...)
#endif
namespace exp // experimental
{
// =============================================================================
// async launch policy (not implemented yet)
// =============================================================================
class YAT_DECL Launch
{
public:
static uint8 async;
static uint8 deffered;
Launch(uint8 v) { m_value = v; }
operator uint8() const { return m_value;}
private:
uint8 m_value;
};
}
// =============================================================================
// The AsyncResult is an internal class, it should not be instantiated in client
// application
......@@ -449,58 +471,66 @@ struct YAT_DECL AsyncParams
std::size_t count() const { return args.count(); }
};
typedef AsyncParams& AsyncFunctionArg;
const char* ASYNC_ERROR = "Can't execute asynchronous function";
// =============================================================================
//! The thread entry point of the function or a method invoked through
//! yat::async
// =============================================================================
typedef AsyncParams& AsyncFunctionArg;
YAT_DEFINE_CALLBACK(AsyncFunction, AsyncFunctionArg);
class __AsyncImpl__: public Thread
// =============================================================================
// async implementation using a pool of threads
// =============================================================================
class __AsyncPool__;
typedef std::pair<AsyncFunction, SharedPtr<AsyncParams>> AsyncRequest;
class __AsyncWorker__:public Task
{
public:
__AsyncImpl__(AsyncFunction cb, SharedPtr<AsyncParams>& args_ptr)
: Thread(args_ptr.get()), m_cb(cb), m_args_ptr(args_ptr)
{
YAT_FUTURE_LOG("__AsyncImpl__::__AsyncImpl__() {}", this);
start();
}
~__AsyncImpl__()
{
YAT_FUTURE_LOG("__AsyncImpl__::~__AsyncImpl__() {}", this);
}
typedef enum { RUN = FIRST_USER_MSG } MSG_TYPE;
__AsyncWorker__(WeakPtr<__AsyncPool__>& pool_wptr);
~__AsyncWorker__();
void run(Thread::IOArg)
{
try
{
m_cb(*(m_args_ptr.get()));
}
catch(const Exception& ye)
{
m_args_ptr->promise.set_exception(ye);
}
catch(const std::exception& e)
{
yat::Exception ye("STD_ERROR", e.what(), "yat::async");
m_args_ptr->promise.set_exception(ye);
}
catch(...)
protected:
void handle_message (yat::Message& msg);
private:
void run(AsyncFunction cb, SharedPtr<AsyncParams> args_ptr);
WeakPtr<__AsyncPool__> m_pool_wptr;
};
// Threads pool
class __AsyncPool__: public Task
{
yat::Exception ye("UNKNOWN_ERROR", "Unknown error occured", "yat::async");
m_args_ptr->promise.set_exception(ye);
}
}
public:
__AsyncPool__();
virtual ~__AsyncPool__();
void async_request(AsyncFunction cb, SharedPtr<AsyncParams> args_ptr);
void set_ptr(SharedPtr<__AsyncPool__>& ptr);
void set_pool_size(std::size_t min, std::size_t max);
protected:
void handle_message (yat::Message& msg);
void exit() {}
private:
uint16 m_max_threads;
uint16 m_min_threads;
std::set<SharedPtr<__AsyncWorker__>> m_active_threads;
std::set<SharedPtr<__AsyncWorker__>> m_idle_threads;
std::vector<AsyncRequest> m_pending_requests;
Mutex m_mtx;
WeakPtr<__AsyncPool__> m_this_wptr;
};
// Thread pool owner because a yat::Task can't be a singleton object
class __AsyncManager__:public Singleton<__AsyncManager__>
{
public:
typedef enum { DONE = FIRST_USER_MSG } MSG_TYPE;
public:
__AsyncManager__();
virtual ~__AsyncManager__();
static void async_request(AsyncFunction cb, SharedPtr<AsyncParams>& args_ptr);
static void set_pool_size(std::size_t min, std::size_t max);
private:
AsyncFunction m_cb;
SharedPtr<AsyncParams> m_args_ptr;
SharedPtr<__AsyncPool__> m_pool_ptr;
};
//! \brief asynchronously execute a function
......@@ -516,7 +546,7 @@ YAT_DECL Future async(Function f)
SharedPtr<AsyncParams> args_ptr = new AsyncParams;
try
{
new __AsyncImpl__(AsyncFunction::instanciate(f), args_ptr);
__AsyncManager__::async_request(AsyncFunction::instanciate(f), args_ptr);
}
catch(Exception& ex)
{
......@@ -541,7 +571,7 @@ YAT_DECL Future async(Function f, ArgPack&& args)
args_ptr->args.swap(args);
try
{
new __AsyncImpl__(AsyncFunction::instanciate(f), args_ptr);
__AsyncManager__::async_request(AsyncFunction::instanciate(f), args_ptr);
}
catch(Exception& ex)
{
......@@ -565,7 +595,7 @@ YAT_DECL Future async(ClassMember m, Object& o)
SharedPtr<AsyncParams> args_ptr = new AsyncParams;
try
{
new __AsyncImpl__(AsyncFunction::instanciate(o, m), args_ptr);
__AsyncManager__::async_request(AsyncFunction::instanciate(o, m), args_ptr);
}
catch(Exception& ex)
{
......@@ -591,7 +621,7 @@ YAT_DECL Future async(ClassMember m, Object& o, ArgPack&& args)
args_ptr->args.swap(args);
try
{
new __AsyncImpl__(AsyncFunction::instanciate(o, m), args_ptr);
__AsyncManager__::async_request(AsyncFunction::instanciate(o, m), args_ptr);
}
catch(Exception& ex)
{
......@@ -601,6 +631,12 @@ YAT_DECL Future async(ClassMember m, Object& o, ArgPack&& args)
return args_ptr->promise.get_future();
}
//! \brief sets the thread pool size
//! initial values are
//! min = 0
//! max = 2 * ThreadingUtilities::hardware_concurrency
YAT_DECL void async_set_pool_size(std::size_t min, std::size_t max);
} // namespace yat
#endif //- _YAT_FUTURE_H_
//----------------------------------------------------------------------------
// Copyright (c) 2004-2021 Synchrotron SOLEIL
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the GNU Lesser Public License v3
// which accompanies this distribution, and is available at
// http://www.gnu.org/licenses/lgpl.html
//----------------------------------------------------------------------------
//----------------------------------------------------------------------------
// YAT LIBRARY
//----------------------------------------------------------------------------
//
// Copyright (C) 2006-2024 The Tango Community
//
// Part of the code comes from the ACE Framework (asm bytes swaping code)
// see http://www.cs.wustl.edu/~schmidt/ACE.html for more about ACE
//
// The thread native implementation has been initially inspired by omniThread
// - the threading support library that comes with omniORB.
// see http://omniorb.sourceforge.net/ for more about omniORB.
// The YAT library is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation; either version 2 of the License, or (at your option)
// any later version.
//
// The YAT library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
// Public License for more details.
//
// See COPYING file for license details
//
// Contact:
// Stephane Poirier
// Synchrotron SOLEIL
//------------------------------------------------------------------------------
/*!
* \author See AUTHORS file
*/
// ============================================================================
// DEPENDENCIES
// ============================================================================
#include <yat/threading/Future.h>
#include <yat/utils/Logging.h>
namespace yat
{
// ============================================================================
// class __AsyncWorker__
// ============================================================================
//---------------------------------------------------------------------------
__AsyncWorker__::__AsyncWorker__(WeakPtr<__AsyncPool__>& pool_wptr)
{
YAT_FUTURE_LOG("__AsyncWorker__::__AsyncWorker__() {}", this);
m_pool_wptr = pool_wptr;
}
__AsyncWorker__::~__AsyncWorker__()
{
YAT_FUTURE_LOG("__AsyncWorker__::~__AsyncWorker__() {}", this);
}
//---------------------------------------------------------------------------
void __AsyncWorker__::handle_message (yat::Message& msg)
{
if( __AsyncWorker__::RUN == msg.type() )
{
YAT_FUTURE_LOG("__AsyncWorker__::handle_message() RUN msg received");
AsyncRequest req = msg.get_data<AsyncRequest>();
run(req.first, req.second);
}
}
//---------------------------------------------------------------------------
void __AsyncWorker__::run(AsyncFunction cb, SharedPtr<AsyncParams> args_ptr)
{
try
{
cb(*(args_ptr.get()));
}
catch(const Exception& ye)
{
args_ptr->promise.set_exception(ye);
}
catch(const std::exception& e)
{
yat::Exception ye("STD_ERROR", e.what(), "yat::async");
args_ptr->promise.set_exception(ye);
}
catch(...)
{
yat::Exception ye("UNKNOWN_ERROR", "Unknown error occured", "yat::async");
args_ptr->promise.set_exception(ye);
}
SharedPtr<__AsyncPool__> ptr = m_pool_wptr.lock();
if( ptr )
{
Message* msg_p = Message::allocate(__AsyncManager__::DONE, DEFAULT_MSG_PRIORITY, true);
msg_p->attach_data<Task>(this, false);
ptr->wait_msg_handled(msg_p);
}
}
// ============================================================================
// class __AsyncManager__
// ============================================================================
//---------------------------------------------------------------------------
__AsyncPool__::__AsyncPool__()
{
//- a compromise
m_max_threads = 2 * ThreadingUtilities::hardware_concurrency();
m_min_threads = 0;
}
__AsyncPool__::~__AsyncPool__()
{
YAT_FUTURE_LOG("__AsyncPool__::~__AsyncPool__()");
}
void __AsyncPool__::set_ptr(SharedPtr<__AsyncPool__>& pool_ptr)
{
m_this_wptr = pool_ptr;
}
//---------------------------------------------------------------------------
void __AsyncPool__::async_request(AsyncFunction cb, SharedPtr<AsyncParams> args_ptr)
{
YAT_FUTURE_LOG("__AsyncPool__::async_request()");
__AsyncWorker__* worker_p = 0;
{
AutoMutex<> lock(m_mtx);
if( m_idle_threads.size() > 0 )
{
YAT_FUTURE_LOG("__AsyncPool__::async_request() reuse worker");
std::set<SharedPtr<__AsyncWorker__>>::iterator it = m_idle_threads.begin();
worker_p = it->get();
m_active_threads.insert(*it);
m_idle_threads.erase(it);
}
else if( m_active_threads.size() < m_max_threads )
{
YAT_FUTURE_LOG("__AsyncPool__::async_request() new worker");
SharedPtr<__AsyncWorker__> worker_ptr(new __AsyncWorker__(m_this_wptr), TaskExiter());
worker_ptr->go();
m_active_threads.insert(worker_ptr);
worker_p = worker_ptr.get();
}
else
{
YAT_FUTURE_LOG("__AsyncManager__::Pool::async_request() no worker available");
m_pending_requests.push_back(std::make_pair(cb, args_ptr));
}
}
if( worker_p )
{
Message* msg_p = Message::allocate(__AsyncWorker__::RUN);
msg_p->attach_data(std::make_pair(cb, args_ptr));
worker_p->post(msg_p);
}
}
//---------------------------------------------------------------------------
void __AsyncPool__::handle_message (yat::Message& msg)
{
switch( msg.type() )
{
case yat::TASK_INIT:
set_periodic_msg_period(250);
break;
case __AsyncManager__::DONE:
{
YAT_FUTURE_LOG("__AsyncPool__::handle_message() DONE notified.");
Task* task_p = msg.detach_data<Task>();
SharedPtr<__AsyncWorker__> worker_ptr;
AutoMutex<> lock(m_mtx);
std::set<SharedPtr<__AsyncWorker__>>::iterator it = m_active_threads.begin();
for( ; it != m_active_threads.end(); ++it )
{
if( it->get() == task_p )
{
worker_ptr = *it;
break;
}
}
if( m_pending_requests.size() > 0 )
{
//- reuse worker
YAT_FUTURE_LOG("__AsyncPool__::handle_message() reuse worker");
std::vector<AsyncRequest>::iterator it = m_pending_requests.begin();
AsyncRequest req = *it;
Message* msg_p = Message::allocate(__AsyncWorker__::RUN);
msg_p->attach_data(req);
worker_ptr->post(msg_p);
m_pending_requests.erase(it);
}
else if( it != m_active_threads.end() )
{
YAT_FUTURE_LOG("__AsyncManager__::Pool::handle_message() no pending request");
m_idle_threads.insert(*it);
m_active_threads.erase(it);
enable_periodic_msg(true);
}
}
break;
case yat::TASK_PERIODIC:
{
AutoMutex<> lock(m_mtx);
if( m_idle_threads.size() > m_min_threads )
{
YAT_FUTURE_LOG("__AsyncPool__::handle_message() discard worker");
m_idle_threads.erase(m_idle_threads.begin());
}
else
enable_periodic_msg(false);
}
break;
}
}
//---------------------------------------------------------------------------
void __AsyncPool__::set_pool_size(std::size_t min, std::size_t max)
{
AutoMutex<> lock(m_mtx);
m_max_threads = max;
m_min_threads = min;
}
// ============================================================================
// class __AsyncManager__
// ============================================================================
//---------------------------------------------------------------------------
__AsyncManager__::__AsyncManager__()
{
YAT_FUTURE_LOG("__AsyncManager__::__AsyncManager__()");
m_pool_ptr.reset(new __AsyncPool__, TaskExiter());
m_pool_ptr->set_ptr(m_pool_ptr);
m_pool_ptr->go();
}
__AsyncManager__::~__AsyncManager__()
{
YAT_FUTURE_LOG("__AsyncManager__::~__AsyncManager__()");
}
//---------------------------------------------------------------------------
void __AsyncManager__::async_request(AsyncFunction cb, SharedPtr<AsyncParams>& args_ptr)
{
YAT_FUTURE_LOG("__AsyncManager__::async_request()");
instance().m_pool_ptr->async_request(cb, args_ptr);
}
//---------------------------------------------------------------------------
void __AsyncManager__::set_pool_size(std::size_t min, std::size_t max)
{
instance().m_pool_ptr->set_pool_size(min, max);
}
// ============================================================================
// free functions
// ============================================================================
void async_set_pool_size(std::size_t min, std::size_t max)
{
__AsyncManager__::set_pool_size(min, max);
}
} // namespace
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment