Changeset 1468:bba71c9ccf97


Ignore:
Timestamp:
08/22/10 08:44:29 (18 months ago)
Author:
niam
Branch:
default
Message:

{issue #101[resolved]} workqueue/thread pool implementation

Location:
sources
Files:
2 added
5 edited
2 copied

Legend:

Unmodified
Added
Removed
  • sources/configure

    r1464 r1468  
    11#! /bin/sh 
    22# Guess values for system-dependent variables and create Makefiles. 
    3 # Generated by GNU Autoconf 2.66 for libdodo 0.0.0. 
     3# Generated by GNU Autoconf 2.67 for libdodo 0.0.0. 
    44# 
    55# Report bugs to <issues.libdodo.org>. 
     
    765765 
    766766  case $ac_option in 
    767   *=*)  ac_optarg=`expr "X$ac_option" : '[^=]*=\(.*\)'` ;; 
    768   *)    ac_optarg=yes ;; 
     767  *=?*) ac_optarg=`expr "X$ac_option" : '[^=]*=\(.*\)'` ;; 
     768  *=)   ac_optarg= ;; 
     769  *)    ac_optarg=yes ;; 
    769770  esac 
    770771 
     
    14351436  cat <<\_ACEOF 
    14361437libdodo configure 0.0.0 
    1437 generated by GNU Autoconf 2.66 
     1438generated by GNU Autoconf 2.67 
    14381439 
    14391440Copyright (C) 2010 Free Software Foundation, Inc. 
     
    15611562  fi 
    15621563  $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 
    1563   test $ac_status = 0; } >/dev/null && { 
     1564  test $ac_status = 0; } > conftest.i && { 
    15641565         test -z "$ac_c_preproc_warn_flag$ac_c_werror_flag" || 
    15651566         test ! -s conftest.err 
     
    17811782  ac_header_preproc=no 
    17821783fi 
    1783 rm -f conftest.err conftest.$ac_ext 
     1784rm -f conftest.err conftest.i conftest.$ac_ext 
    17841785{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_header_preproc" >&5 
    17851786$as_echo "$ac_header_preproc" >&6; } 
     
    18291830 
    18301831It was created by libdodo $as_me 0.0.0, which was 
    1831 generated by GNU Autoconf 2.66.  Invocation command line was 
     1832generated by GNU Autoconf 2.67.  Invocation command line was 
    18321833 
    18331834  $ $0 $@ 
     
    31573158continue 
    31583159fi 
    3159 rm -f conftest.err conftest.$ac_ext 
     3160rm -f conftest.err conftest.i conftest.$ac_ext 
    31603161 
    31613162  # OK, works on sane cases.  Now check whether nonexistent headers 
     
    31733174break 
    31743175fi 
    3175 rm -f conftest.err conftest.$ac_ext 
     3176rm -f conftest.err conftest.i conftest.$ac_ext 
    31763177 
    31773178done 
    31783179# Because of `break', _AC_PREPROC_IFELSE's cleaning code was skipped. 
    3179 rm -f conftest.err conftest.$ac_ext 
     3180rm -f conftest.i conftest.err conftest.$ac_ext 
    31803181if $ac_preproc_ok; then : 
    31813182  break 
     
    32163217continue 
    32173218fi 
    3218 rm -f conftest.err conftest.$ac_ext 
     3219rm -f conftest.err conftest.i conftest.$ac_ext 
    32193220 
    32203221  # OK, works on sane cases.  Now check whether nonexistent headers 
     
    32323233break 
    32333234fi 
    3234 rm -f conftest.err conftest.$ac_ext 
     3235rm -f conftest.err conftest.i conftest.$ac_ext 
    32353236 
    32363237done 
    32373238# Because of `break', _AC_PREPROC_IFELSE's cleaning code was skipped. 
    3238 rm -f conftest.err conftest.$ac_ext 
     3239rm -f conftest.i conftest.err conftest.$ac_ext 
    32393240if $ac_preproc_ok; then : 
    32403241 
     
    44394440continue 
    44404441fi 
    4441 rm -f conftest.err conftest.$ac_ext 
     4442rm -f conftest.err conftest.i conftest.$ac_ext 
    44424443 
    44434444  # OK, works on sane cases.  Now check whether nonexistent headers 
     
    44554456break 
    44564457fi 
    4457 rm -f conftest.err conftest.$ac_ext 
     4458rm -f conftest.err conftest.i conftest.$ac_ext 
    44584459 
    44594460done 
    44604461# Because of `break', _AC_PREPROC_IFELSE's cleaning code was skipped. 
    4461 rm -f conftest.err conftest.$ac_ext 
     4462rm -f conftest.i conftest.err conftest.$ac_ext 
    44624463if $ac_preproc_ok; then : 
    44634464  break 
     
    44984499continue 
    44994500fi 
    4500 rm -f conftest.err conftest.$ac_ext 
     4501rm -f conftest.err conftest.i conftest.$ac_ext 
    45014502 
    45024503  # OK, works on sane cases.  Now check whether nonexistent headers 
     
    45144515break 
    45154516fi 
    4516 rm -f conftest.err conftest.$ac_ext 
     4517rm -f conftest.err conftest.i conftest.$ac_ext 
    45174518 
    45184519done 
    45194520# Because of `break', _AC_PREPROC_IFELSE's cleaning code was skipped. 
    4520 rm -f conftest.err conftest.$ac_ext 
     4521rm -f conftest.i conftest.err conftest.$ac_ext 
    45214522if $ac_preproc_ok; then : 
    45224523 
     
    67466747 
    67476748 
    6748 ac_config_files="$ac_config_files bin/dodo-config examples/Makefile examples/graphics/Makefile examples/cgi_fast/Makefile examples/cgi_basic/Makefile examples/db_mysql/Makefile examples/db_postgresql/Makefile examples/db_sqlite/Makefile examples/db/Makefile examples/io_network_server.nonblocked/Makefile examples/io_network_ssl_server.nonblocked/Makefile examples/io_stdio/Makefile examples/io_file.tools_filesystem/Makefile examples/io_memory/Makefile examples/io_pipe.io_file_fifo.pc_thread/Makefile examples/io_network.pc_thread/Makefile examples/io_network_client/Makefile examples/io_network_server/Makefile examples/io_network_ssl_server/Makefile examples/io_network_ssl_client/Makefile examples/io_network_http/Makefile examples/pc_thread/Makefile examples/pc_process/Makefile examples/pc_job/Makefile examples/tools_network/Makefile examples/tools_os/Makefile examples/tools_time/Makefile examples/tools_misc/Makefile examples/tools_code/Makefile examples/tools_logger/Makefile examples/tools_library/Makefile examples/tools_regexp/Makefile examples/xexec/Makefile examples/rpc_xml_http_client/Makefile examples/rpc_xml_cgi_server/Makefile examples/rpc_json_http_client/Makefile examples/rpc_json_cgi_server/Makefile examples/data_format_json/Makefile examples/data_format_xml/Makefile" 
     6749ac_config_files="$ac_config_files bin/dodo-config examples/Makefile examples/graphics/Makefile examples/cgi_fast/Makefile examples/cgi_basic/Makefile examples/db_mysql/Makefile examples/db_postgresql/Makefile examples/db_sqlite/Makefile examples/db/Makefile examples/io_network_server.nonblocked/Makefile examples/io_network_ssl_server.nonblocked/Makefile examples/io_stdio/Makefile examples/io_file.tools_filesystem/Makefile examples/io_memory/Makefile examples/io_pipe.io_file_fifo.pc_thread/Makefile examples/io_network.pc_thread/Makefile examples/io_network_client/Makefile examples/io_network_server/Makefile examples/io_network_ssl_server/Makefile examples/io_network_ssl_client/Makefile examples/io_network_http/Makefile examples/pc_thread/Makefile examples/pc_process/Makefile examples/pc_job/Makefile examples/pc_workqueue/Makefile examples/tools_network/Makefile examples/tools_os/Makefile examples/tools_time/Makefile examples/tools_misc/Makefile examples/tools_code/Makefile examples/tools_logger/Makefile examples/tools_library/Makefile examples/tools_regexp/Makefile examples/xexec/Makefile examples/rpc_xml_http_client/Makefile examples/rpc_xml_cgi_server/Makefile examples/rpc_json_http_client/Makefile examples/rpc_json_cgi_server/Makefile examples/data_format_json/Makefile examples/data_format_xml/Makefile" 
    67496750 
    67506751 
     
    72927293ac_log=" 
    72937294This file was extended by libdodo $as_me 0.0.0, which was 
    7294 generated by GNU Autoconf 2.66.  Invocation command line was 
     7295generated by GNU Autoconf 2.67.  Invocation command line was 
    72957296 
    72967297  CONFIG_FILES    = $CONFIG_FILES 
     
    73457346ac_cs_version="\\ 
    73467347libdodo config.status 0.0.0 
    7347 configured by $0, generated by GNU Autoconf 2.66, 
     7348configured by $0, generated by GNU Autoconf 2.67, 
    73487349  with options \\"\$ac_cs_config\\" 
    73497350 
     
    73647365do 
    73657366  case $1 in 
    7366   --*=*) 
     7367  --*=?*) 
    73677368    ac_option=`expr "X$1" : 'X\([^=]*\)='` 
    73687369    ac_optarg=`expr "X$1" : 'X[^=]*=\(.*\)'` 
     7370    ac_shift=: 
     7371    ;; 
     7372  --*=) 
     7373    ac_option=`expr "X$1" : 'X\([^=]*\)='` 
     7374    ac_optarg= 
    73697375    ac_shift=: 
    73707376    ;; 
     
    73907396    case $ac_optarg in 
    73917397    *\'*) ac_optarg=`$as_echo "$ac_optarg" | sed "s/'/'\\\\\\\\''/g"` ;; 
     7398    '') as_fn_error $? "missing file argument" ;; 
    73927399    esac 
    73937400    as_fn_append CONFIG_FILES " '$ac_optarg'" 
     
    74747481    "examples/pc_process/Makefile") CONFIG_FILES="$CONFIG_FILES examples/pc_process/Makefile" ;; 
    74757482    "examples/pc_job/Makefile") CONFIG_FILES="$CONFIG_FILES examples/pc_job/Makefile" ;; 
     7483    "examples/pc_workqueue/Makefile") CONFIG_FILES="$CONFIG_FILES examples/pc_workqueue/Makefile" ;; 
    74767484    "examples/tools_network/Makefile") CONFIG_FILES="$CONFIG_FILES examples/tools_network/Makefile" ;; 
    74777485    "examples/tools_os/Makefile") CONFIG_FILES="$CONFIG_FILES examples/tools_os/Makefile" ;; 
  • sources/configure.in

    r1464 r1468  
    811811        examples/pc_process/Makefile 
    812812        examples/pc_job/Makefile 
     813        examples/pc_workqueue/Makefile 
    813814        examples/tools_network/Makefile 
    814815        examples/tools_os/Makefile 
  • sources/include/libdodo/pc.h

    r1439 r1468  
    3939#include <libdodo/pcExecutionScheduler.h> 
    4040#include <libdodo/pcExecutionSchedulerEx.h> 
     41#include <libdodo/pcExecutionWorkqueue.h> 
    4142#include <libdodo/pcSyncDataObject.h> 
    4243#include <libdodo/pcSyncProcess.h> 
  • sources/include/libdodo/pcExecutionThread.h

    r1439 r1468  
    110110 
    111111                /** 
     112                 * @return true if calle is in the same thread as represented by instance 
     113                 */ 
     114                bool self() const; 
     115 
     116                /** 
    112117                 * @return uncought exception thrown by thread routine 
    113118                 */ 
    114                 virtual exception::basic *exception(); 
     119                exception::basic *exception(); 
    115120 
    116121#ifdef DL_EXT 
  • sources/include/libdodo/pcExecutionWorkqueue.h

    r1439 r1468  
    11/*************************************************************************** 
    2  *            pcExecutionManager.h 
     2 *            pcExecutionWorkqueue.h 
    33 * 
    4  *  Mon Mar 05 2007 
    5  *  Copyright  2007  Dmytro Milinevskyy 
     4 *  Sun Jul 25 2010 
     5 *  Copyright  2010  Dmytro Milinevskyy 
    66 *  milinevskyy@gmail.com 
    77 ****************************************************************************/ 
     
    2828 */ 
    2929 
    30 #ifndef _PCEXECUTIONMANAGER_H_ 
    31 #define _PCEXECUTIONMANAGER_H_ 1 
     30#ifndef _PCEXECUTIONWORKQUEUE_H_ 
     31#define _PCEXECUTIONWORKQUEUE_H_ 1 
    3232 
    3333#include <libdodo/directives.h> 
    3434 
     35#include <libdodo/pcExecutionJob.h> 
    3536#include <libdodo/types.h> 
    3637 
     
    3839    namespace pc { 
    3940        namespace sync { 
    40             class protector; 
     41            class thread; 
    4142        }; 
    4243 
    4344        namespace execution { 
    44             class job; 
     45            class thread; 
     46            struct __work__; 
     47            struct __wake__; 
    4548 
    4649            /** 
    47              * @class manager 
    48              * @brief provides interface for jobs management 
     50             * @class workqueue 
     51             * @brief provides workqueue/thread pool 
    4952             */ 
    50             class manager { 
     53            class workqueue { 
    5154              public: 
    5255 
    5356                /** 
    5457                 * constructor 
     58                 * @param maxThreads defines max amount of active threads 
     59                 * @param minThreads defines min amount of active threads 
     60                 * @param minIdleTime defines amount of time(in seconds) thread does nothing if amount of threads greater than minThreads 
    5561                 */ 
    56                 manager(); 
     62                workqueue(unsigned int maxThreads, 
     63                        unsigned int minThreads = 1, 
     64                        unsigned long minIdleTime = 60); 
    5765 
    5866                /** 
    5967                 * destructor 
    6068                 */ 
    61                 ~manager(); 
     69                ~workqueue(); 
    6270 
    6371                /** 
    64                  * add a job 
    65                  * @return job identificator 
    66                  * @param job defines job for managing 
     72                 * add a routine for execution 
     73                 * @param routine defines routine for execution 
     74                 * @param data defines data passed to the routine 
    6775                 */ 
    68                 unsigned long add(const job &job); 
     76                void add(routine routine, 
     77                        void *data); 
    6978 
    7079                /** 
    71                  * remove registered job 
    72                  * @param id defines job identificator 
    73                  * @param terminate defines termination condition 
     80                 * workqueue worker routine 
     81                 * @return execution status 
     82                 * @param data defines user data 
    7483                 */ 
    75                 void remove(unsigned long id, 
    76                             bool          terminate = false); 
    77  
    78                 /** 
    79                  * execute job 
    80                  * @param id defines job identificator 
    81                  */ 
    82                 void run(unsigned long id); 
    83  
    84                 /** 
    85                  * stop job 
    86                  * @param id defines job identificator 
    87                  */ 
    88                 void stop(unsigned long id); 
    89  
    90                 /** 
    91                  * stop all registered jobs 
    92                  */ 
    93                 void stop(); 
    94  
    95                 /** 
    96                  * wait for job termination 
    97                  * @return status of the job 
    98                  * @param id defines job identificator 
    99                  */ 
    100                 int wait(unsigned long id); 
    101  
    102                 /** 
    103                  * wait for all registered jobs termination 
    104                  */ 
    105                 void wait(); 
    106  
    107                 /** 
    108                  * @return true if job is running 
    109                  * @param id defines job identificator 
    110                  */ 
    111                 bool isRunning(unsigned long id) const; 
    112  
    113                 /** 
    114                  * @return amount of running jobs 
    115                  */ 
    116                 unsigned long running() const; 
    117  
    118                 /** 
    119                  * @return list of jobs in object 
    120                  */ 
    121                 dodoList<unsigned long> jobs(); 
    122  
    123                 /** 
    124                  * @return job object 
    125                  * @param id defines job identificator 
    126                  */ 
    127                 execution::job *job(unsigned long id); 
     84                static int worker(workqueue *queue); 
    12885 
    12986              protected: 
    13087 
    131                 dodoMap<unsigned long, execution::job *> handles;  ///< managed jobs 
     88                dodoList<__work__ *> tasks;  ///< pending work 
    13289 
    133                 unsigned long counter;              ///< job id counter 
     90                dodoList<execution::thread *> active;  ///< active thread 
     91                dodoList<execution::thread *> inactive;  ///< inactive thread 
    13492 
    135                 sync::protector *keeper;            ///< section locker 
     93                unsigned int maxThreads; ///< max amount of active threads 
     94                unsigned int minThreads; ///< min amount of active threads 
     95                unsigned long minIdleTime; ///< amount of time(in seconds) thread does nothing if amount of threads greater than minThreads 
     96 
     97                sync::thread *tasksProtector; ///< tasks queue protector 
     98                sync::thread *threadsProtector; ///< threads queues protector 
     99 
     100                __wake__ *wake; ///< thread wake handler 
    136101            }; 
    137102        }; 
  • sources/src/pcExecutionThread.cc

    r1464 r1468  
    439439 
    440440//------------------------------------------------------------------- 
     441 
     442bool 
     443thread::self() const 
     444{ 
     445    return (pthread_equal(pthread_self(), handle->thread) > 0); 
     446} 
     447 
     448//------------------------------------------------------------------- 
  • sources/src/pcExecutionWorkqueue.cc

    r1464 r1468  
    11/*************************************************************************** 
    2  *            pcExecutionManager.cc 
     2 *            pcExecutionWorkqueue.cc 
    33 * 
    4  *  Sun Oct  30 2007 
    5  *  Copyright  2007  Dmytro Milinevskyy 
     4 *  Sun Jul 25 2010 
     5 *  Copyright  2010  Dmytro Milinevskyy 
    66 *  milinevskyy@gmail.com 
    77 ****************************************************************************/ 
     
    3030#include <libdodo/directives.h> 
    3131 
    32 #include <libdodo/pcExecutionManager.h> 
     32#include <time.h> 
     33#ifdef PTHREAD_EXT 
     34#include <pthread.h> 
     35#include <errno.h> 
     36#endif 
     37 
     38#include <libdodo/pcExecutionWorkqueue.h> 
     39#include <libdodo/pcExecutionThread.h> 
    3340#include <libdodo/pcExecutionJob.h> 
    34 #include <libdodo/pcExecutionThread.h> 
    35 #include <libdodo/pcExecutionProcess.h> 
    36 #include <libdodo/pcExecutionManagerEx.h> 
    3741#include <libdodo/types.h> 
    3842#include <libdodo/pcSyncThread.h> 
    3943#include <libdodo/pcSyncStack.h> 
    4044 
     45namespace dodo { 
     46    namespace pc { 
     47        namespace execution { 
     48            /** 
     49             * @struct __work__ 
     50             * @brief defines workqueue task 
     51             */ 
     52            struct __work__ { 
     53                /** 
     54                 * constructor 
     55                 * @param routine defines routine for execution 
     56                 * @param data defines data passed to the routine 
     57                 */ 
     58                __work__(pc::execution::routine routine, 
     59                        void *data) : routine(routine), 
     60                                      data(data) 
     61                { 
     62                } 
     63 
     64                pc::execution::routine routine; ///< task to be exeuted 
     65                void *data; ///< task data 
     66            }; 
     67 
     68            /** 
     69             * @struct __wake__ 
     70             * @brief defines wake mechanism 
     71             */ 
     72            struct __wake__ { 
     73                /** 
     74                 * constructor 
     75                 */ 
     76                __wake__() 
     77                { 
     78#ifdef PTHREAD_EXT 
     79                    pthread_mutex_init(&mutex, NULL); 
     80                    pthread_cond_init(&cond, NULL); 
     81#endif 
     82                } 
     83 
     84                /** 
     85                 * wait for event occurency 
     86                 * @return true if event occured 
     87                 * @param timeout defines time to wait for the event 
     88                 */ 
     89                bool 
     90                wait(timespec *timeout) 
     91                { 
     92#ifdef PTHREAD_EXT 
     93                    int rc; 
     94                    timespec now; 
     95 
     96                    clock_gettime(CLOCK_REALTIME, &now); 
     97                    now.tv_sec += timeout->tv_sec; 
     98 
     99                    pthread_mutex_lock(&mutex); 
     100                    rc = pthread_cond_timedwait(&cond, &mutex, &now); 
     101                    pthread_mutex_unlock(&mutex); 
     102 
     103                    return (rc != ETIMEDOUT); 
     104#else 
     105                    return true; 
     106#endif 
     107                } 
     108 
     109                /** 
     110                 * raise an event 
     111                 */ 
     112                void 
     113                raise() 
     114                { 
     115#ifdef PTHREAD_EXT 
     116                    pthread_cond_signal(&cond); 
     117#endif 
     118                } 
     119 
     120#ifdef PTHREAD_EXT 
     121                pthread_cond_t cond; ///< condition handler 
     122                pthread_mutex_t mutex; ///< lock mutex 
     123#endif 
     124            }; 
     125        }; 
     126    }; 
     127}; 
     128 
    41129using namespace dodo::pc::execution; 
    42130 
    43 manager::manager() : counter(0), 
    44                      keeper(new pc::sync::thread) 
    45 { 
    46 } 
    47  
    48 //------------------------------------------------------------------- 
    49  
    50 manager::~manager() 
    51 { 
    52     dodoMap<unsigned long, execution::job *>::const_iterator i = handles.begin(), j = handles.end(); 
    53  
    54     for (; i != j; ++i) 
    55         delete i->second; 
    56  
    57     delete keeper; 
    58 } 
    59  
    60 //------------------------------------------------------------------- 
    61  
    62 unsigned long 
    63 manager::add(const execution::job &job) 
    64 { 
    65     pc::sync::stack tg(keeper); 
    66  
    67     execution::job *j; 
    68  
    69     execution::job *orig = const_cast<execution::job *>(&job); 
    70  
    71     switch (job.type) { 
    72         case execution::job::TYPE_PROCESS: 
    73             j = new process(*dynamic_cast<process *>(orig)); 
    74  
     131workqueue::workqueue(unsigned int maxThreads, 
     132        unsigned int minThreads, 
     133        unsigned long minIdleTime) : maxThreads(maxThreads), 
     134                                     minThreads(minThreads), 
     135                                     minIdleTime(minIdleTime), 
     136                                     tasksProtector(new pc::sync::thread), 
     137                                     threadsProtector(new pc::sync::thread), 
     138                                     wake(new __wake__) 
     139{ 
     140    if (maxThreads < minThreads) 
     141        maxThreads = minThreads; 
     142 
     143    thread *t; 
     144    for (unsigned int i=0; i<minThreads; ++i) { 
     145        t = new thread((routine)workqueue::worker, this, ON_DESTRUCTION_STOP); 
     146        inactive.push_back(t); 
     147        t->run(); 
     148    } 
     149} 
     150 
     151//------------------------------------------------------------------- 
     152 
     153workqueue::~workqueue() 
     154{ 
     155    dodoList<thread *>::iterator i = inactive.begin(), j = inactive.end(); 
     156 
     157    for (; i != j; ++i) { 
     158        /* (*i)->stop(); */ 
     159        delete *i; 
     160    } 
     161 
     162    delete tasksProtector; 
     163    delete threadsProtector; 
     164    delete wake; 
     165} 
     166 
     167//------------------------------------------------------------------- 
     168 
     169int 
     170workqueue::worker(workqueue *queue) 
     171{ 
     172    __work__ *work; 
     173    thread *self = NULL; 
     174    timespec ts = {0, 0}; 
     175    bool active = false; 
     176 
     177    dodoList<thread *> &inactiveQueue = queue->inactive, 
     178        &activeQueue = queue->active; 
     179    sync::thread *tasksProtector = queue->tasksProtector, 
     180        *threadsProtector = queue->threadsProtector; 
     181    dodoList<__work__ *> &tasks = queue->tasks; 
     182 
     183    threadsProtector->acquire(); 
     184    for (dodoList<thread *>::iterator i = inactiveQueue.begin(), j = inactiveQueue.end(); 
     185         i != j; 
     186         ++i) { 
     187        if ((*i)->self()) { 
     188            self = *i; 
    75189            break; 
    76  
    77         case execution::job::TYPE_THREAD: 
    78             j = new thread(*dynamic_cast<thread *>(orig)); 
    79  
    80             break; 
    81  
    82         default: 
    83             dodo_throw exception::basic(exception::MODULE_PCEXECUTIONMANAGER, MANAGEREX_ADD, exception::ERRNO_LIBDODO, MANAGEREX_UNKNOWNJOB, PCEXECUTIONMANAGEREX_UNKNOWNJOB_STR, __LINE__, __FILE__); 
    84     } 
    85  
    86     handles.insert(std::make_pair(counter, j)); 
    87  
    88     return counter++; 
     190        } 
     191    } 
     192    threadsProtector->release(); 
     193 
     194    for (;;) { 
     195        tasksProtector->acquire(); 
     196        if (tasks.size()) { 
     197            work = *tasks.begin(); 
     198            tasks.pop_front(); 
     199        } else { 
     200            work = NULL; 
     201        } 
     202        tasksProtector->release(); 
     203 
     204        if (work) { 
     205            if (!active) { 
     206                threadsProtector->acquire(); 
     207                inactiveQueue.remove(self); 
     208                activeQueue.push_back(self); 
     209                threadsProtector->release(); 
     210 
     211                active = true; 
     212            } 
     213 
     214            work->routine(work->data); 
     215            delete work; 
     216        } else { 
     217            if (active) { 
     218                threadsProtector->acquire(); 
     219                activeQueue.remove(self); 
     220                inactiveQueue.push_back(self); 
     221                threadsProtector->release(); 
     222 
     223                active = false; 
     224            } 
     225 
     226            unsigned long delta = queue->maxThreads - queue->minThreads; 
     227            if (delta > 0) { 
     228                unsigned long k; 
     229                threadsProtector->acquire(); 
     230                k = activeQueue.size() + inactiveQueue.size(); 
     231                threadsProtector->release(); 
     232                k = ((k - queue->minThreads)*100)/delta; 
     233                ts.tv_sec = (queue->minIdleTime*(100 + k))/100; 
     234            } else { 
     235                ts.tv_sec = 0x00FFFFFF; /* 194 days, enough as for indefinite sleep */ 
     236            } 
     237 
     238          sleep: 
     239            if (!queue->wake->wait(&ts)) { 
     240                unsigned long queueSize = 0; 
     241                tasksProtector->acquire(); 
     242                queueSize = tasks.size(); 
     243                tasksProtector->release(); 
     244 
     245                if (queueSize == 0) { 
     246                    threadsProtector->acquire(); 
     247                    if (inactiveQueue.size() + activeQueue.size() > queue->minThreads) { 
     248                        inactiveQueue.remove(self); 
     249                        threadsProtector->release(); 
     250 
     251                        return 0; 
     252                    } 
     253                    threadsProtector->release(); 
     254 
     255                    goto sleep; 
     256                } 
     257            } 
     258        } 
     259    } 
     260 
     261    return 0; 
    89262} 
    90263 
     
    92265 
    93266void 
    94 manager::remove(unsigned long id, 
    95                 bool          terminate) 
    96 { 
    97     pc::sync::stack tg(keeper); 
    98  
    99     dodoMap<unsigned long, execution::job *>::iterator job = handles.find(id); 
    100  
    101     if (job == handles.end()) 
     267workqueue::add(routine routine, 
     268        void *data) 
     269{ 
     270    struct __work__ *work = new __work__(routine, data); 
     271    unsigned long activeSize, inactiveSize; 
     272 
     273    tasksProtector->acquire(); 
     274    tasks.push_back(work); 
     275    tasksProtector->release(); 
     276 
     277    threadsProtector->acquire(); 
     278    inactiveSize = inactive.size(); 
     279    activeSize = active.size(); 
     280    threadsProtector->release(); 
     281 
     282    if (inactiveSize) { 
     283        wake->raise(); 
     284 
    102285        return; 
    103  
    104     if (terminate && job->second->isRunning()) 
    105         job->second->stop(); 
    106  
    107     delete job->second; 
    108  
    109     handles.erase(job); 
    110 } 
    111  
    112 //------------------------------------------------------------------- 
    113  
    114 void 
    115 manager::run(unsigned long id) 
    116 { 
    117     pc::sync::stack tg(keeper); 
    118  
    119     dodoMap<unsigned long, execution::job *>::iterator job = handles.find(id); 
    120  
    121     if (job == handles.end()) 
    122         dodo_throw exception::basic(exception::MODULE_PCEXECUTIONMANAGER, MANAGEREX_RUN, exception::ERRNO_LIBDODO, MANAGEREX_NOTFOUND, PCEXECUTIONMANAGEREX_NOTFOUND_STR, __LINE__, __FILE__); 
    123  
    124     job->second->run(); 
    125 } 
    126  
    127 //------------------------------------------------------------------- 
    128  
    129 void 
    130 manager::stop(unsigned long id) 
    131 { 
    132     pc::sync::stack tg(keeper); 
    133  
    134     dodoMap<unsigned long, execution::job *>::iterator job = handles.find(id); 
    135  
    136     if (job == handles.end()) 
    137         dodo_throw exception::basic(exception::MODULE_PCEXECUTIONMANAGER, MANAGEREX_STOP, exception::ERRNO_LIBDODO, MANAGEREX_NOTFOUND, PCEXECUTIONMANAGEREX_NOTFOUND_STR, __LINE__, __FILE__); 
    138  
    139     job->second->stop(); 
    140 } 
    141  
    142 //------------------------------------------------------------------- 
    143  
    144 void 
    145 manager::stop() 
    146 { 
    147     pc::sync::stack tg(keeper); 
    148  
    149     dodoMap<unsigned long, execution::job *>::iterator i = handles.begin(), j = handles.end(); 
    150  
    151     for (; i != j; ++i) 
    152         i->second->stop(); 
    153 } 
    154  
    155 //------------------------------------------------------------------- 
    156  
    157 int 
    158 manager::wait(unsigned long id) 
    159 { 
    160     pc::sync::stack tg(keeper); 
    161  
    162     dodoMap<unsigned long, execution::job *>::iterator job = handles.find(id); 
    163  
    164     if (job == handles.end()) 
    165         dodo_throw exception::basic(exception::MODULE_PCEXECUTIONMANAGER, MANAGEREX_WAIT, exception::ERRNO_LIBDODO, MANAGEREX_NOTFOUND, PCEXECUTIONMANAGEREX_NOTFOUND_STR, __LINE__, __FILE__); 
    166  
    167     return job->second->wait(); 
    168 } 
    169  
    170 //------------------------------------------------------------------- 
    171  
    172 void 
    173 manager::wait() 
    174 { 
    175     pc::sync::stack tg(keeper); 
    176  
    177     dodoMap<unsigned long, execution::job *>::iterator i = handles.begin(), j = handles.end(); 
    178  
    179     for (; i != j; ++i) 
    180         i->second->wait(); 
    181 } 
    182  
    183 //------------------------------------------------------------------- 
    184  
    185 bool 
    186 manager::isRunning(unsigned long id) const 
    187 { 
    188     pc::sync::stack tg(keeper); 
    189  
    190     dodoMap<unsigned long, execution::job *>::const_iterator job = handles.find(id); 
    191  
    192     if (job == handles.end()) 
    193         dodo_throw exception::basic(exception::MODULE_PCEXECUTIONMANAGER, MANAGEREX_ISRUNNING, exception::ERRNO_LIBDODO, MANAGEREX_NOTFOUND, PCEXECUTIONMANAGEREX_NOTFOUND_STR, __LINE__, __FILE__); 
    194  
    195     return job->second->isRunning(); 
    196 } 
    197  
    198 //------------------------------------------------------------------- 
    199  
    200 unsigned long 
    201 manager::running() const 
    202 { 
    203     pc::sync::stack tg(keeper); 
    204  
    205     unsigned long jobs; 
    206  
    207     dodoMap<unsigned long, execution::job *>::const_iterator i = handles.begin(), j = handles.end(); 
    208  
    209     for (; i != j; ++i) 
    210         if (i->second->isRunning()) 
    211             ++jobs; 
    212  
    213     return jobs; 
    214 } 
    215  
    216 //------------------------------------------------------------------- 
    217  
    218 dodoList<unsigned long> 
    219 manager::jobs() 
    220 { 
    221     pc::sync::stack tg(keeper); 
    222  
    223     dodoList<unsigned long> jobs; 
    224  
    225     dodoMap<unsigned long, execution::job *>::const_iterator i = handles.begin(), j = handles.end(); 
    226  
    227     for (; i != j; ++i) 
    228         jobs.push_back(i->first); 
    229  
    230     return jobs; 
    231 } 
    232  
    233 //------------------------------------------------------------------- 
    234  
    235 dodo::pc::execution::job * 
    236 manager::job(unsigned long id) 
    237 { 
    238     pc::sync::stack tg(keeper); 
    239  
    240     dodoMap<unsigned long, execution::job *>::const_iterator job = handles.find(id); 
    241  
    242     if (job == handles.end()) 
    243         dodo_throw exception::basic(exception::MODULE_PCEXECUTIONMANAGER, MANAGEREX_JOB, exception::ERRNO_LIBDODO, MANAGEREX_NOTFOUND, PCEXECUTIONMANAGEREX_NOTFOUND_STR, __LINE__, __FILE__); 
    244  
    245     return job->second; 
    246 } 
    247  
    248 //------------------------------------------------------------------- 
     286    } 
     287    if (inactiveSize + activeSize == maxThreads) 
     288        return; 
     289 
     290    thread *t = new thread((execution::routine)workqueue::worker, this, ON_DESTRUCTION_STOP); 
     291 
     292    threadsProtector->acquire(); 
     293    inactive.push_back(t); 
     294    threadsProtector->release(); 
     295 
     296    t->run(); 
     297} 
     298 
     299//------------------------------------------------------------------- 
Note: See TracChangeset for help on using the changeset viewer.