My Project
Loading...
Searching...
No Matches
LibThread::Scheduler Class Reference

Public Member Functions

 Scheduler (int n)
 
void set_maxconcurrency (int n)
 
int get_maxconcurrency ()
 
int threadpool_size (ThreadPool *pool)
 
virtual ~Scheduler ()
 
ThreadState * getThread (int i)
 
void shutdown (bool wait)
 
void addThread (ThreadPool *owner, ThreadState *thread)
 
void attachJob (ThreadPool *pool, Job *job)
 
void detachJob (Job *job)
 
void queueJob (Job *job)
 
void broadcastJob (ThreadPool *pool, Job *job)
 
void cancelDeps (Job *job)
 
void cancelJob (Job *job)
 
void waitJob (Job *job)
 
void clearThreadState ()
 
- Public Member Functions inherited from LibThread::SharedObject
 SharedObject ()
 
virtual ~SharedObject ()
 
void set_type (int type_init)
 
int get_type ()
 
void set_name (std::string &name_init)
 
void set_name (const char *s)
 
std::string & get_name ()
 
void incref (int by=1)
 
long decref ()
 
long getref ()
 
virtual BOOLEAN op2 (int op, leftv res, leftv a1, leftv a2)
 
virtual BOOLEAN op3 (int op, leftv res, leftv a1, leftv a2, leftv a3)
 

Static Public Member Functions

static void notifyDeps (Scheduler *scheduler, Job *job)
 
static void * main (ThreadState *ts, void *arg)
 

Data Fields

Lock lock
 

Private Attributes

bool single_threaded
 
size_t jobid
 
int nthreads
 
int maxconcurrency
 
int running
 
bool shutting_down
 
int shutdown_counter
 
vector< ThreadState * > threads
 
vector< ThreadPool * > thread_owners
 
priority_queue< Job *, vector< Job * >, JobCompare > global_queue
 
vector< JobQueue * > thread_queues
 
vector< Job * > pending
 
ConditionVariable cond
 
ConditionVariable response
 

Friends

class Job
 

Detailed Description

Definition at line 1649 of file shared.cc.

Constructor & Destructor Documentation

◆ Scheduler()

LibThread::Scheduler::Scheduler ( int n)
inline

Definition at line 1668 of file shared.cc.

1668 Scheduler(int n) :
1670 single_threaded(n==0), nthreads(n == 0 ? 1 : n),
1671 lock(true), cond(&lock), response(&lock),
1672 shutting_down(false), shutdown_counter(0), jobid(0),
1673 maxconcurrency(n), running(0)
1674 {
1675 thread_queues.push_back(new JobQueue());
1676 }
vector< ThreadPool * > thread_owners
Definition shared.cc:1659
vector< JobQueue * > thread_queues
Definition shared.cc:1661
ConditionVariable response
Definition shared.cc:1664
priority_queue< Job *, vector< Job * >, JobCompare > global_queue
Definition shared.cc:1660
ConditionVariable cond
Definition shared.cc:1663
vector< ThreadState * > threads
Definition shared.cc:1658
queue< Job * > JobQueue
Definition shared.cc:1615

◆ ~Scheduler()

virtual LibThread::Scheduler::~Scheduler ( )
inline virtual

Definition at line 1691 of file shared.cc.

1691 {
1692 for (unsigned i = 0; i < thread_queues.size(); i++) {
1693 JobQueue *q = thread_queues[i];
1694 while (!q->empty()) {
1695 Job *job = q->front();
1696 q->pop();
1697 releaseShared(job);
1698 }
1699 }
1700 thread_queues.clear();
1701 threads.clear();
1702 }
int i
Definition cfEzgcd.cc:132
friend class Job
Definition shared.cc:1665
void releaseShared(SharedObject *obj)
Definition shared.cc:192

Member Function Documentation

◆ addThread()

void LibThread::Scheduler::addThread ( ThreadPool * owner,
ThreadState * thread )
inline

Definition at line 1730 of file shared.cc.

1730 {
1731 lock.lock();
1732 thread_owners.push_back(owner);
1733 threads.push_back(thread);
1734 thread_queues.push_back(new JobQueue());
1735 lock.unlock();
1736 }

◆ attachJob()

void LibThread::Scheduler::attachJob ( ThreadPool * pool,
Job * job )
inline

Definition at line 1737 of file shared.cc.

1737 {
1738 lock.lock();
1739 job->pool = pool;
1740 job->id = jobid++;
1741 acquireShared(job);
1742 if (job->ready()) {
1743 global_queue.push(job);
1744 cond.signal();
1745 }
1746 else if (job->pending_index < 0) {
1747 job->pool = pool;
1748 job->pending_index = pending.size();
1749 pending.push_back(job);
1750 }
1751 lock.unlock();
1752 }
ThreadPool * pool
Definition shared.cc:1546
long pending_index
Definition shared.cc:1549
virtual bool ready()
Definition shared.cc:1600
vector< Job * > pending
Definition shared.cc:1662
void acquireShared(SharedObject *obj)
Definition shared.cc:188

◆ broadcastJob()

void LibThread::Scheduler::broadcastJob ( ThreadPool * pool,
Job * job )
inline

Definition at line 1771 of file shared.cc.

1771 {
1772 lock.lock();
1773 for (unsigned i = 0; i <thread_queues.size(); i++) {
1774 if (thread_owners[i] == pool) {
1775 acquireShared(job);
1776 thread_queues[i]->push(job);
1777 }
1778 }
1779 lock.unlock();
1780 }

◆ cancelDeps()

void LibThread::Scheduler::cancelDeps ( Job * job)
inline

Definition at line 1781 of file shared.cc.

1781 {
1782 vector<Job *> &notify = job->notify;
1783 for (unsigned i = 0; i <notify.size(); i++) {
1784 Job *next = notify[i];
1785 if (!next->cancelled) {
1786 cancelJob(next);
1787 }
1788 }
1789 }
vector< Job * > notify
Definition shared.cc:1551
void cancelJob(Job *job)
Definition shared.cc:1790
ListNode * next
Definition janet.h:31

◆ cancelJob()

void LibThread::Scheduler::cancelJob ( Job * job)
inline

Definition at line 1790 of file shared.cc.

1790 {
1791 lock.lock();
1792 if (!job->cancelled) {
1793 job->cancelled = true;
1794 if (!job->running && !job->done) {
1795 job->done = true;
1796 cancelDeps(job);
1797 }
1798 }
1799 lock.unlock();
1800 }
void cancelDeps(Job *job)
Definition shared.cc:1781

◆ clearThreadState()

void LibThread::Scheduler::clearThreadState ( )
inline

Definition at line 1821 of file shared.cc.

1821 {
1822 threads.clear();
1823 }

◆ detachJob()

void LibThread::Scheduler::detachJob ( Job * job)
inline

Definition at line 1753 of file shared.cc.

1753 {
1754 lock.lock();
1755 long i = job->pending_index;
1756 job->pending_index = -1;
1757 if (i >= 0) {
1758 job = pending.back();
1759 pending.resize(pending.size()-1);
1760 pending[i] = job;
1761 job->pending_index = i;
1762 }
1763 lock.unlock();
1764 }

◆ get_maxconcurrency()

int LibThread::Scheduler::get_maxconcurrency ( )
inline

Definition at line 1680 of file shared.cc.

1680 {
1681 return maxconcurrency;
1682 }

◆ getThread()

ThreadState * LibThread::Scheduler::getThread ( int i)
inline

Definition at line 1703 of file shared.cc.

1703 { return threads[i]; }

◆ main()

static void * LibThread::Scheduler::main ( ThreadState * ts,
void * arg )
inline static

Definition at line 1851 of file shared.cc.

1851 {
1852 SchedInfo *info = (SchedInfo *) arg;
1853 Scheduler *scheduler = info->scheduler;
1854 ThreadPool *oldThreadPool = currentThreadPoolRef;
1855 // TODO: set current thread pool
1856 // currentThreadPoolRef = pool;
1857 Lock &lock = scheduler->lock;
1858 ConditionVariable &cond = scheduler->cond;
1859 ConditionVariable &response = scheduler->response;
1860 JobQueue *my_queue = scheduler->thread_queues[info->num];
1861 if (!scheduler->single_threaded)
1862 thread_init();
1863 lock.lock();
1864 for (;;) {
1865 if (info->job && info->job->done)
1866 break;
1867 if (scheduler->shutting_down) {
1868 scheduler->shutdown_counter++;
1869 scheduler->response.signal();
1870 break;
1871 }
1872 if (!my_queue->empty()) {
1873 Job *job = my_queue->front();
1874 my_queue->pop();
1875 if (!scheduler->global_queue.empty())
1876 cond.signal();
1877 currentJobRef = job;
1878 job->run();
1880 notifyDeps(scheduler, job);
1881 releaseShared(job);
1882 scheduler->response.signal();
1883 continue;
1884 } else if (!scheduler->global_queue.empty()) {
1885 Job *job = scheduler->global_queue.top();
1886 scheduler->global_queue.pop();
1887 if (!scheduler->global_queue.empty())
1888 cond.signal();
1889 currentJobRef = job;
1890 job->run();
1892 notifyDeps(scheduler, job);
1893 releaseShared(job);
1894 scheduler->response.signal();
1895 continue;
1896 } else {
1897 if (scheduler->single_threaded) {
1898 break;
1899 }
1900 cond.wait();
1901 }
1902 }
1903 // TODO: correct current thread pool
1904 // releaseShared(currentThreadPoolRef);
1905 currentThreadPoolRef = oldThreadPool;
1906 scheduler->lock.unlock();
1907 delete info;
1908 return NULL;
1909 }
void signal()
Definition thread.h:97
static void notifyDeps(Scheduler *scheduler, Job *job)
Definition shared.cc:1824
void unlock()
Definition thread.h:57
#define info
Definition libparse.cc:1256
STATIC_VAR Job * currentJobRef
Definition shared.cc:1626
STATIC_VAR ThreadPool * currentThreadPoolRef
Definition shared.cc:1625
void thread_init()
Definition shared.cc:1368
#define NULL
Definition omList.c:12

◆ notifyDeps()

static void LibThread::Scheduler::notifyDeps ( Scheduler * scheduler,
Job * job )
inline static

Definition at line 1824 of file shared.cc.

1824 {
1825 vector<Job *> &notify = job->notify;
1826 job->incref(notify.size());
1827 for (unsigned i = 0; i <notify.size(); i++) {
1828 Job *next = notify[i];
1829 if (!next->queued && next->ready() && !next->cancelled) {
1830 next->queued = true;
1831 scheduler->queueJob(next);
1832 }
1833 }
1834 vector<Trigger *> &triggers = job->triggers;
1835 leftv arg = NULL;
1836 if (triggers.size() > 0 && job->result.size() > 0)
1837 arg = LinTree::from_string(job->result);
1838 for (unsigned i = 0; i < triggers.size(); i++) {
1839 Trigger *trigger = triggers[i];
1840 if (trigger->accept(arg)) {
1841 trigger->activate(arg);
1842 if (trigger->ready())
1843 scheduler->queueJob(trigger);
1844 }
1845 }
1846 if (arg) {
1847 arg->CleanUp();
1848 omFreeBin(arg, sleftv_bin);
1849 }
1850 }
string result
Definition shared.cc:1554
vector< Trigger * > triggers
Definition shared.cc:1552
void queueJob(Job *job)
Definition shared.cc:1765
void incref(int by=1)
Definition shared.cc:165
virtual void activate(leftv arg)=0
virtual bool accept(leftv arg)=0
void CleanUp(ring r=currRing)
Definition subexpr.cc:351
EXTERN_VAR omBin sleftv_bin
Definition ipid.h:145
leftv from_string(std::string &str)
Definition lintree.cc:854
#define omFreeBin(addr, bin)
sleftv * leftv
Definition structs.h:53

◆ queueJob()

void LibThread::Scheduler::queueJob ( Job * job)
inline

Definition at line 1765 of file shared.cc.

1765 {
1766 lock.lock();
1767 global_queue.push(job);
1768 cond.signal();
1769 lock.unlock();
1770 }

◆ set_maxconcurrency()

void LibThread::Scheduler::set_maxconcurrency ( int n)
inline

Definition at line 1677 of file shared.cc.

1677 {
1678 maxconcurrency = n;
1679 }

◆ shutdown()

void LibThread::Scheduler::shutdown ( bool wait)
inline

Definition at line 1704 of file shared.cc.

1704 {
1705 if (single_threaded) {
1706 SchedInfo *info = new SchedInfo();
1707 info->num = 0;
1708 info->scheduler = this;
1709 acquireShared(this);
1710 info->job = NULL;
1711 Scheduler::main(NULL, info);
1712 return;
1713 }
1714 lock.lock();
1715 if (wait) {
1716 while (!global_queue.empty()) {
1717 response.wait();
1718 }
1719 }
1720 shutting_down = true;
1721 while (shutdown_counter < nthreads) {
1722 cond.broadcast();
1723 response.wait();
1724 }
1725 lock.unlock();
1726 for (unsigned i = 0; i <threads.size(); i++) {
1727 joinThread(threads[i]);
1728 }
1729 }
static void * main(ThreadState *ts, void *arg)
Definition shared.cc:1851
void * joinThread(ThreadState *ts)
Definition shared.cc:1469
wait
Definition si_signals.h:61

◆ threadpool_size()

int LibThread::Scheduler::threadpool_size ( ThreadPool * pool)
inline

Definition at line 1683 of file shared.cc.

1683 {
1684 int n;
1685 for (unsigned i = 0; i <thread_owners.size(); i++) {
1686 if (thread_owners[i] == pool)
1687 n++;
1688 }
1689 return n;
1690 }

◆ waitJob()

void LibThread::Scheduler::waitJob ( Job * job)
inline

Definition at line 1801 of file shared.cc.

1801 {
1802 if (single_threaded) {
1803 SchedInfo *info = new SchedInfo();
1804 info->num = 0;
1805 info->scheduler = this;
1806 acquireShared(this);
1807 info->job = job;
1808 Scheduler::main(NULL, info);
1809 } else {
1810 lock.lock();
1811 for (;;) {
1812 if (job->done || job->cancelled) {
1813 break;
1814 }
1815 response.wait();
1816 }
1817 response.signal(); // forward signal
1818 lock.unlock();
1819 }
1820 }

Friends And Related Symbol Documentation

◆ Job

friend class Job
friend

Definition at line 1665 of file shared.cc.

Field Documentation

◆ cond

ConditionVariable LibThread::Scheduler::cond
private

Definition at line 1663 of file shared.cc.

◆ global_queue

priority_queue<Job *, vector<Job *>, JobCompare> LibThread::Scheduler::global_queue
private

Definition at line 1660 of file shared.cc.

◆ jobid

size_t LibThread::Scheduler::jobid
private

Definition at line 1652 of file shared.cc.

◆ lock

Lock LibThread::Scheduler::lock

Definition at line 1667 of file shared.cc.

◆ maxconcurrency

int LibThread::Scheduler::maxconcurrency
private

Definition at line 1654 of file shared.cc.

◆ nthreads

int LibThread::Scheduler::nthreads
private

Definition at line 1653 of file shared.cc.

◆ pending

vector<Job *> LibThread::Scheduler::pending
private

Definition at line 1662 of file shared.cc.

◆ response

ConditionVariable LibThread::Scheduler::response
private

Definition at line 1664 of file shared.cc.

◆ running

int LibThread::Scheduler::running
private

Definition at line 1655 of file shared.cc.

◆ shutdown_counter

int LibThread::Scheduler::shutdown_counter
private

Definition at line 1657 of file shared.cc.

◆ shutting_down

bool LibThread::Scheduler::shutting_down
private

Definition at line 1656 of file shared.cc.

◆ single_threaded

bool LibThread::Scheduler::single_threaded
private

Definition at line 1651 of file shared.cc.

◆ thread_owners

vector<ThreadPool *> LibThread::Scheduler::thread_owners
private

Definition at line 1659 of file shared.cc.

◆ thread_queues

vector<JobQueue *> LibThread::Scheduler::thread_queues
private

Definition at line 1661 of file shared.cc.

◆ threads

vector<ThreadState *> LibThread::Scheduler::threads
private

Definition at line 1658 of file shared.cc.


The documentation for this class was generated from the following file: