Changeset 1482:641f3eb4fc92


Ignore:
Timestamp:
08/29/10 12:17:53 (18 months ago)
Author:
niam
Branch:
default
Message:

[pc::execution::workqueue] exploit pc::notification, fixes on destruction

Location:
sources
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • sources/examples/pc_workqueue/test.cc

    r1472 r1482  
    2020work(void *data) 
    2121{ 
    22     if (data == NULL) { 
    23         lock.acquire(); 
    24         ++done; 
    25         cout << "#" << done << ": NULL : " << tools::time::nowMs() << endl, cout.flush(); 
    26         lock.release(); 
    27  
    28         tools::os::sleep(5); 
    29         tools::os::die("last task\n", 0); 
    30     } 
    31  
    3222    dodo_try { 
    3323        tools::os::sleep(1); 
     
    3525        lock.acquire(); 
    3626        ++done; 
    37         cout << "#" << done << ": " << (char *)data << ": " << tools::time::nowMs() << endl, cout.flush(); 
     27        cout << "#" << done << ": " << (char *)data << " : " << tools::time::nowMs() << endl, cout.flush(); 
    3828        lock.release(); 
    3929    } dodo_catch (exception::basic *e)   { 
     
    4939{ 
    5040    dodo_try { 
    51         execution::workqueue wq(10, 5); 
     41        execution::workqueue wq(10, 5, 1); 
    5242 
    5343        tools::os::sleep(1); 
     
    5747        for (int i=0;i<100;++i) 
    5848            wq.add(work, (void *)"work"); 
    59         wq.add(work, NULL); 
     49        wq.add(work, (void *)"last"); 
    6050 
    61         tools::os::sleep(600); 
    62  
     51        tools::os::sleep(20); 
    6352    } dodo_catch (exception::basic *e)   { 
    6453        cout << (dodo::string)*e << "\t" << e->line << "\t" << e->file << endl; 
  • sources/include/libdodo/pcExecutionWorkqueue.h

    r1468 r1482  
    4242        }; 
    4343 
     44        namespace notification { 
     45            class thread; 
     46        }; 
     47 
    4448        namespace execution { 
    4549            class thread; 
    46             struct __work__; 
    47             struct __wake__; 
    4850 
    4951            /** 
     
    8688              protected: 
    8789 
     90                struct __work__; 
    8891                dodoList<__work__ *> tasks;  ///< pending work 
    8992 
     
    98101                sync::thread *threadsProtector; ///< threads queues protector 
    99102 
    100                 __wake__ *wake; ///< thread wake handler 
     103                notification::thread *notification; ///< thread wake handler 
     104 
     105                bool closing;                   ///< closing flag 
    101106            }; 
    102107        }; 
  • sources/src/pcExecutionWorkqueue.cc

    r1478 r1482  
    3939#include <libdodo/pcExecutionThread.h> 
    4040#include <libdodo/pcExecutionJob.h> 
     41#include <libdodo/pcSyncThread.h> 
     42#include <libdodo/pcNotificationThread.h> 
     43#include <libdodo/pcSyncStack.h> 
    4144#include <libdodo/types.h> 
    42 #include <libdodo/pcSyncThread.h> 
    43 #include <libdodo/pcSyncStack.h> 
    4445 
    4546namespace dodo { 
     
    5051             * @brief defines workqueue task 
    5152             */ 
    52             struct __work__ { 
     53            struct workqueue::__work__ { 
    5354                /** 
    5455                 * constructor 
     
    6566                void *data; ///< task data 
    6667            }; 
    67  
    68             /** 
    69              * @struct __wake__ 
    70              * @brief defines wake mechanism 
    71              */ 
    72             struct __wake__ { 
    73                 /** 
    74                  * constructor 
    75                  */ 
    76                 __wake__() 
    77                 { 
    78 #ifdef PTHREAD_EXT 
    79                     pthread_mutex_init(&mutex, NULL); 
    80                     pthread_cond_init(&cond, NULL); 
    81 #endif 
    82                 } 
    83  
    84                 /** 
    85                  * wait for event occurency 
    86                  * @return true if event occured 
    87                  * @param timeout defines time to wait for the event 
    88                  */ 
    89                 bool 
    90                 wait(timespec *timeout) 
    91                 { 
    92 #ifdef PTHREAD_EXT 
    93                     int rc; 
    94                     timespec now; 
    95  
    96                     clock_gettime(CLOCK_REALTIME, &now); 
    97                     now.tv_sec += timeout->tv_sec; 
    98                     now.tv_nsec += timeout->tv_nsec; 
    99                     if (now.tv_nsec > 999999999) { 
    100                         now.tv_sec += 1; 
    101                         now.tv_nsec -= 999999999; 
    102                     } 
    103  
    104                     pthread_mutex_lock(&mutex); 
    105                     rc = pthread_cond_timedwait(&cond, &mutex, &now); 
    106                     pthread_mutex_unlock(&mutex); 
    107  
    108                     return (rc != ETIMEDOUT); 
    109 #else 
    110                     return true; 
    111 #endif 
    112                 } 
    113  
    114                 /** 
    115                  * raise an event 
    116                  */ 
    117                 void 
    118                 raise() 
    119                 { 
    120 #ifdef PTHREAD_EXT 
    121                     pthread_cond_signal(&cond); 
    122 #endif 
    123                 } 
    124  
    125 #ifdef PTHREAD_EXT 
    126                 pthread_cond_t cond; ///< condition handler 
    127                 pthread_mutex_t mutex; ///< lock mutex 
    128 #endif 
    129             }; 
    13068        }; 
    13169    }; 
     
    14179                                     tasksProtector(new pc::sync::thread), 
    14280                                     threadsProtector(new pc::sync::thread), 
    143                                      wake(new __wake__) 
     81                                     notification(new pc::notification::thread(*tasksProtector)), 
     82                                     closing(false) 
    14483{ 
    14584    if (maxThreads < minThreads) 
     
    15897workqueue::~workqueue() 
    15998{ 
    160     dodoList<thread *>::iterator i = inactive.begin(), j = inactive.end(); 
    161  
     99    dodoList<thread *>::iterator i, j; 
     100    dodoList<__work__ *>::iterator o, p; 
     101 
     102    threadsProtector->acquire(); 
     103    closing = true; 
     104    threadsProtector->release(); 
     105 
     106    tasksProtector->acquire(); 
     107    o = tasks.begin(); 
     108    p = tasks.end(); 
     109    for (; o != p; ++o) 
     110        delete *o; 
     111    tasksProtector->release(); 
     112 
     113    notification->notify(true); 
     114 
     115    threadsProtector->acquire(); 
     116    i = inactive.begin(); 
     117    j = inactive.end(); 
    162118    for (; i != j; ++i) { 
    163119        /* (*i)->stop(); */ 
    164120        delete *i; 
    165121    } 
    166  
     122    threadsProtector->release(); 
     123 
     124    delete notification; 
    167125    delete tasksProtector; 
    168126    delete threadsProtector; 
    169     delete wake; 
    170127} 
    171128 
     
    177134    __work__ *work; 
    178135    thread *self = NULL; 
    179     timespec ts = {0, 0}; 
     136    unsigned long timeout; 
    180137    bool active = false; 
    181138 
     
    197154    threadsProtector->release(); 
    198155 
     156    /* task protector is aquired in a strange manner 
     157       to decrease lock contention 
     158       due to using notification mecanism */ 
     159    tasksProtector->acquire(); 
    199160    for (;;) { 
    200         tasksProtector->acquire(); 
    201161        if (tasks.size()) { 
    202162            work = *tasks.begin(); 
     
    219179            work->routine(work->data); 
    220180            delete work; 
     181 
     182            tasksProtector->acquire(); 
    221183        } else { 
    222184            if (active) { 
     
    228190                active = false; 
    229191            } 
     192 
     193            if (queue->closing) 
     194                return 0; 
    230195 
    231196            unsigned long delta = queue->maxThreads - queue->minThreads; 
     
    236201                threadsProtector->release(); 
    237202                k = ((k - queue->minThreads)*100)/delta; 
    238                 ts.tv_sec = (queue->minIdleTime*(100 + k))/100; 
     203                timeout = (queue->minIdleTime*(100 + k))*10000; 
    239204            } else { 
    240                 ts.tv_sec = 0x00FFFFFF; /* 194 days, enough as for indefinite sleep */ 
     205                timeout = 0; 
    241206            } 
    242207 
    243208          sleep: 
    244             if (!queue->wake->wait(&ts)) { 
    245                 unsigned long queueSize = 0; 
    246                 tasksProtector->acquire(); 
    247                 queueSize = tasks.size(); 
     209            tasksProtector->acquire(); 
     210            if (!queue->notification->wait(timeout)) { 
     211                unsigned long queueSize = tasks.size(); 
    248212                tasksProtector->release(); 
     213 
     214                if (queue->closing) 
     215                    return 0; 
    249216 
    250217                if (queueSize == 0) { 
     
    260227                    goto sleep; 
    261228                } 
     229 
     230                tasksProtector->acquire(); 
    262231            } 
    263232        } 
     
    286255 
    287256    if (inactiveSize) { 
    288         wake->raise(); 
     257        notification->notify(); 
    289258 
    290259        return; 
Note: See TracChangeset for help on using the changeset viewer.