source: sources/src/pcExecutionWorkqueue.cc @ 1500:872b0b69e0d5

Revision 1500:872b0b69e0d5, 8.1 KB checked in by niam, 12 months ago (diff)

{issue #103} added implementation of sl- and dl- lists

Line 
1/***************************************************************************
2 *            pcExecutionWorkqueue.cc
3 *
4 *  Sun Jul 25 2010
5 *  Copyright  2010  Dmytro Milinevskyy
6 *  milinevskyy@gmail.com
7 ****************************************************************************/
8
9/*
10 *  This program is free software; you can redistribute it and/or modify
11 *  it under the terms of the GNU Lesser General Public License version 2.1 as published by
12 *  the Free Software Foundation;
13 *
14 *  This program is distributed in the hope that it will be useful,
15 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 *  GNU Library General Public License for more details.
18 *
19 *  You should have received a copy of the GNU Lesser General Public License
20 *  along with this program; if not, write to the Free Software
21 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
22 */
23
24/**
25 * vim indentation settings
26 * set tabstop=4
27 * set shiftwidth=4
28 */
29
30#include <libdodo/directives.h>
31
32#include <time.h>
33#ifdef PTHREAD_EXT
34#include <pthread.h>
35#include <errno.h>
36#endif
37
38#include <libdodo/pcExecutionWorkqueue.h>
39#include <libdodo/pcExecutionThread.h>
40#include <libdodo/pcExecutionJob.h>
41#include <libdodo/pcSyncThread.h>
42#include <libdodo/pcNotificationThread.h>
43#include <libdodo/exceptionBasic.h>
44#include <libdodo/pcSyncStack.h>
45#include <libdodo/types.h>
46
47namespace dodo {
48    namespace pc {
49        namespace execution {
50            /**
51             * @struct __work__
52             * @brief defines workqueue task
53             */
54            struct workqueue::__work__ {
55                /**
56                 * constructor
57                 * @param routine defines routine for execution
58                 * @param data defines data passed to the routine
59                 */
60                __work__(pc::execution::routine routine,
61                        void *data) : routine(routine),
62                                      data(data)
63                {
64                }
65
66                pc::execution::routine routine; ///< task to be exeuted
67                void *data; ///< task data
68            };
69        };
70    };
71};
72
73using namespace dodo::pc::execution;
74
75workqueue::workqueue(unsigned int maxThreads,
76        unsigned int minThreads,
77        unsigned long minIdleTime) : maxThreads(maxThreads),
78                                     minThreads(minThreads),
79                                     minIdleTime(minIdleTime),
80                                     tasksProtector(NULL),
81                                     threadsProtector(NULL),
82                                     notification(NULL),
83                                     closing(false)
84{
85    dodo_try {
86        tasksProtector = new pc::sync::thread;
87        threadsProtector = new pc::sync::thread;
88        notification = new pc::notification::thread(*tasksProtector);
89
90        if (maxThreads < minThreads)
91            maxThreads = minThreads;
92
93        thread *t;
94        for (unsigned int i=0; i<minThreads; ++i) {
95            t = new thread((routine)workqueue::worker, this, ON_DESTRUCTION_STOP);
96            inactive.pushBack(t);
97            t->run();
98        }
99    } dodo_catch (exception::basic *e UNUSED) {
100        delete notification;
101        delete tasksProtector;
102        delete threadsProtector;
103
104        dodo_rethrow;
105    }
106}
107
108//-------------------------------------------------------------------
109
110workqueue::~workqueue()
111{
112    dodo::dlList<thread *>::iterator i, j;
113    dodo::dlList<__work__ *>::iterator o, p;
114
115    tasksProtector->acquire();
116    closing = true;
117
118    o = tasks.begin();
119    p = tasks.end();
120    for (; o != p; ++o)
121        delete *o;
122    tasksProtector->release();
123
124    notification->notify(true);
125
126    threadsProtector->acquire();
127    i = inactive.begin();
128    j = inactive.end();
129    for (; i != j; ++i) {
130        /* (*i)->wait(); */
131        delete *i;
132    }
133    threadsProtector->release();
134
135    delete notification;
136    delete tasksProtector;
137    delete threadsProtector;
138}
139
140//-------------------------------------------------------------------
141
142int
143workqueue::worker(workqueue *queue)
144{
145    __work__ *work;
146    thread *self = NULL;
147    unsigned long timeout;
148    bool active = false;
149
150    dodo::dlList<thread *> &inactiveQueue = queue->inactive,
151        &activeQueue = queue->active;
152    sync::thread *tasksProtector = queue->tasksProtector,
153        *threadsProtector = queue->threadsProtector;
154    dodo::dlList<__work__ *> &tasks = queue->tasks;
155
156    threadsProtector->acquire();
157    for (dodo::dlList<thread *>::iterator i = inactiveQueue.begin(), j = inactiveQueue.end();
158         i != j;
159         ++i) {
160        if ((*i)->self()) {
161            self = *i;
162            break;
163        }
164    }
165    threadsProtector->release();
166
167    /* task protector is aquired in a strange manner
168       to decrease lock contention
169       due to using notification mecanism */
170    tasksProtector->acquire();
171    for (;;) {
172        if (tasks.size()) {
173            work = *tasks.begin();
174            tasks.popFront();
175        } else {
176            work = NULL;
177        }
178        tasksProtector->release();
179
180        if (work) {
181            if (!active) {
182                threadsProtector->acquire();
183                inactiveQueue.remove(self);
184                activeQueue.pushBack(self);
185                threadsProtector->release();
186
187                active = true;
188            }
189
190            work->routine(work->data);
191            delete work;
192
193            tasksProtector->acquire();
194        } else {
195            if (active) {
196                threadsProtector->acquire();
197                activeQueue.remove(self);
198                inactiveQueue.pushBack(self);
199                threadsProtector->release();
200
201                active = false;
202            }
203
204            tasksProtector->acquire();
205            if (queue->closing) {
206                tasksProtector->release();
207
208                return 0;
209            }
210            tasksProtector->release();
211
212            unsigned long delta = queue->maxThreads - queue->minThreads;
213            if (delta > 0) {
214                unsigned long k;
215                threadsProtector->acquire();
216                k = activeQueue.size() + inactiveQueue.size();
217                threadsProtector->release();
218                k = ((k - queue->minThreads)*100)/delta;
219                timeout = (queue->minIdleTime*(100 + k))*10000;
220            } else {
221                timeout = 0;
222            }
223
224          sleep:
225            tasksProtector->acquire();
226            if (!queue->notification->wait(timeout)) {
227                unsigned long queueSize = tasks.size();
228
229                if (queue->closing) {
230                    tasksProtector->release();
231
232                    return 0;
233                }
234                tasksProtector->release();
235
236                if (queueSize == 0) {
237                    threadsProtector->acquire();
238                    if (inactiveQueue.size() + activeQueue.size() > queue->minThreads) {
239                        inactiveQueue.remove(self);
240                        threadsProtector->release();
241
242                        return 0;
243                    }
244                    threadsProtector->release();
245
246                    goto sleep;
247                }
248
249                tasksProtector->acquire();
250            }
251        }
252    }
253
254    return 0;
255}
256
257//-------------------------------------------------------------------
258
259void
260workqueue::add(routine routine,
261        void *data)
262{
263    struct __work__ *work = new __work__(routine, data);
264    unsigned long activeSize, inactiveSize;
265
266    tasksProtector->acquire();
267    tasks.pushBack(work);
268    tasksProtector->release();
269
270    threadsProtector->acquire();
271    inactiveSize = inactive.size();
272    activeSize = active.size();
273    threadsProtector->release();
274
275    if (inactiveSize) {
276        notification->notify();
277
278        return;
279    }
280    if (inactiveSize + activeSize == maxThreads)
281        return;
282
283    thread *t = new thread((execution::routine)workqueue::worker, this, ON_DESTRUCTION_STOP);
284
285    threadsProtector->acquire();
286    inactive.pushBack(t);
287    threadsProtector->release();
288
289    t->run();
290}
291
292//-------------------------------------------------------------------
Note: See TracBrowser for help on using the repository browser.