My Project
Loading...
Searching...
No Matches
shared.cc
Go to the documentation of this file.
1#include "threadconf.h"
2#include <iostream>
3#include "kernel/mod2.h"
4#include "Singular/feOpt.h"
6#include <cstring>
7#include <string>
8#include <errno.h>
9#include <stdio.h>
10#include <vector>
11#include <map>
12#include <iterator>
13#include <queue>
14#include <assert.h>
15#include "thread.h"
16#include "lintree.h"
17
18#include "singthreads.h"
19
20using namespace std;
21
22#ifdef ENABLE_THREADS
23extern char *global_argv0;
24#endif
25
27
28namespace LibThread {
29
30#ifdef ENABLE_THREADS
31const int have_threads = 1;
32#else
33const int have_threads = 0;
34#endif
35
36class Command {
37private:
38 const char *name;
39 const char *error;
42 int argc;
43public:
44 Command(const char *n, leftv r, leftv a)
45 {
46 name = n;
47 result = r;
48 error = NULL;
49 argc = 0;
50 for (leftv t = a; t != NULL; t = t->next) {
51 argc++;
52 }
53 args = (leftv *) omAlloc0(sizeof(leftv) * argc);
54 int i = 0;
55 for (leftv t = a; t != NULL; t = t->next) {
56 args[i++] = t;
57 }
58 result->rtyp = NONE;
59 result->data = NULL;
60 }
62 omFree(args);
63 }
64 void check_argc(int n) {
65 if (error) return;
66 if (argc != n) error = "wrong number of arguments";
67 }
68 void check_argc(int lo, int hi) {
69 if (error) return;
70 if (argc < lo || argc > hi) error = "wrong number of arguments";
71 }
72 void check_argc_min(int n) {
73 if (error) return;
74 if (argc < n) error = "wrong number of arguments";
75 }
76 void check_arg(int i, int type, const char *err) {
77 if (error) return;
78 if (args[i]->Typ() != type) error = err;
79 }
80 void check_init(int i, const char *err) {
81 if (error) return;
82 leftv arg = args[i];
83 if (arg->Data() == NULL || *(void **)(arg->Data()) == NULL)
84 error = err;
85 }
86 void check_arg(int i, int type, int type2, const char *err) {
87 if (error) return;
88 if (args[i]->Typ() != type && args[i]->Typ() != type2) error = err;
89 }
90 int argtype(int i) {
91 return args[i]->Typ();
92 }
93 int nargs() {
94 return argc;
95 }
96 void *arg(int i) {
97 return args[i]->Data();
98 }
99 template <typename T>
100 T *shared_arg(int i) {
101 return *(T **)(arg(i));
102 }
103 long int_arg(int i) {
104 return (long)(args[i]->Data());
105 }
106 void report(const char *err) {
107 error = err;
108 }
109 // intentionally not bool, so we can also do
110 // q = p + test_arg(p, type);
111 int test_arg(int i, int type) {
112 if (i >= argc) return 0;
113 return args[i]->Typ() == type;
114 }
115 void set_result(long n) {
116 result->rtyp = INT_CMD;
117 result->data = (char *)n;
118 }
119 void set_result(const char *s) {
120 result->rtyp = STRING_CMD;
121 result->data = omStrDup(s);
122 }
123 void set_result(int type, void *p) {
124 result->rtyp = type;
125 result->data = (char *) p;
126 }
127 void set_result(int type, long n) {
128 result->rtyp = type;
129 result->data = (char *) n;
130 }
131 void no_result() {
132 result->rtyp = NONE;
133 }
134 bool ok() {
135 return error == NULL;
136 }
138 if (error) {
139 Werror("%s: %s", name, error);
140 }
141 return error != NULL;
142 }
143 BOOLEAN abort(const char *err) {
144 report(err);
145 return status();
146 }
147};
148
150private:
153 int type;
154 std::string name;
155public:
157 virtual ~SharedObject() { }
158 void set_type(int type_init) { type = type_init; }
159 int get_type() { return type; }
160 void set_name(std::string &name_init) { name = name_init; }
161 void set_name(const char *s) {
162 name = std::string(s);
163 }
164 std::string &get_name() { return name; }
165 void incref(int by = 1) {
166 lock.lock();
167 refcount += 1;
168 lock.unlock();
169 }
170 long decref() {
171 int result;
172 lock.lock();
173 result = --refcount;
174 lock.unlock();
175 return result;
176 }
177 long getref() {
178 return refcount;
179 }
180 virtual BOOLEAN op2(int op, leftv res, leftv a1, leftv a2) {
181 return TRUE;
182 }
183 virtual BOOLEAN op3(int op, leftv res, leftv a1, leftv a2, leftv a3) {
184 return TRUE;
185 }
186};
187
189 obj->incref();
190}
191
193 if (obj->decref() == 0) {
194 // delete obj;
195 }
196}
197
198typedef std::map<std::string, SharedObject *> SharedObjectTable;
199
200class Region : public SharedObject {
201private:
203public:
206 virtual ~Region() { }
207 Lock *get_lock() { return &region_lock; }
208 void lock() {
209 if (!region_lock.is_locked())
210 region_lock.lock();
211 }
212 void unlock() {
213 if (region_lock.is_locked())
214 region_lock.unlock();
215 }
216 int is_locked() {
217 return region_lock.is_locked();
218 }
219};
220
227
240
243
245 Lock *lock, int type, string &name, SharedConstructor scons)
246{
247 int was_locked = lock->is_locked();
249 if (!was_locked)
250 lock->lock();
251 if (table.count(name)) {
252 result = table[name];
253 if (result->get_type() != type)
254 result = NULL;
255 } else {
256 result = scons();
257 result->set_type(type);
258 result->set_name(name);
259 table.insert(pair<string,SharedObject *>(name, result));
260 }
261 if (!was_locked)
262 lock->unlock();
263 return result;
264}
265
267 Lock *lock, string &name)
268{
269 int was_locked = lock->is_locked();
271 if (!was_locked)
272 lock->lock();
273 if (table.count(name)) {
274 result = table[name];
275 }
276 if (!was_locked)
277 lock->unlock();
278 return result;
279}
280
282private:
285protected:
286 int tx_begin() {
287 if (!region)
288 lock->lock();
289 else {
290 if (!lock->is_locked()) {
291 return 0;
292 }
293 }
294 return 1;
295 }
296 void tx_end() {
297 if (!region)
298 lock->unlock();
299 }
300public:
304 void set_region(Region *region_init) {
305 region = region_init;
306 if (region_init) {
307 lock = region_init->get_lock();
308 } else {
309 lock = new Lock();
310 }
311 }
312 virtual ~Transactional() { if (!region && lock) delete lock; }
313};
314
315class TxTable: public Transactional {
316private:
317 std::map<string, string> entries;
318public:
320 virtual ~TxTable() { }
321 int put(string &key, string &value) {
322 int result = 0;
323 if (!tx_begin()) return -1;
324 if (entries.count(key)) {
325 entries[key] = value;
326 } else {
327 entries.insert(pair<string, string>(key, value));
328 result = 1;
329 }
330 tx_end();
331 return result;
332 }
333 int get(string &key, string &value) {
334 int result = 0;
335 if (!tx_begin()) return -1;
336 if (entries.count(key)) {
337 value = entries[key];
338 result = 1;
339 }
340 tx_end();
341 return result;
342 }
343 int check(string &key) {
344 int result;
345 if (!tx_begin()) return -1;
346 result = entries.count(key);
347 tx_end();
348 return result;
349 }
350};
351
352class TxList: public Transactional {
353private:
354 vector<string> entries;
355public:
357 virtual ~TxList() { }
358 int put(size_t index, string &value) {
359 int result = -1;
360 if (!tx_begin()) return -1;
361 if (index >= 1 && index <= entries.size()) {
362 entries[index-1] = value;
363 result = 1;
364 } else {
365 entries.resize(index+1);
366 entries[index-1] = value;
367 result = 0;
368 }
369 tx_end();
370 return result;
371 }
372 int get(size_t index, string &value) {
373 int result = 0;
374 if (!tx_begin()) return -1;
375 if (index >= 1 && index <= entries.size()) {
376 result = (entries[index-1].size() != 0);
377 if (result)
378 value = entries[index-1];
379 }
380 tx_end();
381 return result;
382 }
383 long size() {
384 long result;
385 if (!tx_begin()) return -1;
386 result = (long) entries.size();
387 tx_end();
388 return result;
389 }
390};
391
393private:
394 queue<string> q;
397public:
399 virtual ~SingularChannel() { }
400 void send(string item) {
401 lock.lock();
402 q.push(item);
403 cond.signal();
404 lock.unlock();
405 }
406 string receive() {
407 lock.lock();
408 while (q.empty()) {
409 cond.wait();
410 }
411 string result = q.front();
412 q.pop();
413 if (!q.empty())
414 cond.signal();
415 lock.unlock();
416 return result;
417 }
418 long count() {
419 lock.lock();
420 long result = q.size();
421 lock.unlock();
422 return result;
423 }
424};
425
427private:
428 string value;
429 int init;
432public:
434 virtual ~SingularSyncVar() { }
435 void acquire() {
436 lock.lock();
437 }
438 void release() {
439 lock.unlock();
440 }
441 void wait_init() {
442 while (!init)
443 cond.wait();
444 }
446 if (value.size() == 0) return NULL;
448 }
449 void update(leftv val) {
451 init = 1;
452 cond.broadcast();
453 }
454 int write(string item) {
455 int result = 0;
456 lock.lock();
457 if (!init) {
458 value = item;
459 init = 1;
460 cond.broadcast();
461 result = 1;
462 }
463 lock.unlock();
464 return result;
465 }
466 string read() {
467 lock.lock();
468 while (!init)
469 cond.wait();
470 string result = value;
471 lock.unlock();
472 return result;
473 }
474 int check() {
475 lock.lock();
476 int result = init;
477 lock.unlock();
478 return result;
479 }
480};
481
482void *shared_init(blackbox *b) {
483 return omAlloc0(sizeof(SharedObject *));
484}
485
487 acquireShared(obj);
488 void *result = omAlloc0(sizeof(SharedObject *));
489 *(SharedObject **)result = obj;
490 return result;
491}
492
493void shared_destroy(blackbox *b, void *d) {
494 SharedObject *obj = *(SharedObject **)d;
495 if (obj) {
497 *(SharedObject **)d = NULL;
498 }
499}
500
501void rlock_destroy(blackbox *b, void *d) {
502 SharedObject *obj = *(SharedObject **)d;
503 ((Region *) obj)->unlock();
504 if (obj) {
506 *(SharedObject **)d = NULL;
507 }
508}
509
510void *shared_copy(blackbox *b, void *d) {
511 SharedObject *obj = *(SharedObject **)d;
512 void *result = shared_init(b);
513 *(SharedObject **)result = obj;
514 if (obj)
515 acquireShared(obj);
516 return result;
517}
518
520 if (r->Typ() == l->Typ()) {
521 if (l->rtyp == IDHDL) {
522 omFree(IDDATA((idhdl)l->data));
523 IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
524 } else {
525 leftv ll=l->LData();
526 if (ll==NULL)
527 {
528 return TRUE; // out of array bounds or similiar
529 }
530 if (ll->data) {
532 omFree(ll->data);
533 }
534 ll->data = shared_copy(NULL,r->Data());
535 }
536 } else {
537 Werror("assign %s(%d) = %s(%d)",
538 Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
539 return TRUE;
540 }
541 return FALSE;
542}
543
545 if (r->Typ() == l->Typ()) {
546 if (l->rtyp == IDHDL) {
547 omFree(IDDATA((idhdl)l->data));
548 IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
549 } else {
550 leftv ll=l->LData();
551 if (ll==NULL)
552 {
553 return TRUE; // out of array bounds or similiar
554 }
555 rlock_destroy(NULL, ll->data);
556 omFree(ll->data);
557 ll->data = shared_copy(NULL,r->Data());
558 }
559 } else {
560 Werror("assign %s(%d) = %s(%d)",
561 Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
562 return TRUE;
563 }
564 return FALSE;
565}
566
567
569 int lt = l->Typ();
570 int rt = r->Typ();
571 if (lt != DEF_CMD && lt != rt) {
572 const char *rn=Tok2Cmdname(rt);
573 const char *ln=Tok2Cmdname(lt);
574 Werror("cannot assign %s (%d) to %s (%d)\n", rn, rt, ln, lt);
575 return TRUE;
576 }
577 return FALSE;
578}
579
581 SharedObject *obj = *(SharedObject **)a1->Data();
582 return obj->op2(op, res, a1, a2);
583}
584
586 SharedObject *obj = *(SharedObject **)a1->Data();
587 return obj->op3(op, res, a1, a2, a3);
588}
589
590char *shared_string(blackbox *b, void *d) {
591 char buf[80];
592 SharedObject *obj = *(SharedObject **)d;
593 if (!obj)
594 return omStrDup("<uninitialized shared object>");
595 int type = obj->get_type();
596 string &name = obj->get_name();
597 const char *type_name = "unknown";
598 if (type == type_channel)
599 type_name = "channel";
600 else if (type == type_atomic_table)
601 type_name = "atomic_table";
602 else if (type == type_shared_table)
603 type_name = "shared_table";
604 else if (type == type_atomic_list)
605 type_name = "atomic_list";
606 else if (type == type_shared_list)
607 type_name = "shared_list";
608 else if (type == type_syncvar)
609 type_name = "syncvar";
610 else if (type == type_region)
611 type_name = "region";
612 else if (type == type_regionlock)
613 type_name = "regionlock";
614 else if (type == type_thread) {
615 sprintf(buf, "<thread #%s>", name.c_str());
616 return omStrDup(buf);
617 }
618 else if (type == type_threadpool) {
619 if (name.size() > 0) {
620 name_lock.lock();
621 sprintf(buf, "<threadpool \"%.40s\"@%p>", name.c_str(), obj);
622 name_lock.unlock();
623 } else
624 sprintf(buf, "<threadpool @%p>", obj);
625 return omStrDup(buf);
626 }
627 else if (type == type_job) {
628 if (name.size() > 0) {
629 name_lock.lock();
630 sprintf(buf, "<job \"%.40s\"@%p>", name.c_str(), obj);
631 name_lock.unlock();
632 } else
633 sprintf(buf, "<job @%p>", obj);
634 return omStrDup(buf);
635 }
636 else if (type == type_trigger) {
637 if (name.size() > 0) {
638 name_lock.lock();
639 sprintf(buf, "<trigger \"%.40s\"@%p>", name.c_str(), obj);
640 name_lock.unlock();
641 } else
642 sprintf(buf, "<trigger @%p>", obj);
643 return omStrDup(buf);
644 } else {
645 sprintf(buf, "<unknown type %d>", type);
646 return omStrDup(buf);
647 }
648 sprintf(buf, "<%s \"%.40s\">", type_name, name.c_str());
649 return omStrDup(buf);
650}
651
652char *rlock_string(blackbox *b, void *d) {
653 char buf[80];
654 SharedObject *obj = *(SharedObject **)d;
655 if (!obj)
656 return omStrDup("<uninitialized region lock>");
657 sprintf(buf, "<region lock \"%.40s\">", obj->get_name().c_str());
658 return omStrDup(buf);
659}
660
661void report(const char *fmt, const char *name) {
662 char buf[80];
663 sprintf(buf, fmt, name);
664 WerrorS(buf);
665}
666
667int wrong_num_args(const char *name, leftv arg, int n) {
668 for (int i=1; i<=n; i++) {
669 if (!arg) {
670 report("%s: too few arguments", name);
671 return TRUE;
672 }
673 arg = arg->next;
674 }
675 if (arg) {
676 report("%s: too many arguments", name);
677 return TRUE;
678 }
679 return FALSE;
680}
681
682int not_a_uri(const char *name, leftv arg) {
683 if (arg->Typ() != STRING_CMD) {
684 report("%s: not a valid URI", name);
685 return TRUE;
686 }
687 return FALSE;
688}
689
690int not_a_region(const char *name, leftv arg) {
691 if (arg->Typ() != type_region || !arg->Data()) {
692 report("%s: not a region", name);
693 return TRUE;
694 }
695 return FALSE;
696}
697
698
699char *str(leftv arg) {
700 return (char *)(arg->Data());
701}
702
704 return new TxTable();
705}
706
708 return new TxList();
709}
710
712 return new SingularChannel();
713}
714
716 return new SingularSyncVar();
717}
718
720 return new Region();
721}
722
723static void appendArg(vector<leftv> &argv, string &s) {
724 if (s.size() == 0) return;
726 if (val->Typ() == NONE) {
727 omFreeBin(val, sleftv_bin);
728 return;
729 }
730 argv.push_back(val);
731}
732
733static void appendArg(vector<leftv> &argv, leftv arg) {
734 argv.push_back(arg);
735}
736
737static void appendArgCopy(vector<leftv> &argv, leftv arg) {
739 val->Copy(arg);
740 argv.push_back(val);
741}
742
743
745 const char *procname, const vector<leftv> &argv)
746{
747 leftv procnode = (leftv) omAlloc0Bin(sleftv_bin);
748 procnode->name = omStrDup(procname);
749 procnode->req_packhdl = basePack;
750 int error = procnode->Eval();
751 if (error) {
752 Werror("procedure \"%s\" not found", procname);
753 omFreeBin(procnode, sleftv_bin);
754 return TRUE;
755 }
756 memset(&result, 0, sizeof(result));
757 leftv *tail = &procnode->next;
758 for (unsigned i = 0; i < argv.size(); i++) {
759 *tail = argv[i];
760 tail = &(*tail)->next;
761 }
762 *tail = NULL;
763 error = iiExprArithM(&result, procnode, '(');
764 procnode->CleanUp();
765 omFreeBin(procnode, sleftv_bin);
766 if (error) {
767 Werror("procedure call of \"%s\" failed", procname);
768 return TRUE;
769 }
770 return FALSE;
771}
772
774 if (wrong_num_args("makeAtomicTable", arg, 1))
775 return TRUE;
776 if (not_a_uri("makeAtomicTable", arg))
777 return TRUE;
778 string uri = str(arg);
781 ((TxTable *) obj)->set_region(NULL);
783 result->data = new_shared(obj);
784 return FALSE;
785}
786
788 if (wrong_num_args("makeAtomicList", arg, 1))
789 return TRUE;
790 if (not_a_uri("makeAtomicList", arg))
791 return TRUE;
792 string uri = str(arg);
795 ((TxList *) obj)->set_region(NULL);
796 result->rtyp = type_atomic_list;
797 result->data = new_shared(obj);
798 return FALSE;
799}
800
802 if (wrong_num_args("makeSharedTable", arg, 2))
803 return TRUE;
804 if (not_a_region("makeSharedTable", arg))
805 return TRUE;
806 if (not_a_uri("makeSharedTable", arg->next))
807 return TRUE;
808 Region *region = *(Region **) arg->Data();
809 fflush(stdout);
810 string s = str(arg->next);
813 ((TxTable *) obj)->set_region(region);
815 result->data = new_shared(obj);
816 return FALSE;
817}
818
820 if (wrong_num_args("makeSharedList", arg, 2))
821 return TRUE;
822 if (not_a_region("makeSharedList", arg))
823 return TRUE;
824 if (not_a_uri("makeSharedList", arg->next))
825 return TRUE;
826 Region *region = *(Region **) arg->Data();
827 string s = str(arg->next);
829 region->get_lock(), type_shared_list, s, consList);
830 ((TxList *) obj)->set_region(region);
831 result->rtyp = type_shared_list;
832 result->data = new_shared(obj);
833 return FALSE;
834}
835
837 if (wrong_num_args("makeChannel", arg, 1))
838 return TRUE;
839 if (not_a_uri("makeChannel", arg))
840 return TRUE;
841 string uri = str(arg);
844 result->rtyp = type_channel;
845 result->data = new_shared(obj);
846 return FALSE;
847}
848
850 if (wrong_num_args("makeSyncVar", arg, 1))
851 return TRUE;
852 if (not_a_uri("makeSyncVar", arg))
853 return TRUE;
854 string uri = str(arg);
857 result->rtyp = type_syncvar;
858 result->data = new_shared(obj);
859 return FALSE;
860}
861
863 if (wrong_num_args("makeRegion", arg, 1))
864 return TRUE;
865 if (not_a_uri("makeRegion", arg))
866 return TRUE;
867 string uri = str(arg);
870 result->rtyp = type_region;
871 result->data = new_shared(obj);
872 return FALSE;
873}
874
876 if (wrong_num_args("findSharedObject", arg, 1))
877 return TRUE;
878 if (not_a_uri("findSharedObject", arg))
879 return TRUE;
880 string uri = str(arg);
882 &global_objects_lock, uri);
883 result->rtyp = INT_CMD;
884 result->data = (char *)(long)(obj != NULL);
885 return FALSE;
886}
887
889 if (wrong_num_args("findSharedObject", arg, 1))
890 return TRUE;
891 if (not_a_uri("findSharedObject", arg))
892 return TRUE;
893 string uri = str(arg);
895 &global_objects_lock, uri);
896 int type = obj ? obj->get_type() : -1;
897 const char *type_name = "undefined";
898 if (type == type_channel)
899 type_name = "channel";
900 else if (type == type_atomic_table)
901 type_name = "atomic_table";
902 else if (type == type_shared_table)
903 type_name = "shared_table";
904 else if (type == type_atomic_list)
905 type_name = "atomic_list";
906 else if (type == type_shared_list)
907 type_name = "shared_list";
908 else if (type == type_syncvar)
909 type_name = "syncvar";
910 else if (type == type_region)
911 type_name = "region";
912 else if (type == type_regionlock)
913 type_name = "regionlock";
914 result->rtyp = STRING_CMD;
915 result->data = (char *)(omStrDup(type_name));
916 return FALSE;
917}
918
920 if (wrong_num_args("bindSharedObject", arg, 1))
921 return TRUE;
922 if (not_a_uri("bindSharedObject", arg))
923 return TRUE;
924 string uri = str(arg);
926 &global_objects_lock, uri);
927 if (!obj) {
928 WerrorS("bindSharedObject: cannot find object");
929 return TRUE;
930 }
931 result->rtyp = obj->get_type();
932 result->data = new_shared(obj);
933 return FALSE;
934}
935
937 if (wrong_num_args("getTable", arg, 2))
938 return TRUE;
939 if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
940 WerrorS("getTable: not a valid table");
941 return TRUE;
942 }
943 if (arg->next->Typ() != STRING_CMD) {
944 WerrorS("getTable: not a valid table key");
945 return TRUE;
946 }
947 TxTable *table = *(TxTable **) arg->Data();
948 if (!table) {
949 WerrorS("getTable: table has not been initialized");
950 return TRUE;
951 }
952 string key = (char *)(arg->next->Data());
953 string value;
954 int success = table->get(key, value);
955 if (success < 0) {
956 WerrorS("getTable: region not acquired");
957 return TRUE;
958 }
959 if (success == 0) {
960 WerrorS("getTable: key not found");
961 return TRUE;
962 }
963 leftv tmp = LinTree::from_string(value);
964 result->rtyp = tmp->Typ();
965 result->data = tmp->Data();
966 return FALSE;
967}
968
970 if (wrong_num_args("inTable", arg, 2))
971 return TRUE;
972 if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
973 WerrorS("inTable: not a valid table");
974 return TRUE;
975 }
976 if (arg->next->Typ() != STRING_CMD) {
977 WerrorS("inTable: not a valid table key");
978 return TRUE;
979 }
980 TxTable *table = *(TxTable **) arg->Data();
981 if (!table) {
982 WerrorS("inTable: table has not been initialized");
983 return TRUE;
984 }
985 string key = (char *)(arg->next->Data());
986 int success = table->check(key);
987 if (success < 0) {
988 WerrorS("inTable: region not acquired");
989 return TRUE;
990 }
991 result->rtyp = INT_CMD;
992 result->data = (char *)(long)(success);
993 return FALSE;
994}
995
997 if (wrong_num_args("putTable", arg, 3))
998 return TRUE;
999 if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
1000 WerrorS("putTable: not a valid table");
1001 return TRUE;
1002 }
1003 if (arg->next->Typ() != STRING_CMD) {
1004 WerrorS("putTable: not a valid table key");
1005 return TRUE;
1006 }
1007 TxTable *table = *(TxTable **) arg->Data();
1008 if (!table) {
1009 WerrorS("putTable: table has not been initialized");
1010 return TRUE;
1011 }
1012 string key = (char *)(arg->next->Data());
1013 string value = LinTree::to_string(arg->next->next);
1014 int success = table->put(key, value);
1015 if (success < 0) {
1016 WerrorS("putTable: region not acquired");
1017 return TRUE;
1018 }
1019 result->rtyp = NONE;
1020 return FALSE;
1021}
1022
1024 if (wrong_num_args("getList", arg, 2))
1025 return TRUE;
1026 if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1027 WerrorS("getList: not a valid list (atomic or shared)");
1028 return TRUE;
1029 }
1030 if (arg->next->Typ() != INT_CMD) {
1031 WerrorS("getList: index must be an integer");
1032 return TRUE;
1033 }
1034 TxList *list = *(TxList **) arg->Data();
1035 if (!list) {
1036 WerrorS("getList: list has not been initialized");
1037 return TRUE;
1038 }
1039 long index = (long)(arg->next->Data());
1040 string value;
1041 int success = list->get(index, value);
1042 if (success < 0) {
1043 WerrorS("getList: region not acquired");
1044 return TRUE;
1045 }
1046 if (success == 0) {
1047 WerrorS("getList: no value at position");
1048 return TRUE;
1049 }
1050 leftv tmp = LinTree::from_string(value);
1051 result->rtyp = tmp->Typ();
1052 result->data = tmp->Data();
1053 return FALSE;
1054}
1055
1057 if (wrong_num_args("putList", arg, 3))
1058 return TRUE;
1059 if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1060 WerrorS("putList: not a valid list (shared or atomic)");
1061 return TRUE;
1062 }
1063 if (arg->next->Typ() != INT_CMD) {
1064 WerrorS("putList: index must be an integer");
1065 return TRUE;
1066 }
1067 TxList *list = *(TxList **) arg->Data();
1068 if (!list) {
1069 WerrorS("putList: list has not been initialized");
1070 return TRUE;
1071 }
1072 long index = (long)(arg->next->Data());
1073 string value = LinTree::to_string(arg->next->next);
1074 int success = list->put(index, value);
1075 if (success < 0) {
1076 WerrorS("putList: region not acquired");
1077 return TRUE;
1078 }
1079 result->rtyp = NONE;
1080 return FALSE;
1081}
1082
1084 if (wrong_num_args("lockRegion", arg, 1))
1085 return TRUE;
1086 if (not_a_region("lockRegion", arg))
1087 return TRUE;
1088 Region *region = *(Region **)arg->Data();
1089 if (region->is_locked()) {
1090 WerrorS("lockRegion: region is already locked");
1091 return TRUE;
1092 }
1093 region->lock();
1094 result->rtyp = NONE;
1095 return FALSE;
1096}
1097
1099 if (wrong_num_args("lockRegion", arg, 1))
1100 return TRUE;
1101 if (not_a_region("lockRegion", arg))
1102 return TRUE;
1103 Region *region = *(Region **)arg->Data();
1104 if (region->is_locked()) {
1105 WerrorS("lockRegion: region is already locked");
1106 return TRUE;
1107 }
1108 region->lock();
1109 result->rtyp = type_regionlock;
1110 result->data = new_shared(region);
1111 return FALSE;
1112}
1113
1114
1116 if (wrong_num_args("unlockRegion", arg, 1))
1117 return TRUE;
1118 if (not_a_region("unlockRegion", arg))
1119 return TRUE;
1120 Region *region = *(Region **)arg->Data();
1121 if (!region->is_locked()) {
1122 WerrorS("unlockRegion: region is not locked");
1123 return TRUE;
1124 }
1125 region->unlock();
1126 result->rtyp = NONE;
1127 return FALSE;
1128}
1129
1131 if (wrong_num_args("sendChannel", arg, 2))
1132 return TRUE;
1133 if (arg->Typ() != type_channel) {
1134 WerrorS("sendChannel: argument is not a channel");
1135 return TRUE;
1136 }
1137 SingularChannel *channel = *(SingularChannel **)arg->Data();
1138 if (!channel) {
1139 WerrorS("sendChannel: channel has not been initialized");
1140 return TRUE;
1141 }
1142 channel->send(LinTree::to_string(arg->next));
1143 result->rtyp = NONE;
1144 return FALSE;
1145}
1146
1148 if (wrong_num_args("receiveChannel", arg, 1))
1149 return TRUE;
1150 if (arg->Typ() != type_channel) {
1151 WerrorS("receiveChannel: argument is not a channel");
1152 return TRUE;
1153 }
1154 SingularChannel *channel = *(SingularChannel **)arg->Data();
1155 if (!channel) {
1156 WerrorS("receiveChannel: channel has not been initialized");
1157 return TRUE;
1158 }
1159 string item = channel->receive();
1160 leftv val = LinTree::from_string(item);
1161 result->rtyp = val->Typ();
1162 result->data = val->Data();
1163 return FALSE;
1164}
1165
1167 if (wrong_num_args("statChannel", arg, 1))
1168 return TRUE;
1169 if (arg->Typ() != type_channel) {
1170 WerrorS("statChannel: argument is not a channel");
1171 return TRUE;
1172 }
1173 SingularChannel *channel = *(SingularChannel **)arg->Data();
1174 if (!channel) {
1175 WerrorS("receiveChannel: channel has not been initialized");
1176 return TRUE;
1177 }
1178 long n = channel->count();
1179 result->rtyp = INT_CMD;
1180 result->data = (char *)n;
1181 return FALSE;
1182}
1183
1185 if (wrong_num_args("writeSyncVar", arg, 2))
1186 return TRUE;
1187 if (arg->Typ() != type_syncvar) {
1188 WerrorS("writeSyncVar: argument is not a syncvar");
1189 return TRUE;
1190 }
1191 SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1192 if (!syncvar) {
1193 WerrorS("writeSyncVar: syncvar has not been initialized");
1194 return TRUE;
1195 }
1196 if (!syncvar->write(LinTree::to_string(arg->next))) {
1197 WerrorS("writeSyncVar: variable already has a value");
1198 return TRUE;
1199 }
1200 result->rtyp = NONE;
1201 return FALSE;
1202}
1203
1205 Command cmd("updateSyncVar", result, arg);
1206 cmd.check_argc_min(2);
1207 cmd.check_arg(0, type_syncvar, "first argument must be a syncvar");
1208 cmd.check_init(0, "syncvar has not been initialized");
1209 cmd.check_arg(1, STRING_CMD, "second argument must be a string");
1210 if (cmd.ok()) {
1211 SingularSyncVar *syncvar = cmd.shared_arg<SingularSyncVar>(0);
1212 char *procname = (char *) cmd.arg(1);
1213 arg = arg->next->next;
1214 syncvar->acquire();
1215 syncvar->wait_init();
1216 vector<leftv> argv;
1217 appendArg(argv, syncvar->get());
1218 while (arg) {
1219 appendArgCopy(argv, arg);
1220 arg = arg->next;
1221 }
1222 int error = executeProc(*result, procname, argv);
1223 if (!error) {
1224 syncvar->update(result);
1225 }
1226 syncvar->release();
1227 return error;
1228 }
1229 return cmd.status();
1230}
1231
1232
1234 if (wrong_num_args("readSyncVar", arg, 1))
1235 return TRUE;
1236 if (arg->Typ() != type_syncvar) {
1237 WerrorS("readSyncVar: argument is not a syncvar");
1238 return TRUE;
1239 }
1240 SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1241 if (!syncvar) {
1242 WerrorS("readSyncVar: syncvar has not been initialized");
1243 return TRUE;
1244 }
1245 string item = syncvar->read();
1246 leftv val = LinTree::from_string(item);
1247 result->rtyp = val->Typ();
1248 result->data = val->Data();
1249 return FALSE;
1250}
1251
1253 if (wrong_num_args("statSyncVar", arg, 1))
1254 return TRUE;
1255 if (arg->Typ() != type_syncvar) {
1256 WerrorS("statSyncVar: argument is not a syncvar");
1257 return TRUE;
1258 }
1259 SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1260 if (!syncvar) {
1261 WerrorS("statSyncVar: syncvar has not been initialized");
1262 return TRUE;
1263 }
1264 int init = syncvar->check();
1265 result->rtyp = INT_CMD;
1266 result->data = (char *)(long) init;
1267 return FALSE;
1268}
1269
1271 SharedObject *obj = *(SharedObject **)(val->Data());
1272 acquireShared(obj);
1273 lintree.put(obj);
1274}
1275
1277 int type = lintree.get_prev<int>();
1278 SharedObject *obj = lintree.get<SharedObject *>();
1280 result->rtyp = type;
1281 result->data = (void *)new_shared(obj);
1282 return result;
1283}
1284
1285void ref_shared(LinTree::LinTree &lintree, int by) {
1286 SharedObject *obj = lintree.get<SharedObject *>();
1287 while (by > 0) {
1288 obj->incref();
1289 by--;
1290 }
1291 while (by < 0) {
1292 obj->decref();
1293 by++;
1294 }
1295}
1296
1300
1301void makeSharedType(int &type, const char *name) {
1302 if (type != 0) return;
1303 blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1304 b->blackbox_Init = shared_init;
1305 b->blackbox_destroy = shared_destroy;
1306 b->blackbox_Copy = shared_copy;
1307 b->blackbox_String = shared_string;
1308 b->blackbox_Assign = shared_assign;
1309 b->blackbox_CheckAssign = shared_check_assign;
1310 // b->blackbox_Op2 = shared_op2;
1311 // b->blackbox_Op3 = shared_op3;
1312 type = setBlackboxStuff(b, name);
1313 installShared(type);
1314}
1315
1316void makeRegionlockType(int &type, const char *name) {
1317 if (type != 0) return;
1318 blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1319 b->blackbox_Init = shared_init;
1320 b->blackbox_destroy = rlock_destroy;
1321 b->blackbox_Copy = shared_copy;
1322 b->blackbox_String = shared_string;
1323 b->blackbox_Assign = rlock_assign;
1324 b->blackbox_CheckAssign = shared_check_assign;
1325 type = setBlackboxStuff(b, name);
1326 installShared(type);
1327}
1328
1329#define MAX_THREADS 128
1330
1332public:
1336 void *(*thread_func)(ThreadState *, void *);
1337 void *arg, *result;
1338 pthread_t id;
1339 pthread_t parent;
1343 queue<string> to_thread;
1344 queue<string> from_thread;
1346 to_thread(), from_thread() {
1347 active = false;
1348 running = false;
1349 index = -1;
1350 }
1352 // We do nothing here. This is to prevent the condition
1353 // variable destructor from firing upon program exit,
1354 // which would invoke undefined behavior if the thread
1355 // is still running.
1356 }
1357};
1358
1360
1362
1363void setOption(int ch) {
1364 int index = feGetOptIndex(ch);
1365 feSetOptValue((feOptIndex) index, (int) 1);
1366}
1367
1369 master_lock.lock();
1371 master_lock.unlock();
1372#ifdef ENABLE_THREADS
1374 siInit(global_argv0);
1375#endif
1376 setOption('q');
1377 // setOption('b');
1378}
1379
1380void *thread_main(void *arg) {
1381 ThreadState *ts = (ThreadState *)arg;
1382 thread_init();
1383 return ts->thread_func(ts, ts->arg);
1384}
1385
1386void *interpreter_thread(ThreadState *ts, void *arg) {
1387 ts->lock.lock();
1388 for (;;) {
1389 bool eval = false;
1390 while (ts->to_thread.empty())
1391 ts->to_cond.wait();
1392 /* TODO */
1393 string expr = ts->to_thread.front();
1394 switch (expr[0]) {
1395 case '\0': case 'q':
1396 ts->lock.unlock();
1397 return NULL;
1398 case 'x':
1399 eval = false;
1400 break;
1401 case 'e':
1402 eval = true;
1403 break;
1404 }
1405 ts->to_thread.pop();
1406 expr = ts->to_thread.front();
1407 /* this will implicitly eval commands */
1408 leftv val = LinTree::from_string(expr);
1409 expr = LinTree::to_string(val);
1410 ts->to_thread.pop();
1411 if (eval)
1412 ts->from_thread.push(expr);
1413 ts->from_cond.signal();
1414 }
1415 ts->lock.unlock();
1416 return NULL;
1417}
1418
1420private:
1422public:
1423 InterpreterThread(ThreadState *ts_init) : SharedObject(), ts(ts_init) { }
1427 ts = NULL;
1428 }
1429};
1430
1431static ThreadState *newThread(void *(*thread_func)(ThreadState *, void *),
1432 void *arg, const char **error) {
1433 ThreadState *ts = NULL;
1434 if (error) *error = NULL;
1435 thread_lock.lock();
1436 for (int i=0; i<MAX_THREADS; i++) {
1437 if (!thread_state[i].active) {
1438 ts = thread_state + i;
1439 ts->index = i;
1440 ts->parent = pthread_self();
1441 ts->active = true;
1442 ts->running = true;
1443 ts->to_thread = queue<string>();
1444 ts->from_thread = queue<string>();
1445 ts->thread_func = thread_func;
1446 ts->arg = arg;
1447 ts->result = NULL;
1448 if (pthread_create(&ts->id, NULL, thread_main, ts)<0) {
1449 if (error)
1450 *error = "createThread: internal error: failed to create thread";
1451 goto fail;
1452 }
1453 goto exit;
1454 }
1455 }
1456 if (error) *error = "createThread: too many threads";
1457 fail:
1458 ts = NULL;
1459 exit:
1460 thread_lock.unlock();
1461 return ts;
1462}
1463
1464ThreadState *createThread(void *(*thread_func)(ThreadState *, void *),
1465 void *arg) {
1466 return newThread(thread_func, arg, NULL);
1467}
1468
1470 void *result;
1471 pthread_join(ts->id, NULL);
1472 result = ts->result;
1473 thread_lock.lock();
1474 ts->running = false;
1475 ts->active = false;
1476 thread_lock.unlock();
1477 return result;
1478}
1479
1482 if (*error) return NULL;
1483 InterpreterThread *thread = new InterpreterThread(ts);
1484 char buf[10];
1485 sprintf(buf, "%d", ts->index);
1486 string name(buf);
1487 thread->set_name(name);
1488 thread->set_type(type_thread);
1489 return thread;
1490}
1491
1493 Command cmd("createThread", result, arg);
1494 cmd.check_argc(0);
1495 const char *error;
1496 if (!have_threads)
1497 cmd.report("thread support not available");
1498 if (!cmd.ok()) return cmd.status();
1500 if (error) {
1501 return cmd.abort(error);
1502 }
1503 cmd.set_result(type_thread, new_shared(thread));
1504 return cmd.status();
1505}
1506
1508 ThreadState *ts = thread->getThreadState();
1509 if (ts && ts->parent != pthread_self()) {
1510 return false;
1511 }
1512 ts->lock.lock();
1513 string quit("q");
1514 ts->to_thread.push(quit);
1515 ts->to_cond.signal();
1516 ts->lock.unlock();
1517 pthread_join(ts->id, NULL);
1518 thread_lock.lock();
1519 ts->running = false;
1520 ts->active = false;
1521 thread->clearThreadState();
1522 thread_lock.unlock();
1523 return true;
1524}
1525
1527 if (wrong_num_args("joinThread", arg, 1))
1528 return TRUE;
1529 if (arg->Typ() != type_thread) {
1530 WerrorS("joinThread: argument is not a thread");
1531 return TRUE;
1532 }
1533 InterpreterThread *thread = *(InterpreterThread **)arg->Data();
1534 if (!joinInterpreterThread(thread)) {
1535 WerrorS("joinThread: can only be called from parent thread");
1536 return TRUE;
1537 }
1538 return FALSE;
1539}
1540
1541class ThreadPool;
1542class Trigger;
1543
1544class Job : public SharedObject {
1545public:
1547 long prio;
1548 size_t id;
1550 vector<Job *> deps;
1551 vector<Job *> notify;
1552 vector<Trigger *> triggers;
1553 vector<string> args;
1554 string result; // lintree-encoded
1555 void *data;
1556 bool fast;
1557 bool done;
1565 ~Job();
1566 void addDep(Job *job) {
1567 deps.push_back(job);
1568 }
1569 void addDep(vector<Job *> &jobs);
1570 void addDep(long ndeps, Job **jobs);
1571 void addNotify(vector<Job *> &jobs);
1572 void addNotify(Job *job);
1573 virtual bool ready();
1574 virtual void execute() = 0;
1575 void run();
1576};
1577
1579 bool operator()(const Job* lhs, const Job* rhs) {
1580 if (lhs->fast < rhs->fast) {
1581 return true;
1582 }
1583 if (lhs->prio < rhs->prio) {
1584 return true;
1585 }
1586 if (lhs->prio == rhs->prio) {
1587 return lhs->id > rhs->id;
1588 }
1589 return false;
1590 }
1591};
1592
1593class Trigger : public Job {
1594public:
1595 virtual bool accept(leftv arg) = 0;
1596 virtual void activate(leftv arg) = 0;
1598};
1599
1601 vector<Job *>::iterator it;
1602 for (it = deps.begin(); it != deps.end(); it++) {
1603 if (!(*it)->done) return false;
1604 }
1605 return true;
1606}
1607
1609 vector<Job *>::iterator it;
1610 for (it = deps.begin(); it != deps.end(); it++) {
1611 releaseShared(*it);
1612 }
1613}
1614
1615typedef queue<Job *> JobQueue;
1616
1617class Scheduler;
1618
1624
1627
1628class ThreadPool : public SharedObject {
1629public:
1632 ThreadPool(Scheduler *sched, int n);
1633 ThreadPool(int n);
1634 ~ThreadPool();
1635 ThreadState *getThread(int i);
1636 void shutdown(bool wait);
1637 void addThread(ThreadState *thread);
1638 void attachJob(Job *job);
1639 void detachJob(Job *job);
1640 void queueJob(Job *job);
1641 void broadcastJob(Job *job);
1642 void cancelDeps(Job * job);
1643 void cancelJob(Job *job);
1644 void waitJob(Job *job);
1645 void clearThreadState();
1646};
1647
1648
1649class Scheduler : public SharedObject {
1650private:
1652 size_t jobid;
1658 vector<ThreadState *> threads;
1659 vector<ThreadPool *> thread_owners;
1660 priority_queue<Job *, vector<Job *>, JobCompare> global_queue;
1661 vector<JobQueue *> thread_queues;
1662 vector<Job *> pending;
1665 friend class Job;
1666public:
1668 Scheduler(int n) :
1670 single_threaded(n==0), nthreads(n == 0 ? 1 : n),
1671 lock(true), cond(&lock), response(&lock),
1673 maxconcurrency(n), running(0)
1674 {
1675 thread_queues.push_back(new JobQueue());
1676 }
1678 maxconcurrency = n;
1679 }
1681 return maxconcurrency;
1682 }
1684 int n;
1685 for (unsigned i = 0; i <thread_owners.size(); i++) {
1686 if (thread_owners[i] == pool)
1687 n++;
1688 }
1689 return n;
1690 }
1691 virtual ~Scheduler() {
1692 for (unsigned i = 0; i < thread_queues.size(); i++) {
1693 JobQueue *q = thread_queues[i];
1694 while (!q->empty()) {
1695 Job *job = q->front();
1696 q->pop();
1697 releaseShared(job);
1698 }
1699 }
1700 thread_queues.clear();
1701 threads.clear();
1702 }
1703 ThreadState *getThread(int i) { return threads[i]; }
1704 void shutdown(bool wait) {
1705 if (single_threaded) {
1706 SchedInfo *info = new SchedInfo();
1707 info->num = 0;
1708 info->scheduler = this;
1709 acquireShared(this);
1710 info->job = NULL;
1712 return;
1713 }
1714 lock.lock();
1715 if (wait) {
1716 while (!global_queue.empty()) {
1717 response.wait();
1718 }
1719 }
1720 shutting_down = true;
1721 while (shutdown_counter < nthreads) {
1722 cond.broadcast();
1723 response.wait();
1724 }
1725 lock.unlock();
1726 for (unsigned i = 0; i <threads.size(); i++) {
1728 }
1729 }
1730 void addThread(ThreadPool *owner, ThreadState *thread) {
1731 lock.lock();
1732 thread_owners.push_back(owner);
1733 threads.push_back(thread);
1734 thread_queues.push_back(new JobQueue());
1735 lock.unlock();
1736 }
1737 void attachJob(ThreadPool *pool, Job *job) {
1738 lock.lock();
1739 job->pool = pool;
1740 job->id = jobid++;
1741 acquireShared(job);
1742 if (job->ready()) {
1743 global_queue.push(job);
1744 cond.signal();
1745 }
1746 else if (job->pending_index < 0) {
1747 job->pool = pool;
1748 job->pending_index = pending.size();
1749 pending.push_back(job);
1750 }
1751 lock.unlock();
1752 }
1753 void detachJob(Job *job) {
1754 lock.lock();
1755 long i = job->pending_index;
1756 job->pending_index = -1;
1757 if (i >= 0) {
1758 job = pending.back();
1759 pending.resize(pending.size()-1);
1760 pending[i] = job;
1761 job->pending_index = i;
1762 }
1763 lock.unlock();
1764 }
1765 void queueJob(Job *job) {
1766 lock.lock();
1767 global_queue.push(job);
1768 cond.signal();
1769 lock.unlock();
1770 }
1771 void broadcastJob(ThreadPool *pool, Job *job) {
1772 lock.lock();
1773 for (unsigned i = 0; i <thread_queues.size(); i++) {
1774 if (thread_owners[i] == pool) {
1775 acquireShared(job);
1776 thread_queues[i]->push(job);
1777 }
1778 }
1779 lock.unlock();
1780 }
1781 void cancelDeps(Job * job) {
1782 vector<Job *> &notify = job->notify;
1783 for (unsigned i = 0; i <notify.size(); i++) {
1784 Job *next = notify[i];
1785 if (!next->cancelled) {
1786 cancelJob(next);
1787 }
1788 }
1789 }
1790 void cancelJob(Job *job) {
1791 lock.lock();
1792 if (!job->cancelled) {
1793 job->cancelled = true;
1794 if (!job->running && !job->done) {
1795 job->done = true;
1796 cancelDeps(job);
1797 }
1798 }
1799 lock.unlock();
1800 }
1801 void waitJob(Job *job) {
1802 if (single_threaded) {
1803 SchedInfo *info = new SchedInfo();
1804 info->num = 0;
1805 info->scheduler = this;
1806 acquireShared(this);
1807 info->job = job;
1809 } else {
1810 lock.lock();
1811 for (;;) {
1812 if (job->done || job->cancelled) {
1813 break;
1814 }
1815 response.wait();
1816 }
1817 response.signal(); // forward signal
1818 lock.unlock();
1819 }
1820 }
1822 threads.clear();
1823 }
1824 static void notifyDeps(Scheduler *scheduler, Job *job) {
1825 vector<Job *> &notify = job->notify;
1826 job->incref(notify.size());
1827 for (unsigned i = 0; i <notify.size(); i++) {
1828 Job *next = notify[i];
1829 if (!next->queued && next->ready() && !next->cancelled) {
1830 next->queued = true;
1831 scheduler->queueJob(next);
1832 }
1833 }
1834 vector<Trigger *> &triggers = job->triggers;
1835 leftv arg = NULL;
1836 if (triggers.size() > 0 && job->result.size() > 0)
1837 arg = LinTree::from_string(job->result);
1838 for (unsigned i = 0; i < triggers.size(); i++) {
1839 Trigger *trigger = triggers[i];
1840 if (trigger->accept(arg)) {
1841 trigger->activate(arg);
1842 if (trigger->ready())
1843 scheduler->queueJob(trigger);
1844 }
1845 }
1846 if (arg) {
1847 arg->CleanUp();
1848 omFreeBin(arg, sleftv_bin);
1849 }
1850 }
1851 static void *main(ThreadState *ts, void *arg) {
1852 SchedInfo *info = (SchedInfo *) arg;
1853 Scheduler *scheduler = info->scheduler;
1854 ThreadPool *oldThreadPool = currentThreadPoolRef;
1855 // TODO: set current thread pool
1856 // currentThreadPoolRef = pool;
1857 Lock &lock = scheduler->lock;
1858 ConditionVariable &cond = scheduler->cond;
1859 ConditionVariable &response = scheduler->response;
1860 JobQueue *my_queue = scheduler->thread_queues[info->num];
1861 if (!scheduler->single_threaded)
1862 thread_init();
1863 lock.lock();
1864 for (;;) {
1865 if (info->job && info->job->done)
1866 break;
1867 if (scheduler->shutting_down) {
1868 scheduler->shutdown_counter++;
1869 scheduler->response.signal();
1870 break;
1871 }
1872 if (!my_queue->empty()) {
1873 Job *job = my_queue->front();
1874 my_queue->pop();
1875 if (!scheduler->global_queue.empty())
1876 cond.signal();
1877 currentJobRef = job;
1878 job->run();
1880 notifyDeps(scheduler, job);
1881 releaseShared(job);
1882 scheduler->response.signal();
1883 continue;
1884 } else if (!scheduler->global_queue.empty()) {
1885 Job *job = scheduler->global_queue.top();
1886 scheduler->global_queue.pop();
1887 if (!scheduler->global_queue.empty())
1888 cond.signal();
1889 currentJobRef = job;
1890 job->run();
1892 notifyDeps(scheduler, job);
1893 releaseShared(job);
1894 scheduler->response.signal();
1895 continue;
1896 } else {
1897 if (scheduler->single_threaded) {
1898 break;
1899 }
1900 cond.wait();
1901 }
1902 }
1903 // TODO: correct current thread pool
1904 // releaseShared(currentThreadPoolRef);
1905 currentThreadPoolRef = oldThreadPool;
1906 scheduler->lock.unlock();
1907 delete info;
1908 return NULL;
1909 }
1910};
1911
1917 scheduler = sched;
1918 acquireShared(sched);
1919}
1923ThreadState *ThreadPool::getThread(int i) { return scheduler->getThread(i); }
1924void ThreadPool::shutdown(bool wait) { scheduler->shutdown(wait); }
1926 scheduler->addThread(this, thread);
1927}
1929 scheduler->attachJob(this, job);
1930}
1932 scheduler->detachJob(job);
1933}
1935 scheduler->queueJob(job);
1936}
1938 scheduler->broadcastJob(this, job);
1939}
1941 scheduler->cancelDeps(job);
1942}
1944 scheduler->cancelJob(job);
1945}
1947 scheduler->waitJob(job);
1948}
1950 scheduler->clearThreadState();
1951}
1952
1953void Job::addDep(vector<Job *> &jobs) {
1954 deps.insert(deps.end(), jobs.begin(), jobs.end());
1955}
1956
1957void Job::addDep(long ndeps, Job **jobs) {
1958 for (long i = 0; i < ndeps; i++) {
1959 deps.push_back(jobs[i]);
1960 }
1961}
1962
1963void Job::addNotify(vector<Job *> &jobs) {
1964 notify.insert(notify.end(), jobs.begin(), jobs.end());
1965 if (done) {
1966 Scheduler::notifyDeps(pool->scheduler, this);
1967 }
1968}
1969
1971 notify.push_back(job);
1972 if (done) {
1973 Scheduler::notifyDeps(pool->scheduler, this);
1974 }
1975}
1976
1977void Job::run() {
1978 if (!cancelled) {
1979 running = true;
1980 pool->scheduler->lock.unlock();
1981 pool->scheduler->running++;
1982 execute();
1983 pool->scheduler->running--;
1984 pool->scheduler->lock.lock();
1985 running = false;
1986 }
1987 done = true;
1988}
1989
1990class AccTrigger : public Trigger {
1991private:
1992 long count;
1993public:
1994 AccTrigger(long count_init): Trigger(), count(count_init) {
1995 }
1996 virtual bool ready() {
1997 if (!Trigger::ready()) return false;
1998 return args.size() >= count;
1999 }
2000 virtual bool accept(leftv arg) {
2001 return true;
2002 }
2003 virtual void activate(leftv arg) {
2004 while (arg != NULL && !ready()) {
2005 args.push_back(LinTree::to_string(arg));
2006 if (ready()) {
2007 return;
2008 }
2009 arg = arg->next;
2010 }
2011 }
2012 virtual void execute() {
2014 l->Init(args.size());
2015 for (unsigned i = 0; i < args.size(); i++) {
2017 memcpy(&l->m[i], val, sizeof(*val));
2018 omFreeBin(val, sleftv_bin);
2019 }
2020 sleftv val;
2021 memset(&val, 0, sizeof(val));
2022 val.rtyp = LIST_CMD;
2023 val.data = l;
2024 result = LinTree::to_string(&val);
2025 // val.CleanUp();
2026 }
2027};
2028
2029class CountTrigger : public Trigger {
2030private:
2031 long count;
2032public:
2033 CountTrigger(long count_init): Trigger(), count(count_init) {
2034 }
2035 virtual bool ready() {
2036 if (!Trigger::ready()) return false;
2037 return count <= 0;
2038 }
2039 virtual bool accept(leftv arg) {
2040 return arg == NULL;
2041 }
2042 virtual void activate(leftv arg) {
2043 if (!ready()) {
2044 count--;
2045 }
2046 }
2047 virtual void execute() {
2048 // do nothing
2049 }
2050};
2051
2052class SetTrigger : public Trigger {
2053private:
2054 vector<bool> set;
2055 long count;
2056public:
2057 SetTrigger(long count_init) : Trigger(), count(0),
2058 set(count_init) {
2059 }
2060 virtual bool ready() {
2061 if (!Trigger::ready()) return false;
2062 return count == set.size();
2063 }
2064 virtual bool accept(leftv arg) {
2065 return arg->Typ() == INT_CMD;
2066 }
2067 virtual void activate(leftv arg) {
2068 if (!ready()) {
2069 long value = (long) arg->Data();
2070 if (value < 0 || value >= count) return;
2071 if (set[value]) return;
2072 set[value] = true;
2073 count++;
2074 }
2075 }
2076 virtual void execute() {
2077 // do nothing
2078 }
2079};
2080
2081
2082class ProcTrigger : public Trigger {
2083private:
2084 string procname;
2086public:
2087 ProcTrigger(const char *p) : Trigger(), procname(p), success(false) {
2088 }
2089 virtual bool ready() {
2090 if (!Trigger::ready()) return false;
2091 return success;
2092 }
2093 virtual bool accept(leftv arg) {
2094 return TRUE;
2095 }
2096 virtual void activate(leftv arg) {
2097 if (!ready()) {
2098 pool->scheduler->lock.unlock();
2099 vector<leftv> argv;
2100 for (unsigned i = 0; i < args.size(); i++) {
2101 appendArg(argv, args[i]);
2102 }
2103 int error = false;
2104 while (arg) {
2105 appendArgCopy(argv, arg);
2106 arg = arg->next;
2107 }
2108 sleftv val;
2109 if (!error)
2110 error = executeProc(val, procname.c_str(), argv);
2111 if (!error) {
2112 if (val.Typ() == NONE || (val.Typ() == INT_CMD &&
2113 (long) val.Data()))
2114 {
2115 success = true;
2116 }
2117 val.CleanUp();
2118 }
2119 pool->scheduler->lock.lock();
2120 }
2121 }
2122 virtual void execute() {
2123 // do nothing
2124 }
2125};
2126
2128 long n;
2129 Command cmd("createThreadPool", result, arg);
2130 cmd.check_argc(1, 2);
2131 cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2132 if (cmd.ok()) {
2133 n = (long) cmd.arg(0);
2134 if (n < 0) cmd.report("number of threads must be non-negative");
2135 else if (n >= 256) cmd.report("number of threads too large");
2136 if (!have_threads && n != 0)
2137 cmd.report("in single-threaded mode, number of threads must be zero");
2138 }
2139 if (cmd.ok()) {
2140 ThreadPool *pool = new ThreadPool((int) n);
2142 for (int i = 0; i <n; i++) {
2143 const char *error;
2144 SchedInfo *info = new SchedInfo();
2145 info->scheduler = pool->scheduler;
2146 acquireShared(pool->scheduler);
2147 info->job = NULL;
2148 info->num = i;
2150 if (!thread) {
2151 // TODO: clean up bad pool
2152 return cmd.abort(error);
2153 }
2154 pool->addThread(thread);
2155 }
2157 }
2158 return cmd.status();
2159}
2160
2162 Command cmd("createThreadPoolSet", result, arg);
2163 cmd.check_argc(2);
2164 cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2165 cmd.check_arg(1, LIST_CMD, "second argument must be a list of integers");
2166 lists l;
2167 int n;
2168 if (cmd.ok()) {
2169 l = (lists) (cmd.arg(1));
2170 n = lSize(l)+1;
2171 if (n == 0)
2172 return cmd.abort("second argument must not be empty");
2173 for (int i = 0; i < n; i++) {
2174 if (l->m[i].Typ() != INT_CMD)
2175 return cmd.abort("second argument must be a list of integers");
2176 }
2177 }
2178 lists pools = (lists) omAlloc0Bin(slists_bin);
2179 pools->Init(n);
2180 if (cmd.ok()) {
2181 long s = 0;
2182 for (int i = 0; i < n; i++) {
2183 s += (long) (l->m[i].Data());
2184 }
2185 Scheduler *sched = new Scheduler((int)s);
2186 sched->set_maxconcurrency(cmd.int_arg(0));
2187 for (int i = 0; i < n; i++) {
2188 long m = (long) (l->m[i].Data());
2189 ThreadPool *pool = new ThreadPool(sched, (int) m);
2191 for (int j = 0; j < m; j++) {
2192 const char *error;
2193 SchedInfo *info = new SchedInfo();
2194 info->scheduler = pool->scheduler;
2195 acquireShared(pool->scheduler);
2196 info->job = NULL;
2197 info->num = i;
2199 if (!thread) {
2200 // TODO: clean up bad pool
2201 return cmd.abort(error);
2202 }
2203 pool->addThread(thread);
2204 }
2205 pools->m[i].rtyp = type_threadpool;
2206 pools->m[i].data = new_shared(pool);
2207 }
2208 cmd.set_result(LIST_CMD, pools);
2209 }
2210 return cmd.status();
2211}
2212
2213ThreadPool *createThreadPool(int nthreads, int prioThreads = 0) {
2214 ThreadPool *pool = new ThreadPool((int) nthreads);
2216 for (int i = 0; i <nthreads; i++) {
2217 const char *error;
2218 SchedInfo *info = new SchedInfo();
2219 info->scheduler = pool->scheduler;
2220 acquireShared(pool);
2221 info->job = NULL;
2222 info->num = i;
2224 if (!thread) {
2225 return NULL;
2226 }
2227 pool->addThread(thread);
2228 }
2229 return pool;
2230}
2231
2232void release(ThreadPool *pool) {
2233 releaseShared(pool);
2234}
2235
2236void retain(ThreadPool *pool) {
2237 acquireShared(pool);
2238}
2239
2243
2245 Command cmd("getThreadPoolWorkers", result, arg);
2246 cmd.check_argc(1);
2247 cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2248 cmd.check_init(0, "threadpool not initialized");
2249 int r = 0;
2250 if (cmd.ok()) {
2251 ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2252 Scheduler *sched = pool->scheduler;
2253 sched->lock.lock();
2254 r = sched->threadpool_size(pool);
2255 sched->lock.unlock();
2256 cmd.set_result(INT_CMD, r);
2257 }
2258 return cmd.status();
2259}
2260
2262 Command cmd("setThreadPoolWorkers", result, arg);
2263 cmd.check_argc(2);
2264 cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2265 cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2266 cmd.check_init(0, "threadpool not initialized");
2267 if (cmd.ok()) {
2268 ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2269 Scheduler *sched = pool->scheduler;
2270 // TODO: count/add threads
2271 cmd.no_result();
2272 }
2273 return cmd.status();
2274}
2275
2277 Command cmd("getThreadPoolConcurrency", result, arg);
2278 cmd.check_argc(1);
2279 cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2280 cmd.check_init(0, "threadpool not initialized");
2281 if (cmd.ok()) {
2282 ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2283 Scheduler *sched = pool->scheduler;
2284 sched->lock.lock();
2285 cmd.set_result(INT_CMD, sched->get_maxconcurrency());
2286 sched->lock.unlock();
2287 }
2288 return cmd.status();
2289}
2290
2292 Command cmd("setThreadPoolWorkers", result, arg);
2293 cmd.check_argc(2);
2294 cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2295 cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2296 cmd.check_init(0, "threadpool not initialized");
2297 if (cmd.ok()) {
2298 ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2299 Scheduler *sched = pool->scheduler;
2300 sched->lock.lock();
2301 sched->set_maxconcurrency(cmd.int_arg(1));
2302 sched->lock.unlock();
2303 cmd.no_result();
2304 }
2305 return cmd.status();
2306}
2307
2309 Command cmd("closeThreadPool", result, arg);
2310 cmd.check_argc(1, 2);
2311 cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2312 cmd.check_init(0, "threadpool not initialized");
2313 if (cmd.nargs() > 1)
2314 cmd.check_arg(1, INT_CMD, "optional argument must be an integer");
2315 if (cmd.ok()) {
2316 ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2317 bool wait = cmd.nargs() == 2 ? (cmd.int_arg(1) != 0) : 1;
2318 pool->shutdown(wait);
2319 cmd.no_result();
2320 }
2321 return cmd.status();
2322}
2323
2325 pool->shutdown(wait);
2326}
2327
2328
2330 Command cmd("currentThreadPool", result, arg);
2331 cmd.check_argc(0);
2333 if (pool) {
2335 } else {
2336 cmd.report("no current threadpool");
2337 }
2338 return cmd.status();
2339}
2340
2342 Command cmd("setCurrentThreadPool", result, arg);
2343 cmd.check_argc(1);
2344 cmd.check_init(0, "threadpool not initialized");
2345 if (cmd.ok()) {
2346 ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2347 acquireShared(pool);
2350 currentThreadPoolRef = pool;
2351 }
2352 return cmd.status();
2353}
2354
2355class EvalJob : public Job {
2356public:
2357 EvalJob() : Job() { }
2358 virtual void execute() {
2360 result = (LinTree::to_string(val));
2361 val->CleanUp();
2362 omFreeBin(val, sleftv_bin);
2363 }
2364};
2365
2366class ExecJob : public Job {
2367public:
2368 ExecJob() : Job() { }
2369 virtual void execute() {
2371 val->CleanUp();
2372 omFreeBin(val, sleftv_bin);
2373 }
2374};
2375
2376class ProcJob : public Job {
2377 string procname;
2378public:
2379 ProcJob(const char *procname_init) : Job(),
2380 procname(procname_init) {
2381 set_name(procname_init);
2382 }
2383 virtual void execute() {
2384 vector<leftv> argv;
2385 for (unsigned i = 0; i <args.size(); i++) {
2386 appendArg(argv, args[i]);
2387 }
2388 for (unsigned i = 0; i < deps.size(); i++) {
2389 appendArg(argv, deps[i]->result);
2390 }
2391 sleftv val;
2392 int error = executeProc(val, procname.c_str(), argv);
2393 if (!error) {
2394 result = (LinTree::to_string(&val));
2395 val.CleanUp();
2396 }
2397 }
2398};
2399
2400class KernelJob : public Job {
2401private:
2402 void (*cfunc)(leftv result, leftv arg);
2403public:
2404 KernelJob(void (*func)(leftv result, leftv arg)) : cfunc(func) { }
2405 virtual void execute() {
2406 vector<leftv> argv;
2407 for (unsigned i = 0; i <args.size(); i++) {
2408 appendArg(argv, args[i]);
2409 }
2410 for (unsigned i = 0; i < deps.size(); i++) {
2411 appendArg(argv, deps[i]->result);
2412 }
2413 sleftv val;
2414 memset(&val, 0, sizeof(val));
2415 if (argv.size() > 0) {
2416 leftv *tail = &argv[0]->next;
2417 for (unsigned i = 1; i < argv.size(); i++) {
2418 *tail = argv[i];
2419 tail = &(*tail)->next;
2420 }
2421 *tail = NULL;
2422 }
2423 cfunc(&val, argv[0]);
2424 result = (LinTree::to_string(&val));
2425 val.CleanUp();
2426 }
2427};
2428
2429class RawKernelJob : public Job {
2430private:
2431 void (*cfunc)(long ndeps, Job **deps);
2432public:
2433 RawKernelJob(void (*func)(long ndeps, Job **deps)) : cfunc(func) { }
2434 virtual void execute() {
2435 long ndeps = deps.size();
2436 Job **jobs = (Job **) omAlloc0(sizeof(Job *) * ndeps);
2437 for (long i = 0; i < ndeps; i++)
2438 jobs[i] = deps[i];
2439 cfunc(ndeps, jobs);
2440 omFree(jobs);
2441 }
2442};
2443
2445 Command cmd("createJob", result, arg);
2446 cmd.check_argc_min(1);
2448 "job name must be a string or quote expression");
2449 if (cmd.ok()) {
2450 if (cmd.test_arg(0, STRING_CMD)) {
2451 ProcJob *job = new ProcJob((char *)(cmd.arg(0)));
2452 for (leftv a = arg->next; a != NULL; a = a->next) {
2453 job->args.push_back(LinTree::to_string(a));
2454 }
2455 cmd.set_result(type_job, new_shared(job));
2456 } else {
2457 cmd.check_argc(1);
2458 Job *job = new EvalJob();
2459 job->args.push_back(LinTree::to_string(arg));
2460 cmd.set_result(type_job, new_shared(job));
2461 }
2462 }
2463 return cmd.status();
2464}
2465
2466Job *createJob(void (*func)(leftv result, leftv arg)) {
2467 KernelJob *job = new KernelJob(func);
2468 return job;
2469}
2470
2471Job *createJob(void (*func)(long ndeps, Job **deps)) {
2472 RawKernelJob *job = new RawKernelJob(func);
2473 return job;
2474}
2475
2476Job *startJob(ThreadPool *pool, Job *job, leftv arg) {
2477 if (job->pool) return NULL;
2478 while (arg) {
2479 job->args.push_back(LinTree::to_string(arg));
2480 arg = arg->next;
2481 }
2482 pool->attachJob(job);
2483 return job;
2484}
2485
2487 return startJob(pool, job, NULL);
2488}
2489
2490// Job *scheduleJob(ThreadPool *pool, Job *job, long ndeps, Job **deps) {
2491// if (job->pool) return NULL;
2492// pool->scheduler->lock.lock();
2493// bool cancelled = false;
2494// job->addDep(ndeps, deps);
2495// for (long i = 0; i < ndeps; i++) {
2496// deps[i]->addNotify(job);
2497// cancelled |= deps[i]->cancelled;
2498// }
2499// if (cancelled) {
2500// job->pool = pool;
2501// pool->cancelJob(job);
2502// }
2503// else
2504// pool->attachJob(job);
2505// pool->scheduler->lock.unlock();
2506// return FIXME: missing/unclear what this is supposed to be
2507// }
2508
2509void cancelJob(Job *job) {
2510 ThreadPool *pool = job->pool;
2511 if (pool) pool->cancelJob(job);
2512}
2513
2515 return currentJobRef;
2516}
2517
2519 Command cmd("startJob", result, arg);
2520 cmd.check_argc_min(1);
2521 int has_pool = cmd.test_arg(0, type_threadpool);
2522 cmd.check_argc_min(1+has_pool);
2523 if (has_pool)
2524 cmd.check_init(0, "threadpool not initialized");
2525 int has_prio = cmd.test_arg(has_pool, INT_CMD);
2526 long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2527 int first_arg = has_pool + has_prio;
2528 cmd.check_arg(first_arg, type_job, STRING_CMD,
2529 "job argument must be a job or string");
2530 if (cmd.ok() && cmd.argtype(first_arg) == type_job)
2531 cmd.check_init(first_arg, "job not initialized");
2532 if (!cmd.ok()) return cmd.status();
2533 ThreadPool *pool;
2534 if (has_pool)
2535 pool = cmd.shared_arg<ThreadPool>(0);
2536 else {
2538 return cmd.abort("no current threadpool defined");
2539 pool = currentThreadPoolRef;
2540 }
2541 Job *job;
2542 if (cmd.argtype(first_arg) == type_job)
2543 job = *(Job **)(cmd.arg(first_arg));
2544 else
2545 job = new ProcJob((char *)(cmd.arg(first_arg)));
2546 leftv a = arg->next;
2547 if (has_pool) a = a->next;
2548 if (has_prio) a = a->next;
2549 for (; a != NULL; a = a->next) {
2550 job->args.push_back(LinTree::to_string(a));
2551 }
2552 if (job->pool)
2553 return cmd.abort("job has already been scheduled");
2554 job->prio = prio;
2555 pool->attachJob(job);
2556 cmd.set_result(type_job, new_shared(job));
2557 return cmd.status();
2558}
2559
2561 Command cmd("waitJob", result, arg);
2562 cmd.check_argc(1);
2563 cmd.check_arg(0, type_job, "argument must be a job");
2564 cmd.check_init(0, "job not initialized");
2565 if (cmd.ok()) {
2566 Job *job = *(Job **)(cmd.arg(0));
2567 ThreadPool *pool = job->pool;
2568 if (!pool) {
2569 return cmd.abort("job has not yet been started or scheduled");
2570 }
2571 pool->waitJob(job);
2572 if (job->cancelled) {
2573 return cmd.abort("job has been cancelled");
2574 }
2575 if (job->result.size() == 0)
2576 cmd.no_result();
2577 else {
2579 cmd.set_result(res->Typ(), res->Data());
2580 }
2581 }
2582 return cmd.status();
2583}
2584
2585void waitJob(Job *job) {
2586 assert(job->pool != NULL);
2587 job->pool->waitJob(job);
2588}
2589
2591 Command cmd("cancelJob", result, arg);
2592 cmd.check_argc(1);
2593 cmd.check_arg(0, type_job, "argument must be a job");
2594 cmd.check_init(0, "job not initialized");
2595 if (cmd.ok()) {
2596 Job *job = cmd.shared_arg<Job>(0);
2597 ThreadPool *pool = job->pool;
2598 if (!pool) {
2599 return cmd.abort("job has not yet been started or scheduled");
2600 }
2601 pool->cancelJob(job);
2602 cmd.no_result();
2603 }
2604 return cmd.status();
2605}
2606
2608 Job *job;
2609 Command cmd("jobCancelled", result, arg);
2610 cmd.check_argc(0, 1);
2611 if (cmd.nargs() == 1) {
2612 cmd.check_arg(0, type_job, "argument must be a job");
2613 cmd.check_init(0, "job not initialized");
2614 job = cmd.shared_arg<Job>(0);
2615 } else {
2616 job = currentJobRef;
2617 if (!job)
2618 cmd.report("no current job");
2619 }
2620 if (cmd.ok()) {
2621 ThreadPool *pool = job->pool;
2622 if (!pool) {
2623 return cmd.abort("job has not yet been started or scheduled");
2624 }
2625 pool->scheduler->lock.lock();
2626 cmd.set_result((long) job->cancelled);
2627 pool->scheduler->lock.unlock();
2628 }
2629 return cmd.status();
2630}
2631
2633 ThreadPool *pool = job->pool;
2634 if (pool) pool->scheduler->lock.lock();
2635 bool result = job->cancelled;
2636 if (pool) pool->scheduler->lock.unlock();
2637 return result;
2638}
2639
2642}
2643
2644void setJobData(Job *job, void *data) {
2645 ThreadPool *pool = job->pool;
2646 if (pool) pool->scheduler->lock.lock();
2647 job->data = data;
2648 if (pool) pool->scheduler->lock.unlock();
2649}
2650
2651
2652void *getJobData(Job *job) {
2653 ThreadPool *pool = job->pool;
2654 if (pool) pool->scheduler->lock.lock();
2655 void *result = job->data;
2656 if (pool) pool->scheduler->lock.unlock();
2657 return result;
2658}
2659
2660void addJobArgs(Job *job, leftv arg) {
2661 ThreadPool *pool = job->pool;
2662 if (pool) pool->scheduler->lock.lock();
2663 while (arg) {
2664 job->args.push_back(LinTree::to_string(arg));
2665 arg = arg->next;
2666 }
2667 if (pool) pool->scheduler->lock.unlock();
2668}
2669
2671 ThreadPool *pool = job->pool;
2672 if (pool) pool->scheduler->lock.lock();
2674 if (pool) pool->scheduler->lock.unlock();
2675 return result;
2676}
2677
2678const char *getJobName(Job *job) {
2679 // TODO
2680 return "";
2681}
2682
2683void setJobName(Job *job, const char *name) {
2684 // TODO
2685}
2686
2688 Command cmd("createTrigger", result, arg);
2689 cmd.check_argc_min(1);
2690 int has_pool = cmd.test_arg(0, type_threadpool);
2691 ThreadPool *pool;
2692 if (has_pool) {
2693 cmd.check_init(0, "threadpool not initialized");
2694 pool = cmd.shared_arg<ThreadPool>(0);
2695 } else {
2696 pool = currentThreadPoolRef;
2697 if (!pool)
2698 return cmd.abort("no default threadpool");
2699 }
2700 cmd.check_argc(has_pool + 2);
2701 cmd.check_arg(has_pool + 0, STRING_CMD, "trigger subtype must be a string");
2702 const char *kind = (const char *)(cmd.arg(has_pool + 0));
2703 if (0 == strcmp(kind, "proc")) {
2704 cmd.check_arg(has_pool + 1, STRING_CMD, "proc trigger argument must be a string");
2705 } else {
2706 cmd.check_arg(has_pool + 1, INT_CMD, "trigger argument must be an integer");
2707 }
2708 if (cmd.ok()) {
2709 Trigger *trigger;
2710 long n = (long) (cmd.arg(has_pool + 1));
2711 if (n < 0)
2712 return cmd.abort("trigger argument must be a non-negative integer");
2713 if (0 == strcmp(kind, "acc")) {
2714 trigger = new AccTrigger(n);
2715 } else if (0 == strcmp(kind, "count")) {
2716 trigger = new CountTrigger(n);
2717 } else if (0 == strcmp(kind, "set")) {
2718 trigger = new SetTrigger(n);
2719 } else if (0 == strcmp(kind, "proc")) {
2720 trigger = new ProcTrigger((const char *) cmd.arg(has_pool + 1));
2721 } else {
2722 return cmd.abort("unknown trigger subtype");
2723 }
2724 pool->attachJob(trigger);
2725 cmd.set_result(type_trigger, new_shared(trigger));
2726 }
2727 return cmd.status();
2728}
2729
2731 Command cmd("updateTrigger", result, arg);
2732 cmd.check_argc_min(1);
2733 cmd.check_arg(0, type_trigger, "first argument must be a trigger");
2734 cmd.check_init(0, "trigger not initialized");
2735 if (cmd.ok()) {
2736 Trigger *trigger = cmd.shared_arg<Trigger>(0);
2737 trigger->pool->scheduler->lock.lock();
2738 if (!trigger->accept(arg->next))
2739 cmd.report("incompatible argument type(s) for this trigger");
2740 else {
2741 trigger->activate(arg->next);
2742 if (trigger->ready()) {
2743 trigger->run();
2744 Scheduler::notifyDeps(trigger->pool->scheduler, trigger);
2745 }
2746 }
2747 trigger->pool->scheduler->lock.unlock();
2748 }
2749 return cmd.status();
2750}
2751
2753 Command cmd("chainTrigger", result, arg);
2754 cmd.check_argc(2);
2755 cmd.check_arg(0, type_trigger, "first argument must be a trigger");
2757 "second argument must be a trigger or job");
2758 cmd.check_init(0, "trigger not initialized");
2759 cmd.check_init(1, "trigger/job not initialized");
2760 if (cmd.ok()) {
2761 Trigger *trigger = cmd.shared_arg<Trigger>(0);
2762 Job *job = cmd.shared_arg<Job>(1);
2763 if (trigger->pool != job->pool)
2764 return cmd.abort("arguments use different threadpools");
2765 ThreadPool *pool = trigger->pool;
2766 pool->scheduler->lock.lock();
2767 job->triggers.push_back(trigger);
2768 pool->scheduler->lock.unlock();
2769 }
2770 return cmd.status();
2771}
2772
2774 Command cmd("testTrigger", result, arg);
2775 cmd.check_argc(1);
2776 cmd.check_arg(0, type_trigger, "argument must be a trigger");
2777 cmd.check_init(0, "trigger not initialized");
2778 if (cmd.ok()) {
2779 Trigger *trigger = cmd.shared_arg<Trigger>(0);
2780 ThreadPool *pool = trigger->pool;
2781 pool->scheduler->lock.lock();
2782 cmd.set_result((long)trigger->ready());
2783 pool->scheduler->lock.unlock();
2784 }
2785 return cmd.status();
2786}
2787
2788
2790 vector<Job *> jobs;
2791 vector<Job *> deps;
2792 Command cmd("scheduleJob", result, arg);
2793 cmd.check_argc_min(1);
2794 int has_pool = cmd.test_arg(0, type_threadpool);
2795 if (has_pool) {
2796 cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2797 cmd.check_init(0, "threadpool not initialized");
2798 }
2799 cmd.check_argc_min(has_pool+1);
2800 int has_prio = cmd.test_arg(has_pool, INT_CMD);
2801 ThreadPool *pool;
2802 if (has_pool)
2803 pool = cmd.shared_arg<ThreadPool>(0);
2804 else {
2806 return cmd.abort("no current threadpool defined");
2807 pool = currentThreadPoolRef;
2808 }
2809 long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2810 int first_arg = has_pool + has_prio;
2811 if (cmd.test_arg(first_arg, type_job)) {
2812 jobs.push_back(*(Job **)(cmd.arg(first_arg)));
2813 } else if (cmd.test_arg(first_arg, STRING_CMD)) {
2814 jobs.push_back(new ProcJob((char *)(cmd.arg(first_arg))));
2815 } else if (cmd.test_arg(first_arg, LIST_CMD)) {
2816 lists l = (lists) (cmd.arg(first_arg));
2817 int n = lSize(l);
2818 for (int i = 0; i < n; i++) {
2819 if (l->m[i].Typ() != type_job)
2820 return cmd.abort("job argument must be a job, string, or list of jobs");
2821 }
2822 for (int i = 0; i < n; i++) {
2823 Job *job = *(Job **) (l->m[i].Data());
2824 if (!job)
2825 return cmd.abort("job not initialized");
2826 jobs.push_back(job);
2827 }
2828 } else {
2829 return cmd.abort("job argument must be a job, string, or list of jobs");
2830 }
2831 bool error = false;
2832 leftv a = arg->next;
2833 if (has_pool) a = a->next;
2834 if (has_prio) a = a->next;
2835 for (; !error && a; a = a->next) {
2836 if (a->Typ() == type_job || a->Typ() == type_trigger) {
2837 deps.push_back(*(Job **)(a->Data()));
2838 } else if (a->Typ() == LIST_CMD) {
2839 lists l = (lists) a->Data();
2840 int n = lSize(l);
2841 for (int i = 0; i < n; i++) {
2842 if (l->m[i].Typ() == type_job || l->m[i].Typ() == type_trigger) {
2843 deps.push_back(*(Job **)(l->m[i].Data()));
2844 } else {
2845 error = true;
2846 break;
2847 }
2848 }
2849 }
2850 }
2851 if (error) {
2852 return cmd.abort("illegal dependency");
2853 }
2854 for (unsigned i = 0; i < jobs.size(); i++) {
2855 Job *job = jobs[i];
2856 if (job->pool) {
2857 return cmd.abort("job has already been scheduled");
2858 }
2859 job->prio = prio;
2860 }
2861 for (unsigned i = 0; i < deps.size(); i++) {
2862 Job *job = deps[i];
2863 if (!job->pool) {
2864 return cmd.abort("dependency has not yet been scheduled");
2865 }
2866 if (job->pool != pool) {
2867 return cmd.abort("dependency has been scheduled on a different threadpool");
2868 }
2869 }
2870 pool->scheduler->lock.lock();
2871 bool cancelled = false;
2872 for (unsigned i = 0; i < jobs.size(); i++) {
2873 jobs[i]->addDep(deps);
2874 }
2875 for (unsigned i = 0; i < deps.size(); i++) {
2876 deps[i]->addNotify(jobs);
2877 cancelled |= deps[i]->cancelled;
2878 }
2879 for (unsigned i = 0; i < jobs.size(); i++) {
2880 if (cancelled) {
2881 jobs[i]->pool = pool;
2882 pool->cancelJob(jobs[i]);
2883 }
2884 else
2885 pool->attachJob(jobs[i]);
2886 }
2887 pool->scheduler->lock.unlock();
2888 if (jobs.size() > 0)
2889 cmd.set_result(type_job, new_shared(jobs[0]));
2890 return cmd.status();
2891}
2892
2894 Command cmd("currentJob", result, arg);
2895 cmd.check_argc(0);
2896 Job *job = currentJobRef;
2897 if (job) {
2898 cmd.set_result(type_job, new_shared(job));
2899 } else {
2900 cmd.report("no current job");
2901 }
2902 return cmd.status();
2903}
2904
2905
2907 if (wrong_num_args("threadID", arg, 0))
2908 return TRUE;
2909 result->rtyp = INT_CMD;
2910 result->data = (char *)thread_id;
2911 return FALSE;
2912}
2913
2915 if (wrong_num_args("mainThread", arg, 0))
2916 return TRUE;
2917 result->rtyp = INT_CMD;
2918 result->data = (char *)(long)(thread_id == 0L);
2919 return FALSE;
2920}
2921
2923 if (wrong_num_args("threadEval", arg, 2))
2924 return TRUE;
2925 if (arg->Typ() != type_thread) {
2926 WerrorS("threadEval: argument is not a thread");
2927 return TRUE;
2928 }
2929 InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2930 string expr = LinTree::to_string(arg->next);
2931 ThreadState *ts = thread->getThreadState();
2932 if (ts && ts->parent != pthread_self()) {
2933 WerrorS("threadEval: can only be called from parent thread");
2934 return TRUE;
2935 }
2936 if (ts) ts->lock.lock();
2937 if (!ts || !ts->running || !ts->active) {
2938 WerrorS("threadEval: thread is no longer running");
2939 if (ts) ts->lock.unlock();
2940 return TRUE;
2941 }
2942 ts->to_thread.push("e");
2943 ts->to_thread.push(expr);
2944 ts->to_cond.signal();
2945 ts->lock.unlock();
2946 result->rtyp = NONE;
2947 return FALSE;
2948}
2949
2951 if (wrong_num_args("threadExec", arg, 2))
2952 return TRUE;
2953 if (arg->Typ() != type_thread) {
2954 WerrorS("threadExec: argument is not a thread");
2955 return TRUE;
2956 }
2957 InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2958 string expr = LinTree::to_string(arg->next);
2959 ThreadState *ts = thread->getThreadState();
2960 if (ts && ts->parent != pthread_self()) {
2961 WerrorS("threadExec: can only be called from parent thread");
2962 return TRUE;
2963 }
2964 if (ts) ts->lock.lock();
2965 if (!ts || !ts->running || !ts->active) {
2966 WerrorS("threadExec: thread is no longer running");
2967 if (ts) ts->lock.unlock();
2968 return TRUE;
2969 }
2970 ts->to_thread.push("x");
2971 ts->to_thread.push(expr);
2972 ts->to_cond.signal();
2973 ts->lock.unlock();
2974 result->rtyp = NONE;
2975 return FALSE;
2976}
2977
2979 Command cmd("threadPoolExec", result, arg);
2980 ThreadPool *pool;
2981 cmd.check_argc(1, 2);
2982 int has_pool = cmd.nargs() == 2;
2983 if (has_pool) {
2984 cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2985 cmd.check_init(0, "threadpool not initialized");
2986 pool = cmd.shared_arg<ThreadPool>(0);
2987 } else {
2988 pool = currentThreadPoolRef;
2989 if (!pool)
2990 return cmd.abort("no current threadpool");
2991 }
2992 if (cmd.ok()) {
2993 string expr = LinTree::to_string(has_pool ? arg->next : arg);
2994 Job* job = new ExecJob();
2995 job->args.push_back(expr);
2996 job->pool = pool;
2997 pool->broadcastJob(job);
2998 }
2999 return cmd.status();
3000}
3001
3003 if (wrong_num_args("threadResult", arg, 1))
3004 return TRUE;
3005 if (arg->Typ() != type_thread) {
3006 WerrorS("threadResult: argument is not a thread");
3007 return TRUE;
3008 }
3009 InterpreterThread *thread = *(InterpreterThread **)arg->Data();
3010 ThreadState *ts = thread->getThreadState();
3011 if (ts && ts->parent != pthread_self()) {
3012 WerrorS("threadResult: can only be called from parent thread");
3013 return TRUE;
3014 }
3015 if (ts) ts->lock.lock();
3016 if (!ts || !ts->running || !ts->active) {
3017 WerrorS("threadResult: thread is no longer running");
3018 if (ts) ts->lock.unlock();
3019 return TRUE;
3020 }
3021 while (ts->from_thread.empty()) {
3022 ts->from_cond.wait();
3023 }
3024 string expr = ts->from_thread.front();
3025 ts->from_thread.pop();
3026 ts->lock.unlock();
3027 leftv val = LinTree::from_string(expr);
3028 result->rtyp = val->Typ();
3029 result->data = val->Data();
3030 return FALSE;
3031}
3032
3034 Command cmd("setSharedName", result, arg);
3035 cmd.check_argc(2);
3036 int type = cmd.argtype(0);
3037 cmd.check_init(0, "first argument is not initialized");
3038 if (type != type_job && type != type_trigger && type != type_threadpool) {
3039 cmd.report("first argument must be a job, trigger, or threadpool");
3040 }
3041 cmd.check_arg(1, STRING_CMD, "second argument must be a string");
3042 if (cmd.ok()) {
3043 SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3044 name_lock.lock();
3045 obj->set_name((char *) cmd.arg(1));
3046 name_lock.unlock();
3047 }
3048 return cmd.status();
3049}
3050
3052 Command cmd("getSharedName", result, arg);
3053 cmd.check_argc(1);
3054 int type = cmd.argtype(0);
3055 cmd.check_init(0, "first argument is not initialized");
3056 if (type != type_job && type != type_trigger && type != type_threadpool) {
3057 cmd.report("first argument must be a job, trigger, or threadpool");
3058 }
3059 if (cmd.ok()) {
3060 SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3061 name_lock.lock();
3062 cmd.set_result(obj->get_name().c_str());
3063 name_lock.unlock();
3064 }
3065 return cmd.status();
3066}
3067
3068}
3069
3070using namespace LibThread;
3071
3072
// Initialization entry point of the systhreads module.
// While holding master_lock it calls makeSharedType / makeRegionlockType for
// every type id of this library, passes each exported name/function pair to
// fn->iiAddCproc under the library name taken from currPack (or "" when that
// is NULL), and finally calls LinTree::init(). Returns MAX_TOK.
3073extern "C" int SI_MOD_INIT(systhreads)(SModulFunctions *fn)
3074{
3075 const char *libname = currPack->libname;
3076 if (!libname) libname = "";
3077 master_lock.lock();
3078 if (!thread_state)
// NOTE(review): source line 3079, the statement guarded by the `if` above, is
// missing from this listing (the numbering jumps from 3078 to 3080). As
// printed, the `if` would guard the first makeSharedType call instead.
// Restore the line from the upstream file before building.
3080 makeSharedType(type_atomic_table, "atomic_table");
3081 makeSharedType(type_atomic_list, "atomic_list");
3082 makeSharedType(type_shared_table, "shared_table");
3083 makeSharedType(type_shared_list, "shared_list");
3084 makeSharedType(type_channel, "channel");
3085 makeSharedType(type_syncvar, "syncvar");
3086 makeSharedType(type_region, "region");
3087 makeSharedType(type_thread, "thread");
3088 makeSharedType(type_threadpool, "threadpool");
3089 makeSharedType(type_job, "job");
3090 makeSharedType(type_trigger, "trigger");
3091 makeRegionlockType(type_regionlock, "regionlock");
3092
// Accessors for tables, lists, regions, channels and sync variables.
3093 fn->iiAddCproc(libname, "putTable", FALSE, putTable);
3094 fn->iiAddCproc(libname, "getTable", FALSE, getTable);
3095 fn->iiAddCproc(libname, "inTable", FALSE, inTable);
3096 fn->iiAddCproc(libname, "putList", FALSE, putList);
3097 fn->iiAddCproc(libname, "getList", FALSE, getList);
3098 fn->iiAddCproc(libname, "lockRegion", FALSE, lockRegion);
3099 fn->iiAddCproc(libname, "regionLock", FALSE, regionLock);
3100 fn->iiAddCproc(libname, "unlockRegion", FALSE, unlockRegion);
3101 fn->iiAddCproc(libname, "sendChannel", FALSE, sendChannel);
3102 fn->iiAddCproc(libname, "receiveChannel", FALSE, receiveChannel);
3103 fn->iiAddCproc(libname, "statChannel", FALSE, statChannel);
3104 fn->iiAddCproc(libname, "writeSyncVar", FALSE, writeSyncVar);
3105 fn->iiAddCproc(libname, "updateSyncVar", FALSE, updateSyncVar);
3106 fn->iiAddCproc(libname, "readSyncVar", FALSE, readSyncVar);
3107 fn->iiAddCproc(libname, "statSyncVar", FALSE, statSyncVar);
3108
// Constructors and lookup functions for shared objects.
3109 fn->iiAddCproc(libname, "makeAtomicTable", FALSE, makeAtomicTable);
3110 fn->iiAddCproc(libname, "makeAtomicList", FALSE, makeAtomicList);
3111 fn->iiAddCproc(libname, "makeSharedTable", FALSE, makeSharedTable);
3112 fn->iiAddCproc(libname, "makeSharedList", FALSE, makeSharedList);
3113 fn->iiAddCproc(libname, "makeChannel", FALSE, makeChannel);
3114 fn->iiAddCproc(libname, "makeSyncVar", FALSE, makeSyncVar);
3115 fn->iiAddCproc(libname, "makeRegion", FALSE, makeRegion);
3116 fn->iiAddCproc(libname, "findSharedObject", FALSE, findSharedObject);
3117 fn->iiAddCproc(libname, "bindSharedObject", FALSE, bindSharedObject);
3118 fn->iiAddCproc(libname, "typeSharedObject", FALSE, typeSharedObject);
3119
// Threads, threadpools, jobs and triggers.
3120 fn->iiAddCproc(libname, "createThread", FALSE, createThread);
3121 fn->iiAddCproc(libname, "joinThread", FALSE, joinThread);
3122 fn->iiAddCproc(libname, "createThreadPool", FALSE, createThreadPool);
3123 fn->iiAddCproc(libname, "createThreadPoolSet", FALSE, createThreadPoolSet);
// The two adjoin* entries below are compiled out.
3124#if 0
3125 fn->iiAddCproc(libname, "adjoinThreadPool", FALSE, adjoinThreadPool);
3126 fn->iiAddCproc(libname, "getAdjoinedThreadPools", FALSE, getAdjoinedThreadPools);
3127#endif
3128 fn->iiAddCproc(libname, "closeThreadPool", FALSE, closeThreadPool);
3129 fn->iiAddCproc(libname, "getThreadPoolWorkers", FALSE, getThreadPoolWorkers);
3130 fn->iiAddCproc(libname, "setThreadPoolWorkers", FALSE, setThreadPoolWorkers);
3131 fn->iiAddCproc(libname, "getThreadPoolConcurrency", FALSE, getThreadPoolConcurrency);
3132 fn->iiAddCproc(libname, "setThreadPoolConcurrency", FALSE, setThreadPoolConcurrency);
3133 fn->iiAddCproc(libname, "currentThreadPool", FALSE, currentThreadPool);
3134 fn->iiAddCproc(libname, "setCurrentThreadPool", FALSE, setCurrentThreadPool);
3135 fn->iiAddCproc(libname, "threadPoolExec", FALSE, threadPoolExec);
3136 fn->iiAddCproc(libname, "threadID", FALSE, threadID);
3137 fn->iiAddCproc(libname, "mainThread", FALSE, mainThread);
3138 fn->iiAddCproc(libname, "threadEval", FALSE, threadEval);
3139 fn->iiAddCproc(libname, "threadExec", FALSE, threadExec);
3140 fn->iiAddCproc(libname, "threadResult", FALSE, threadResult);
3141 fn->iiAddCproc(libname, "createJob", FALSE, createJob);
3142 fn->iiAddCproc(libname, "currentJob", FALSE, currentJob);
3143 fn->iiAddCproc(libname, "setSharedName", FALSE, setSharedName);
3144 fn->iiAddCproc(libname, "getSharedName", FALSE, getSharedName);
3145 fn->iiAddCproc(libname, "startJob", FALSE, startJob);
3146 fn->iiAddCproc(libname, "waitJob", FALSE, waitJob);
3147 fn->iiAddCproc(libname, "cancelJob", FALSE, cancelJob);
3148 fn->iiAddCproc(libname, "jobCancelled", FALSE, jobCancelled);
// "scheduleJob" and "scheduleJobs" are two names for the same function.
3149 fn->iiAddCproc(libname, "scheduleJob", FALSE, scheduleJob);
3150 fn->iiAddCproc(libname, "scheduleJobs", FALSE, scheduleJob);
3151 fn->iiAddCproc(libname, "createTrigger", FALSE, createTrigger);
3152 fn->iiAddCproc(libname, "updateTrigger", FALSE, updateTrigger);
3153 fn->iiAddCproc(libname, "testTrigger", FALSE, testTrigger);
3154 fn->iiAddCproc(libname, "chainTrigger", FALSE, chainTrigger);
3155
3156 LinTree::init();
3157 master_lock.unlock();
3158
3159 return MAX_TOK;
3160}
int BOOLEAN
Definition auxiliary.h:88
#define TRUE
Definition auxiliary.h:101
#define FALSE
Definition auxiliary.h:97
int setBlackboxStuff(blackbox *bb, const char *n)
define a new type
Definition blackbox.cc:143
int l
Definition cfEzgcd.cc:100
int m
Definition cfEzgcd.cc:128
int i
Definition cfEzgcd.cc:132
int p
Definition cfModGcd.cc:4086
return false
Definition cfModGcd.cc:85
CanonicalForm b
Definition cfModGcd.cc:4111
void signal()
Definition thread.h:97
AccTrigger(long count_init)
Definition shared.cc:1994
virtual void execute()
Definition shared.cc:2012
virtual void activate(leftv arg)
Definition shared.cc:2003
virtual bool accept(leftv arg)
Definition shared.cc:2000
virtual bool ready()
Definition shared.cc:1996
void set_result(int type, long n)
Definition shared.cc:127
int test_arg(int i, int type)
Definition shared.cc:111
void check_init(int i, const char *err)
Definition shared.cc:80
void report(const char *err)
Definition shared.cc:106
BOOLEAN abort(const char *err)
Definition shared.cc:143
long int_arg(int i)
Definition shared.cc:103
BOOLEAN status()
Definition shared.cc:137
void check_arg(int i, int type, int type2, const char *err)
Definition shared.cc:86
void check_arg(int i, int type, const char *err)
Definition shared.cc:76
const char * error
Definition shared.cc:39
void * arg(int i)
Definition shared.cc:96
void set_result(long n)
Definition shared.cc:115
const char * name
Definition shared.cc:38
void check_argc_min(int n)
Definition shared.cc:72
void set_result(int type, void *p)
Definition shared.cc:123
int argtype(int i)
Definition shared.cc:90
void set_result(const char *s)
Definition shared.cc:119
T * shared_arg(int i)
Definition shared.cc:100
void check_argc(int n)
Definition shared.cc:64
void check_argc(int lo, int hi)
Definition shared.cc:68
Command(const char *n, leftv r, leftv a)
Definition shared.cc:44
CountTrigger(long count_init)
Definition shared.cc:2033
virtual void execute()
Definition shared.cc:2047
virtual void activate(leftv arg)
Definition shared.cc:2042
virtual bool accept(leftv arg)
Definition shared.cc:2039
virtual bool ready()
Definition shared.cc:2035
virtual void execute()
Definition shared.cc:2358
virtual void execute()
Definition shared.cc:2369
ThreadState * getThreadState()
Definition shared.cc:1425
InterpreterThread(ThreadState *ts_init)
Definition shared.cc:1423
vector< string > args
Definition shared.cc:1553
vector< Job * > deps
Definition shared.cc:1550
ThreadPool * pool
Definition shared.cc:1546
string result
Definition shared.cc:1554
long pending_index
Definition shared.cc:1549
virtual bool ready()
Definition shared.cc:1600
void addDep(Job *job)
Definition shared.cc:1566
virtual void execute()=0
void addNotify(vector< Job * > &jobs)
Definition shared.cc:1963
vector< Job * > notify
Definition shared.cc:1551
vector< Trigger * > triggers
Definition shared.cc:1552
KernelJob(void(*func)(leftv result, leftv arg))
Definition shared.cc:2404
void(* cfunc)(leftv result, leftv arg)
Definition shared.cc:2402
virtual void execute()
Definition shared.cc:2405
ProcJob(const char *procname_init)
Definition shared.cc:2379
virtual void execute()
Definition shared.cc:2383
virtual bool accept(leftv arg)
Definition shared.cc:2093
virtual void execute()
Definition shared.cc:2122
ProcTrigger(const char *p)
Definition shared.cc:2087
virtual void activate(leftv arg)
Definition shared.cc:2096
virtual bool ready()
Definition shared.cc:2089
virtual void execute()
Definition shared.cc:2434
void(* cfunc)(long ndeps, Job **deps)
Definition shared.cc:2431
RawKernelJob(void(*func)(long ndeps, Job **deps))
Definition shared.cc:2433
SharedObjectTable objects
Definition shared.cc:204
Lock * get_lock()
Definition shared.cc:207
virtual ~Region()
Definition shared.cc:206
virtual ~Scheduler()
Definition shared.cc:1691
vector< ThreadPool * > thread_owners
Definition shared.cc:1659
vector< JobQueue * > thread_queues
Definition shared.cc:1661
void addThread(ThreadPool *owner, ThreadState *thread)
Definition shared.cc:1730
ConditionVariable response
Definition shared.cc:1664
void cancelDeps(Job *job)
Definition shared.cc:1781
priority_queue< Job *, vector< Job * >, JobCompare > global_queue
Definition shared.cc:1660
ConditionVariable cond
Definition shared.cc:1663
ThreadState * getThread(int i)
Definition shared.cc:1703
void set_maxconcurrency(int n)
Definition shared.cc:1677
void detachJob(Job *job)
Definition shared.cc:1753
void broadcastJob(ThreadPool *pool, Job *job)
Definition shared.cc:1771
vector< ThreadState * > threads
Definition shared.cc:1658
void cancelJob(Job *job)
Definition shared.cc:1790
void waitJob(Job *job)
Definition shared.cc:1801
friend class Job
Definition shared.cc:1665
static void notifyDeps(Scheduler *scheduler, Job *job)
Definition shared.cc:1824
void queueJob(Job *job)
Definition shared.cc:1765
static void * main(ThreadState *ts, void *arg)
Definition shared.cc:1851
void shutdown(bool wait)
Definition shared.cc:1704
void attachJob(ThreadPool *pool, Job *job)
Definition shared.cc:1737
vector< Job * > pending
Definition shared.cc:1662
int threadpool_size(ThreadPool *pool)
Definition shared.cc:1683
virtual void execute()
Definition shared.cc:2076
virtual void activate(leftv arg)
Definition shared.cc:2067
SetTrigger(long count_init)
Definition shared.cc:2057
virtual bool ready()
Definition shared.cc:2060
vector< bool > set
Definition shared.cc:2054
virtual bool accept(leftv arg)
Definition shared.cc:2064
virtual ~SharedObject()
Definition shared.cc:157
void set_name(std::string &name_init)
Definition shared.cc:160
virtual BOOLEAN op3(int op, leftv res, leftv a1, leftv a2, leftv a3)
Definition shared.cc:183
void set_name(const char *s)
Definition shared.cc:161
void incref(int by=1)
Definition shared.cc:165
virtual BOOLEAN op2(int op, leftv res, leftv a1, leftv a2)
Definition shared.cc:180
void set_type(int type_init)
Definition shared.cc:158
std::string & get_name()
Definition shared.cc:164
ConditionVariable cond
Definition shared.cc:396
queue< string > q
Definition shared.cc:394
void send(string item)
Definition shared.cc:400
int write(string item)
Definition shared.cc:454
ConditionVariable cond
Definition shared.cc:431
void update(leftv val)
Definition shared.cc:449
void attachJob(Job *job)
Definition shared.cc:1928
void waitJob(Job *job)
Definition shared.cc:1946
void queueJob(Job *job)
Definition shared.cc:1934
void broadcastJob(Job *job)
Definition shared.cc:1937
ThreadState * getThread(int i)
Definition shared.cc:1923
void cancelJob(Job *job)
Definition shared.cc:1943
Scheduler * scheduler
Definition shared.cc:1630
ThreadPool(Scheduler *sched, int n)
Definition shared.cc:1916
void detachJob(Job *job)
Definition shared.cc:1931
void cancelDeps(Job *job)
Definition shared.cc:1940
void shutdown(bool wait)
Definition shared.cc:1924
void addThread(ThreadState *thread)
Definition shared.cc:1925
void *(* thread_func)(ThreadState *, void *)
Definition shared.cc:1336
ConditionVariable to_cond
Definition shared.cc:1341
queue< string > from_thread
Definition shared.cc:1344
ConditionVariable from_cond
Definition shared.cc:1342
queue< string > to_thread
Definition shared.cc:1343
void set_region(Region *region_init)
Definition shared.cc:304
virtual void activate(leftv arg)=0
virtual bool accept(leftv arg)=0
virtual ~TxList()
Definition shared.cc:357
int put(size_t index, string &value)
Definition shared.cc:358
vector< string > entries
Definition shared.cc:354
int get(size_t index, string &value)
Definition shared.cc:372
int put(string &key, string &value)
Definition shared.cc:321
std::map< string, string > entries
Definition shared.cc:317
int get(string &key, string &value)
Definition shared.cc:333
int check(string &key)
Definition shared.cc:343
virtual ~TxTable()
Definition shared.cc:320
void put(T data)
Definition lintree.h:61
Definition thread.h:17
bool is_locked()
Definition thread.h:68
void lock()
Definition thread.h:46
void unlock()
Definition thread.h:57
Class used for (list of) interpreter objects.
Definition subexpr.h:83
int Typ()
Definition subexpr.cc:1048
const char * name
Definition subexpr.h:87
package req_packhdl
Definition subexpr.h:106
int rtyp
Definition subexpr.h:91
void * Data()
Definition subexpr.cc:1192
leftv next
Definition subexpr.h:86
int Eval()
Definition subexpr.cc:2000
void Copy(leftv e)
Definition subexpr.cc:689
void * data
Definition subexpr.h:88
void CleanUp(ring r=currRing)
Definition subexpr.cc:351
sleftv * m
Definition lists.h:46
INLINE_THIS void Init(int l=0)
return result
const CanonicalForm int s
Definition facAbsFact.cc:51
CanonicalForm res
Definition facAbsFact.cc:60
CFList & eval
int j
Definition facHensel.cc:110
void WerrorS(const char *s)
Definition feFopen.cc:24
feOptIndex
Definition feOptGen.h:15
feOptIndex feGetOptIndex(const char *name)
Definition feOpt.cc:104
const char * feSetOptValue(feOptIndex opt, char *optarg)
Definition feOpt.cc:154
const char * Tok2Cmdname(int tok)
Definition gentable.cc:137
#define STATIC_VAR
Definition globaldefs.h:7
#define VAR
Definition globaldefs.h:5
BOOLEAN iiExprArithM(leftv res, leftv a, int op)
Definition iparith.cc:9631
VAR package basePack
Definition ipid.cc:56
VAR package currPack
Definition ipid.cc:55
EXTERN_VAR omBin sleftv_bin
Definition ipid.h:145
#define IDDATA(a)
Definition ipid.h:126
STATIC_VAR jList * T
Definition janet.cc:30
ListNode * next
Definition janet.h:31
#define info
Definition libparse.cc:1256
void siInit(char *)
Definition misc_ip.cc:1361
VAR omBin slists_bin
Definition lists.cc:23
int lSize(lists L)
Definition lists.cc:25
#define error(a)
slists * lists
BOOLEAN readSyncVar(leftv result, leftv arg)
Definition shared.cc:1233
char * str(leftv arg)
Definition shared.cc:699
static BOOLEAN getThreadPoolWorkers(leftv result, leftv arg)
Definition shared.cc:2244
void retain(Job *job)
BOOLEAN getTable(leftv result, leftv arg)
Definition shared.cc:936
int type_thread
Definition shared.cc:236
static BOOLEAN getThreadPoolConcurrency(leftv result, leftv arg)
Definition shared.cc:2276
int type_region
Definition shared.cc:228
ThreadState * createThread(void *(*thread_func)(ThreadState *, void *), void *arg)
Definition shared.cc:1464
Job * getCurrentJob()
Definition shared.cc:2514
SharedObject * consSyncVar()
Definition shared.cc:715
int not_a_uri(const char *name, leftv arg)
Definition shared.cc:682
static BOOLEAN setThreadPoolConcurrency(leftv result, leftv arg)
Definition shared.cc:2291
static BOOLEAN scheduleJob(leftv result, leftv arg)
Definition shared.cc:2789
BOOLEAN makeSyncVar(leftv result, leftv arg)
Definition shared.cc:849
BOOLEAN shared_check_assign(blackbox *b, leftv l, leftv r)
Definition shared.cc:568
BOOLEAN makeSharedList(leftv result, leftv arg)
Definition shared.cc:819
BOOLEAN shared_assign(leftv l, leftv r)
Definition shared.cc:519
void * new_shared(SharedObject *obj)
Definition shared.cc:486
void ref_shared(LinTree::LinTree &lintree, int by)
Definition shared.cc:1285
BOOLEAN getList(leftv result, leftv arg)
Definition shared.cc:1023
Lock global_objects_lock
Definition shared.cc:221
int type_atomic_list
Definition shared.cc:234
static void appendArgCopy(vector< leftv > &argv, leftv arg)
Definition shared.cc:737
void * shared_init(blackbox *b)
Definition shared.cc:482
BOOLEAN statChannel(leftv result, leftv arg)
Definition shared.cc:1166
BOOLEAN threadEval(leftv result, leftv arg)
Definition shared.cc:2922
void * shared_copy(blackbox *b, void *d)
Definition shared.cc:510
SharedObject * SharedObjectPtr
Definition shared.cc:241
bool getJobCancelled()
Definition shared.cc:2640
BOOLEAN unlockRegion(leftv result, leftv arg)
Definition shared.cc:1115
BOOLEAN sendChannel(leftv result, leftv arg)
Definition shared.cc:1130
void * joinThread(ThreadState *ts)
Definition shared.cc:1469
Job * startJob(ThreadPool *pool, Job *job, leftv arg)
Definition shared.cc:2476
static ThreadState * newThread(void *(*thread_func)(ThreadState *, void *), void *arg, const char **error)
Definition shared.cc:1431
BOOLEAN setSharedName(leftv result, leftv arg)
Definition shared.cc:3033
static void appendArg(vector< leftv > &argv, string &s)
Definition shared.cc:723
SharedObject * makeSharedObject(SharedObjectTable &table, Lock *lock, int type, string &name, SharedConstructor scons)
Definition shared.cc:244
BOOLEAN writeSyncVar(leftv result, leftv arg)
Definition shared.cc:1184
std::map< std::string, SharedObject * > SharedObjectTable
Definition shared.cc:198
BOOLEAN makeAtomicTable(leftv result, leftv arg)
Definition shared.cc:773
BOOLEAN bindSharedObject(leftv result, leftv arg)
Definition shared.cc:919
void report(const char *fmt, const char *name)
Definition shared.cc:661
char * shared_string(blackbox *b, void *d)
Definition shared.cc:590
BOOLEAN lockRegion(leftv result, leftv arg)
Definition shared.cc:1083
BOOLEAN currentJob(leftv result, leftv arg)
Definition shared.cc:2893
long thread_counter
Definition shared.cc:226
int wrong_num_args(const char *name, leftv arg, int n)
Definition shared.cc:667
ThreadState * thread_state
Definition shared.cc:1361
static InterpreterThread * createInterpreterThread(const char **error)
Definition shared.cc:1480
int type_threadpool
Definition shared.cc:237
Lock name_lock(true)
int type_job
Definition shared.cc:238
static BOOLEAN testTrigger(leftv result, leftv arg)
Definition shared.cc:2773
ThreadPool * createThreadPool(int threads, int prioThreads=0)
Definition shared.cc:2213
BOOLEAN threadExec(leftv result, leftv arg)
Definition shared.cc:2950
BOOLEAN rlock_assign(leftv l, leftv r)
Definition shared.cc:544
const int have_threads
Definition shared.cc:33
void setJobData(Job *job, void *data)
Definition shared.cc:2644
BOOLEAN putTable(leftv result, leftv arg)
Definition shared.cc:996
BOOLEAN makeAtomicList(leftv result, leftv arg)
Definition shared.cc:787
ThreadPool * getCurrentThreadPool()
Definition shared.cc:2240
STATIC_VAR Job * currentJobRef
Definition shared.cc:1626
static BOOLEAN createTrigger(leftv result, leftv arg)
Definition shared.cc:2687
int not_a_region(const char *name, leftv arg)
Definition shared.cc:690
BOOLEAN currentThreadPool(leftv result, leftv arg)
Definition shared.cc:2329
void addJobArgs(Job *job, leftv arg)
Definition shared.cc:2660
void rlock_destroy(blackbox *b, void *d)
Definition shared.cc:501
int type_atomic_table
Definition shared.cc:232
void * interpreter_thread(ThreadState *ts, void *arg)
Definition shared.cc:1386
static BOOLEAN setThreadPoolWorkers(leftv result, leftv arg)
Definition shared.cc:2261
int type_syncvar
Definition shared.cc:231
static BOOLEAN updateTrigger(leftv result, leftv arg)
Definition shared.cc:2730
BOOLEAN getSharedName(leftv result, leftv arg)
Definition shared.cc:3051
SharedObjectTable global_objects
Definition shared.cc:222
SharedObject * consList()
Definition shared.cc:707
BOOLEAN receiveChannel(leftv result, leftv arg)
Definition shared.cc:1147
BOOLEAN mainThread(leftv result, leftv arg)
Definition shared.cc:2914
void releaseShared(SharedObject *obj)
Definition shared.cc:192
void * getJobData(Job *job)
Definition shared.cc:2652
void closeThreadPool(ThreadPool *pool, bool wait)
Definition shared.cc:2324
int type_shared_table
Definition shared.cc:233
SharedObject * consChannel()
Definition shared.cc:711
int type_channel
Definition shared.cc:230
BOOLEAN threadPoolExec(leftv result, leftv arg)
Definition shared.cc:2978
int type_shared_list
Definition shared.cc:235
BOOLEAN setCurrentThreadPool(leftv result, leftv arg)
Definition shared.cc:2341
BOOLEAN regionLock(leftv result, leftv arg)
Definition shared.cc:1098
BOOLEAN threadID(leftv result, leftv arg)
Definition shared.cc:2906
void makeRegionlockType(int &type, const char *name)
Definition shared.cc:1316
char * rlock_string(blackbox *b, void *d)
Definition shared.cc:652
void encode_shared(LinTree::LinTree &lintree, leftv val)
Definition shared.cc:1270
BOOLEAN makeSharedTable(leftv result, leftv arg)
Definition shared.cc:801
BOOLEAN typeSharedObject(leftv result, leftv arg)
Definition shared.cc:888
Job * createJob(void(*func)(leftv result, leftv arg))
Definition shared.cc:2466
void release(Job *job)
VAR long thread_id
Definition shared.cc:225
STATIC_VAR ThreadPool * currentThreadPoolRef
Definition shared.cc:1625
Scheduler * scheduler
Definition shared.cc:1620
BOOLEAN statSyncVar(leftv result, leftv arg)
Definition shared.cc:1252
void setOption(int ch)
Definition shared.cc:1363
queue< Job * > JobQueue
Definition shared.cc:1615
BOOLEAN shared_op2(int op, leftv res, leftv a1, leftv a2)
Definition shared.cc:580
Lock master_lock(true)
SharedObject * findSharedObject(SharedObjectTable &table, Lock *lock, string &name)
Definition shared.cc:266
SharedObjectPtr(* SharedConstructor)()
Definition shared.cc:242
int type_regionlock
Definition shared.cc:229
int type_trigger
Definition shared.cc:239
BOOLEAN makeChannel(leftv result, leftv arg)
Definition shared.cc:836
BOOLEAN shared_op3(int op, leftv res, leftv a1, leftv a2, leftv a3)
Definition shared.cc:585
void * thread_main(void *arg)
Definition shared.cc:1380
const char * getJobName()
void acquireShared(SharedObject *obj)
Definition shared.cc:188
void thread_init()
Definition shared.cc:1368
static BOOLEAN jobCancelled(leftv result, leftv arg)
Definition shared.cc:2607
SharedObject * consTable()
Definition shared.cc:703
static BOOLEAN chainTrigger(leftv result, leftv arg)
Definition shared.cc:2752
BOOLEAN makeRegion(leftv result, leftv arg)
Definition shared.cc:862
BOOLEAN putList(leftv result, leftv arg)
Definition shared.cc:1056
static bool joinInterpreterThread(InterpreterThread *thread)
Definition shared.cc:1507
void setJobName(const char *)
static BOOLEAN createThreadPoolSet(leftv result, leftv arg)
Definition shared.cc:2161
void makeSharedType(int &type, const char *name)
Definition shared.cc:1301
void shared_destroy(blackbox *b, void *d)
Definition shared.cc:493
static BOOLEAN executeProc(sleftv &result, const char *procname, const vector< leftv > &argv)
Definition shared.cc:744
SharedObject * consRegion()
Definition shared.cc:719
BOOLEAN updateSyncVar(leftv result, leftv arg)
Definition shared.cc:1204
BOOLEAN inTable(leftv result, leftv arg)
Definition shared.cc:969
leftv getJobResult(Job *job)
Definition shared.cc:2670
Lock thread_lock
Definition shared.cc:1359
void installShared(int type)
Definition shared.cc:1297
BOOLEAN threadResult(leftv result, leftv arg)
Definition shared.cc:3002
void cancelJob(Job *job)
Definition shared.cc:2509
void waitJob(Job *job)
Definition shared.cc:2585
leftv decode_shared(LinTree::LinTree &lintree)
Definition shared.cc:1276
void init()
Definition lintree.cc:864
std::string to_string(leftv val)
Definition lintree.cc:843
void install(int typ, LinTreeEncodeFunc enc, LinTreeDecodeFunc dec, LinTreeRefFunc ref)
Definition lintree.cc:51
leftv from_string(std::string &str)
Definition lintree.cc:854
#define omStrDup(s)
#define omAlloc0Bin(bin)
#define omFree(addr)
#define omAlloc0(size)
#define omFreeBin(addr, bin)
#define NULL
Definition omList.c:12
static int index(p_Length length, p_Ord ord)
void Werror(const char *fmt,...)
Definition reporter.cc:189
idrec * idhdl
Definition ring.h:22
int SI_MOD_INIT systhreads(SModulFunctions *fn)
Definition shared.cc:3073
#define MAX_THREADS
Definition shared.cc:1329
void pSingular_initialize_thread()
int status int void * buf
Definition si_signals.h:69
wait
Definition si_signals.h:61
bool operator()(const Job *lhs, const Job *rhs)
Definition shared.cc:1579
sleftv * leftv
Definition structs.h:53
#define assert(A)
Definition svd_si.h:3
int name
New type name for int.
#define IDHDL
Definition tok.h:31
@ LIST_CMD
Definition tok.h:118
@ DEF_CMD
Definition tok.h:58
@ STRING_CMD
Definition tok.h:187
@ INT_CMD
Definition tok.h:96
@ MAX_TOK
Definition tok.h:220
#define NONE
Definition tok.h:223
#define COMMAND
Definition tok.h:29