Changeset 1482:641f3eb4fc92
- Timestamp:
- 08/29/10 12:17:53 (18 months ago)
- Branch:
- default
- Location:
- sources
- Files:
-
- 3 edited
-
examples/pc_workqueue/test.cc (modified) (4 diffs)
-
include/libdodo/pcExecutionWorkqueue.h (modified) (3 diffs)
-
src/pcExecutionWorkqueue.cc (modified) (12 diffs)
Legend:
- Unmodified
- Added
- Removed
-
sources/examples/pc_workqueue/test.cc
r1472 r1482 20 20 work(void *data) 21 21 { 22 if (data == NULL) {23 lock.acquire();24 ++done;25 cout << "#" << done << ": NULL : " << tools::time::nowMs() << endl, cout.flush();26 lock.release();27 28 tools::os::sleep(5);29 tools::os::die("last task\n", 0);30 }31 32 22 dodo_try { 33 23 tools::os::sleep(1); … … 35 25 lock.acquire(); 36 26 ++done; 37 cout << "#" << done << ": " << (char *)data << " : " << tools::time::nowMs() << endl, cout.flush();27 cout << "#" << done << ": " << (char *)data << " : " << tools::time::nowMs() << endl, cout.flush(); 38 28 lock.release(); 39 29 } dodo_catch (exception::basic *e) { … … 49 39 { 50 40 dodo_try { 51 execution::workqueue wq(10, 5 );41 execution::workqueue wq(10, 5, 1); 52 42 53 43 tools::os::sleep(1); … … 57 47 for (int i=0;i<100;++i) 58 48 wq.add(work, (void *)"work"); 59 wq.add(work, NULL);49 wq.add(work, (void *)"last"); 60 50 61 tools::os::sleep(600); 62 51 tools::os::sleep(20); 63 52 } dodo_catch (exception::basic *e) { 64 53 cout << (dodo::string)*e << "\t" << e->line << "\t" << e->file << endl; -
sources/include/libdodo/pcExecutionWorkqueue.h
r1468 r1482 42 42 }; 43 43 44 namespace notification { 45 class thread; 46 }; 47 44 48 namespace execution { 45 49 class thread; 46 struct __work__;47 struct __wake__;48 50 49 51 /** … … 86 88 protected: 87 89 90 struct __work__; 88 91 dodoList<__work__ *> tasks; ///< pending work 89 92 … … 98 101 sync::thread *threadsProtector; ///< threads queues protector 99 102 100 __wake__ *wake; ///< thread wake handler 103 notification::thread *notification; ///< thread wake handler 104 105 bool closing; ///< closing flag 101 106 }; 102 107 }; -
sources/src/pcExecutionWorkqueue.cc
r1478 r1482 39 39 #include <libdodo/pcExecutionThread.h> 40 40 #include <libdodo/pcExecutionJob.h> 41 #include <libdodo/pcSyncThread.h> 42 #include <libdodo/pcNotificationThread.h> 43 #include <libdodo/pcSyncStack.h> 41 44 #include <libdodo/types.h> 42 #include <libdodo/pcSyncThread.h>43 #include <libdodo/pcSyncStack.h>44 45 45 46 namespace dodo { … … 50 51 * @brief defines workqueue task 51 52 */ 52 struct __work__ {53 struct workqueue::__work__ { 53 54 /** 54 55 * constructor … … 65 66 void *data; ///< task data 66 67 }; 67 68 /**69 * @struct __wake__70 * @brief defines wake mechanism71 */72 struct __wake__ {73 /**74 * constructor75 */76 __wake__()77 {78 #ifdef PTHREAD_EXT79 pthread_mutex_init(&mutex, NULL);80 pthread_cond_init(&cond, NULL);81 #endif82 }83 84 /**85 * wait for event occurency86 * @return true if event occured87 * @param timeout defines time to wait for the event88 */89 bool90 wait(timespec *timeout)91 {92 #ifdef PTHREAD_EXT93 int rc;94 timespec now;95 96 clock_gettime(CLOCK_REALTIME, &now);97 now.tv_sec += timeout->tv_sec;98 now.tv_nsec += timeout->tv_nsec;99 if (now.tv_nsec > 999999999) {100 now.tv_sec += 1;101 now.tv_nsec -= 999999999;102 }103 104 pthread_mutex_lock(&mutex);105 rc = pthread_cond_timedwait(&cond, &mutex, &now);106 pthread_mutex_unlock(&mutex);107 108 return (rc != ETIMEDOUT);109 #else110 return true;111 #endif112 }113 114 /**115 * raise an event116 */117 void118 raise()119 {120 #ifdef PTHREAD_EXT121 pthread_cond_signal(&cond);122 #endif123 }124 125 #ifdef PTHREAD_EXT126 pthread_cond_t cond; ///< condition handler127 pthread_mutex_t mutex; ///< lock mutex128 #endif129 };130 68 }; 131 69 }; … … 141 79 tasksProtector(new pc::sync::thread), 142 80 threadsProtector(new pc::sync::thread), 143 wake(new __wake__) 81 notification(new pc::notification::thread(*tasksProtector)), 82 closing(false) 144 83 { 145 84 if (maxThreads < minThreads) … … 158 97 workqueue::~workqueue() 159 98 { 160 dodoList<thread *>::iterator i = inactive.begin(), j = inactive.end(); 161 99 dodoList<thread *>::iterator i, j; 100 dodoList<__work__ *>::iterator o, p; 101 102 threadsProtector->acquire(); 103 closing = true; 104 threadsProtector->release(); 105 106 tasksProtector->acquire(); 107 o = tasks.begin(); 108 p = tasks.end(); 109 for (; o != p; ++o) 110 delete *o; 111 tasksProtector->release(); 112 113 notification->notify(true); 114 115 threadsProtector->acquire(); 116 i = inactive.begin(); 117 j = inactive.end(); 162 118 for (; i != j; ++i) { 163 119 /* (*i)->stop(); */ 164 120 delete *i; 165 121 } 166 122 threadsProtector->release(); 123 124 delete notification; 167 125 delete tasksProtector; 168 126 delete threadsProtector; 169 delete wake;170 127 } 171 128 … … 177 134 __work__ *work; 178 135 thread *self = NULL; 179 timespec ts = {0, 0};136 unsigned long timeout; 180 137 bool active = false; 181 138 … … 197 154 threadsProtector->release(); 198 155 156 /* task protector is aquired in a strange manner 157 to decrease lock contention 158 due to using notification mecanism */ 159 tasksProtector->acquire(); 199 160 for (;;) { 200 tasksProtector->acquire();201 161 if (tasks.size()) { 202 162 work = *tasks.begin(); … … 219 179 work->routine(work->data); 220 180 delete work; 181 182 tasksProtector->acquire(); 221 183 } else { 222 184 if (active) { … … 228 190 active = false; 229 191 } 192 193 if (queue->closing) 194 return 0; 230 195 231 196 unsigned long delta = queue->maxThreads - queue->minThreads; … … 236 201 threadsProtector->release(); 237 202 k = ((k - queue->minThreads)*100)/delta; 238 t s.tv_sec = (queue->minIdleTime*(100 + k))/100;203 timeout = (queue->minIdleTime*(100 + k))*10000; 239 204 } else { 240 t s.tv_sec = 0x00FFFFFF; /* 194 days, enough as for indefinite sleep */205 timeout = 0; 241 206 } 242 207 243 208 sleep: 244 if (!queue->wake->wait(&ts)) { 245 unsigned long queueSize = 0; 246 tasksProtector->acquire(); 247 queueSize = tasks.size(); 209 tasksProtector->acquire(); 210 if (!queue->notification->wait(timeout)) { 211 unsigned long queueSize = tasks.size(); 248 212 tasksProtector->release(); 213 214 if (queue->closing) 215 return 0; 249 216 250 217 if (queueSize == 0) { … … 260 227 goto sleep; 261 228 } 229 230 tasksProtector->acquire(); 262 231 } 263 232 } … … 286 255 287 256 if (inactiveSize) { 288 wake->raise();257 notification->notify(); 289 258 290 259 return;
Note: See TracChangeset
for help on using the changeset viewer.
